1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/base/upload_data_stream.h"
6
7 #include "base/logging.h"
8 #include "net/base/io_buffer.h"
9 #include "net/base/net_errors.h"
10 #include "net/base/upload_bytes_element_reader.h"
11 #include "net/base/upload_element_reader.h"
12
13 namespace net {
14
UploadDataStream(ScopedVector<UploadElementReader> element_readers,int64 identifier)15 UploadDataStream::UploadDataStream(
16 ScopedVector<UploadElementReader> element_readers,
17 int64 identifier)
18 : element_readers_(element_readers.Pass()),
19 element_index_(0),
20 total_size_(0),
21 current_position_(0),
22 identifier_(identifier),
23 is_chunked_(false),
24 last_chunk_appended_(false),
25 read_failed_(false),
26 initialized_successfully_(false),
27 weak_ptr_factory_(this) {
28 }
29
UploadDataStream(Chunked,int64 identifier)30 UploadDataStream::UploadDataStream(Chunked /*chunked*/, int64 identifier)
31 : element_index_(0),
32 total_size_(0),
33 current_position_(0),
34 identifier_(identifier),
35 is_chunked_(true),
36 last_chunk_appended_(false),
37 read_failed_(false),
38 initialized_successfully_(false),
39 weak_ptr_factory_(this) {
40 }
41
~UploadDataStream()42 UploadDataStream::~UploadDataStream() {
43 }
44
CreateWithReader(scoped_ptr<UploadElementReader> reader,int64 identifier)45 UploadDataStream* UploadDataStream::CreateWithReader(
46 scoped_ptr<UploadElementReader> reader,
47 int64 identifier) {
48 ScopedVector<UploadElementReader> readers;
49 readers.push_back(reader.release());
50 return new UploadDataStream(readers.Pass(), identifier);
51 }
52
Init(const CompletionCallback & callback)53 int UploadDataStream::Init(const CompletionCallback& callback) {
54 Reset();
55 return InitInternal(0, callback);
56 }
57
Read(IOBuffer * buf,int buf_len,const CompletionCallback & callback)58 int UploadDataStream::Read(IOBuffer* buf,
59 int buf_len,
60 const CompletionCallback& callback) {
61 DCHECK(initialized_successfully_);
62 DCHECK_GT(buf_len, 0);
63 return ReadInternal(new DrainableIOBuffer(buf, buf_len), callback);
64 }
65
IsEOF() const66 bool UploadDataStream::IsEOF() const {
67 DCHECK(initialized_successfully_);
68 if (!is_chunked_)
69 return current_position_ == total_size_;
70
71 // If the upload data is chunked, check if the last chunk is appended and all
72 // elements are consumed.
73 return element_index_ == element_readers_.size() && last_chunk_appended_;
74 }
75
IsInMemory() const76 bool UploadDataStream::IsInMemory() const {
77 // Chunks are in memory, but UploadData does not have all the chunks at
78 // once. Chunks are provided progressively with AppendChunk() as chunks
79 // are ready. Check is_chunked_ here, rather than relying on the loop
80 // below, as there is a case that is_chunked_ is set to true, but the
81 // first chunk is not yet delivered.
82 if (is_chunked_)
83 return false;
84
85 for (size_t i = 0; i < element_readers_.size(); ++i) {
86 if (!element_readers_[i]->IsInMemory())
87 return false;
88 }
89 return true;
90 }
91
AppendChunk(const char * bytes,int bytes_len,bool is_last_chunk)92 void UploadDataStream::AppendChunk(const char* bytes,
93 int bytes_len,
94 bool is_last_chunk) {
95 DCHECK(is_chunked_);
96 DCHECK(!last_chunk_appended_);
97 last_chunk_appended_ = is_last_chunk;
98
99 // Initialize a reader for the newly appended chunk. We leave |total_size_| at
100 // zero, since for chunked uploads, we may not know the total size.
101 std::vector<char> data(bytes, bytes + bytes_len);
102 UploadElementReader* reader = new UploadOwnedBytesElementReader(&data);
103 const int rv = reader->Init(net::CompletionCallback());
104 DCHECK_EQ(OK, rv);
105 element_readers_.push_back(reader);
106
107 // Resume pending read.
108 if (!pending_chunked_read_callback_.is_null()) {
109 base::Closure callback = pending_chunked_read_callback_;
110 pending_chunked_read_callback_.Reset();
111 callback.Run();
112 }
113 }
114
Reset()115 void UploadDataStream::Reset() {
116 weak_ptr_factory_.InvalidateWeakPtrs();
117 pending_chunked_read_callback_.Reset();
118 initialized_successfully_ = false;
119 read_failed_ = false;
120 current_position_ = 0;
121 total_size_ = 0;
122 element_index_ = 0;
123 }
124
InitInternal(int start_index,const CompletionCallback & callback)125 int UploadDataStream::InitInternal(int start_index,
126 const CompletionCallback& callback) {
127 DCHECK(!initialized_successfully_);
128
129 // Call Init() for all elements.
130 for (size_t i = start_index; i < element_readers_.size(); ++i) {
131 UploadElementReader* reader = element_readers_[i];
132 // When new_result is ERR_IO_PENDING, InitInternal() will be called
133 // with start_index == i + 1 when reader->Init() finishes.
134 const int result = reader->Init(
135 base::Bind(&UploadDataStream::ResumePendingInit,
136 weak_ptr_factory_.GetWeakPtr(),
137 i + 1,
138 callback));
139 if (result != OK) {
140 DCHECK(result != ERR_IO_PENDING || !callback.is_null());
141 return result;
142 }
143 }
144
145 // Finalize initialization.
146 if (!is_chunked_) {
147 uint64 total_size = 0;
148 for (size_t i = 0; i < element_readers_.size(); ++i) {
149 UploadElementReader* reader = element_readers_[i];
150 total_size += reader->GetContentLength();
151 }
152 total_size_ = total_size;
153 }
154 initialized_successfully_ = true;
155 return OK;
156 }
157
ResumePendingInit(int start_index,const CompletionCallback & callback,int previous_result)158 void UploadDataStream::ResumePendingInit(int start_index,
159 const CompletionCallback& callback,
160 int previous_result) {
161 DCHECK(!initialized_successfully_);
162 DCHECK(!callback.is_null());
163 DCHECK_NE(ERR_IO_PENDING, previous_result);
164
165 // Check the last result.
166 if (previous_result != OK) {
167 callback.Run(previous_result);
168 return;
169 }
170
171 const int result = InitInternal(start_index, callback);
172 if (result != ERR_IO_PENDING)
173 callback.Run(result);
174 }
175
ReadInternal(scoped_refptr<DrainableIOBuffer> buf,const CompletionCallback & callback)176 int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf,
177 const CompletionCallback& callback) {
178 DCHECK(initialized_successfully_);
179
180 while (!read_failed_ && element_index_ < element_readers_.size()) {
181 UploadElementReader* reader = element_readers_[element_index_];
182
183 if (reader->BytesRemaining() == 0) {
184 ++element_index_;
185 continue;
186 }
187
188 if (buf->BytesRemaining() == 0)
189 break;
190
191 int result = reader->Read(
192 buf.get(),
193 buf->BytesRemaining(),
194 base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead),
195 weak_ptr_factory_.GetWeakPtr(),
196 buf,
197 callback));
198 if (result == ERR_IO_PENDING) {
199 DCHECK(!callback.is_null());
200 return ERR_IO_PENDING;
201 }
202 ProcessReadResult(buf, result);
203 }
204
205 if (read_failed_) {
206 // Chunked transfers may only contain byte readers, so cannot have read
207 // failures.
208 DCHECK(!is_chunked_);
209
210 // If an error occured during read operation, then pad with zero.
211 // Otherwise the server will hang waiting for the rest of the data.
212 const int num_bytes_to_fill =
213 std::min(static_cast<uint64>(buf->BytesRemaining()),
214 size() - position() - buf->BytesConsumed());
215 DCHECK_LE(0, num_bytes_to_fill);
216 memset(buf->data(), 0, num_bytes_to_fill);
217 buf->DidConsume(num_bytes_to_fill);
218 }
219
220 const int bytes_copied = buf->BytesConsumed();
221 current_position_ += bytes_copied;
222 DCHECK(is_chunked_ || total_size_ >= current_position_);
223
224 if (is_chunked_ && !IsEOF() && bytes_copied == 0) {
225 DCHECK(!callback.is_null());
226 DCHECK(pending_chunked_read_callback_.is_null());
227 pending_chunked_read_callback_ =
228 base::Bind(&UploadDataStream::ResumePendingRead,
229 weak_ptr_factory_.GetWeakPtr(),
230 buf,
231 callback,
232 OK);
233 return ERR_IO_PENDING;
234 }
235
236 // Returning 0 is allowed only when IsEOF() == true.
237 DCHECK(bytes_copied != 0 || IsEOF());
238 return bytes_copied;
239 }
240
ResumePendingRead(scoped_refptr<DrainableIOBuffer> buf,const CompletionCallback & callback,int previous_result)241 void UploadDataStream::ResumePendingRead(scoped_refptr<DrainableIOBuffer> buf,
242 const CompletionCallback& callback,
243 int previous_result) {
244 DCHECK(!callback.is_null());
245
246 ProcessReadResult(buf, previous_result);
247
248 const int result = ReadInternal(buf, callback);
249 if (result != ERR_IO_PENDING)
250 callback.Run(result);
251 }
252
ProcessReadResult(scoped_refptr<DrainableIOBuffer> buf,int result)253 void UploadDataStream::ProcessReadResult(scoped_refptr<DrainableIOBuffer> buf,
254 int result) {
255 DCHECK_NE(ERR_IO_PENDING, result);
256 DCHECK(!read_failed_);
257
258 if (result >= 0)
259 buf->DidConsume(result);
260 else
261 read_failed_ = true;
262 }
263
264 } // namespace net
265