// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package org.chromium.net.urlconnection;

import org.chromium.net.UploadDataProvider;
import org.chromium.net.UploadDataSink;

import androidx.annotation.VisibleForTesting;

import java.io.IOException;
import java.net.HttpRetryException;
import java.nio.ByteBuffer;

/**
 * An implementation of {@link java.io.OutputStream} to send data to a server,
 * when {@link CronetHttpURLConnection#setChunkedStreamingMode} is used.
 * This implementation does not buffer the entire request body in memory.
 * It does not support rewind. Note that {@link #write} should only be called
 * from the thread on which the {@link #mConnection} is created.
 *
 * <p>Data is staged in a single fixed-size {@link ByteBuffer} of
 * {@code chunkLength} bytes. Writers fill the buffer; once it is full it is
 * flipped and the message loop is run until the upload data provider has
 * drained it and cleared it for the next chunk.
 */
@VisibleForTesting
public final class CronetChunkedOutputStream extends CronetOutputStream {
    private final CronetHttpURLConnection mConnection;
    private final MessageLoop mMessageLoop;
    // Staging buffer for one chunk. In write mode while the caller fills it,
    // flipped to read mode while the upload data provider drains it.
    private final ByteBuffer mBuffer;
    private final UploadDataProvider mUploadDataProvider = new UploadDataProviderImpl();
    // Set by close(); reported to the sink when the remaining data is read.
    private boolean mLastChunk;

    /**
     * Package protected constructor.
     * @param connection The CronetHttpURLConnection object.
     * @param chunkLength The chunk length of the request body in bytes. It must
     *            be a positive number.
     * @param messageLoop The message loop that is run while a full chunk is
     *            being uploaded and quit once the chunk has been consumed.
     * @throws NullPointerException if {@code connection} is null.
     * @throws IllegalArgumentException if {@code chunkLength} is not positive.
     */
    CronetChunkedOutputStream(
            CronetHttpURLConnection connection, int chunkLength, MessageLoop messageLoop) {
        if (connection == null) {
            throw new NullPointerException();
        }
        if (chunkLength <= 0) {
            throw new IllegalArgumentException("chunkLength should be greater than 0");
        }
        mBuffer = ByteBuffer.allocate(chunkLength);
        mConnection = connection;
        mMessageLoop = messageLoop;
    }

    /**
     * Appends a single byte to the current chunk, uploading the chunk first
     * if the buffer is already full.
     *
     * @param oneByte The byte to write; only the low-order eight bits are used.
     * @throws IOException if raised by {@code checkNotClosed()} or while a
     *             full chunk is being uploaded.
     */
    @Override
    public void write(int oneByte) throws IOException {
        // Validate the stream state up front, exactly as write(byte[], int, int)
        // does. close() flips mBuffer into read mode, so without this check a
        // write after close() with pending data would still see
        // hasRemaining() == true and overwrite the first pending byte instead
        // of failing.
        checkNotClosed();
        ensureBufferHasRemaining();
        mBuffer.put((byte) oneByte);
    }

    /**
     * Appends {@code count} bytes from {@code buffer}, starting at
     * {@code offset}, uploading a chunk every time the internal buffer fills.
     *
     * @param buffer The source array.
     * @param offset Index of the first byte to write.
     * @param count Number of bytes to write.
     * @throws IndexOutOfBoundsException if {@code offset} or {@code count} is
     *             negative, or the range exceeds {@code buffer}.
     * @throws IOException if raised by {@code checkNotClosed()} or while a
     *             full chunk is being uploaded.
     */
    @Override
    public void write(byte[] buffer, int offset, int count) throws IOException {
        checkNotClosed();
        if (buffer.length - offset < count || offset < 0 || count < 0) {
            throw new IndexOutOfBoundsException();
        }
        int toSend = count;
        while (toSend > 0) {
            // Copy as much as fits into the current chunk.
            int sent = Math.min(toSend, mBuffer.remaining());
            mBuffer.put(buffer, offset + count - toSend, sent);
            toSend -= sent;
            // Upload mBuffer now if an entire chunk is written.
            ensureBufferHasRemaining();
        }
    }

    /**
     * Closes the stream and, on the first call, marks the buffered data as
     * the last chunk and flips the buffer so it can be read by the upload
     * data provider.
     */
    @Override
    public void close() throws IOException {
        super.close();
        if (!mLastChunk) {
            // Consumer can only call close() when message loop is not running.
            // Set mLastChunk to be true and flip mBuffer to upload its contents.
            mLastChunk = true;
            mBuffer.flip();
        }
    }

    // Below are CronetOutputStream implementations:

    @Override
    void setConnected() throws IOException {
        // Do nothing.
    }

    @Override
    void checkReceivedEnoughContent() throws IOException {
        // Do nothing.
    }

    @Override
    UploadDataProvider getUploadDataProvider() {
        return mUploadDataProvider;
    }

    /**
     * Upload data provider that drains {@code mBuffer} into the buffers handed
     * to {@link #read}. It is an inner (non-static) class because it reads the
     * enclosing stream's buffer, last-chunk flag and message loop.
     */
    private class UploadDataProviderImpl extends UploadDataProvider {
        /** Returns -1: the total length of a chunked body is not known. */
        @Override
        public long getLength() {
            return -1;
        }

        /**
         * Copies pending bytes from {@code mBuffer} into {@code byteBuffer}.
         * When everything pending fits, the chunk is finished: the buffer is
         * cleared for new writes and, unless this was the last chunk, the
         * message loop is quit. Otherwise only as many bytes as fit are
         * copied and the rest stays pending.
         */
        @Override
        public void read(final UploadDataSink uploadDataSink, final ByteBuffer byteBuffer) {
            if (byteBuffer.remaining() >= mBuffer.remaining()) {
                byteBuffer.put(mBuffer);
                mBuffer.clear();
                uploadDataSink.onReadSucceeded(mLastChunk);
                if (!mLastChunk) {
                    // Quit message loop so embedder can write more data.
                    mMessageLoop.quit();
                }
            } else {
                // Temporarily shrink mBuffer's limit so the bulk put() copies
                // exactly as many bytes as byteBuffer can hold, then restore it.
                int oldLimit = mBuffer.limit();
                mBuffer.limit(mBuffer.position() + byteBuffer.remaining());
                byteBuffer.put(mBuffer);
                mBuffer.limit(oldLimit);
                uploadDataSink.onReadSucceeded(false);
            }
        }

        /** Always reports a rewind error; a streamed body cannot be replayed. */
        @Override
        public void rewind(UploadDataSink uploadDataSink) {
            uploadDataSink.onRewindError(
                    new HttpRetryException("Cannot retry streamed Http body", -1));
        }
    }

    /**
     * If {@code mBuffer} is full, wait until it is consumed and there is
     * space to write more data to it.
     */
    private void ensureBufferHasRemaining() throws IOException {
        if (!mBuffer.hasRemaining()) {
            uploadBufferInternal();
        }
    }

    /**
     * Helper function to upload {@code mBuffer} to the native stack. This
     * function blocks until {@code mBuffer} is consumed and there is space to
     * write more data.
     */
    private void uploadBufferInternal() throws IOException {
        checkNotClosed();
        mBuffer.flip();
        mMessageLoop.loop();
        checkNoException();
    }
}