• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/base/upload_data_stream.h"
6 
7 #include "base/logging.h"
8 #include "net/base/io_buffer.h"
9 #include "net/base/net_errors.h"
10 #include "net/base/upload_bytes_element_reader.h"
11 #include "net/base/upload_element_reader.h"
12 
13 namespace net {
14 
UploadDataStream(ScopedVector<UploadElementReader> element_readers,int64 identifier)15 UploadDataStream::UploadDataStream(
16     ScopedVector<UploadElementReader> element_readers,
17     int64 identifier)
18     : element_readers_(element_readers.Pass()),
19       element_index_(0),
20       total_size_(0),
21       current_position_(0),
22       identifier_(identifier),
23       is_chunked_(false),
24       last_chunk_appended_(false),
25       read_failed_(false),
26       initialized_successfully_(false),
27       weak_ptr_factory_(this) {
28 }
29 
UploadDataStream(Chunked,int64 identifier)30 UploadDataStream::UploadDataStream(Chunked /*chunked*/, int64 identifier)
31     : element_index_(0),
32       total_size_(0),
33       current_position_(0),
34       identifier_(identifier),
35       is_chunked_(true),
36       last_chunk_appended_(false),
37       read_failed_(false),
38       initialized_successfully_(false),
39       weak_ptr_factory_(this) {
40 }
41 
~UploadDataStream()42 UploadDataStream::~UploadDataStream() {
43 }
44 
CreateWithReader(scoped_ptr<UploadElementReader> reader,int64 identifier)45 UploadDataStream* UploadDataStream::CreateWithReader(
46     scoped_ptr<UploadElementReader> reader,
47     int64 identifier) {
48   ScopedVector<UploadElementReader> readers;
49   readers.push_back(reader.release());
50   return new UploadDataStream(readers.Pass(), identifier);
51 }
52 
Init(const CompletionCallback & callback)53 int UploadDataStream::Init(const CompletionCallback& callback) {
54   Reset();
55   return InitInternal(0, callback);
56 }
57 
Read(IOBuffer * buf,int buf_len,const CompletionCallback & callback)58 int UploadDataStream::Read(IOBuffer* buf,
59                            int buf_len,
60                            const CompletionCallback& callback) {
61   DCHECK(initialized_successfully_);
62   DCHECK_GT(buf_len, 0);
63   return ReadInternal(new DrainableIOBuffer(buf, buf_len), callback);
64 }
65 
IsEOF() const66 bool UploadDataStream::IsEOF() const {
67   DCHECK(initialized_successfully_);
68   if (!is_chunked_)
69     return current_position_ == total_size_;
70 
71   // If the upload data is chunked, check if the last chunk is appended and all
72   // elements are consumed.
73   return element_index_ == element_readers_.size() && last_chunk_appended_;
74 }
75 
IsInMemory() const76 bool UploadDataStream::IsInMemory() const {
77   // Chunks are in memory, but UploadData does not have all the chunks at
78   // once. Chunks are provided progressively with AppendChunk() as chunks
79   // are ready. Check is_chunked_ here, rather than relying on the loop
80   // below, as there is a case that is_chunked_ is set to true, but the
81   // first chunk is not yet delivered.
82   if (is_chunked_)
83     return false;
84 
85   for (size_t i = 0; i < element_readers_.size(); ++i) {
86     if (!element_readers_[i]->IsInMemory())
87       return false;
88   }
89   return true;
90 }
91 
AppendChunk(const char * bytes,int bytes_len,bool is_last_chunk)92 void UploadDataStream::AppendChunk(const char* bytes,
93                                    int bytes_len,
94                                    bool is_last_chunk) {
95   DCHECK(is_chunked_);
96   DCHECK(!last_chunk_appended_);
97   last_chunk_appended_ = is_last_chunk;
98 
99   // Initialize a reader for the newly appended chunk. We leave |total_size_| at
100   // zero, since for chunked uploads, we may not know the total size.
101   std::vector<char> data(bytes, bytes + bytes_len);
102   UploadElementReader* reader = new UploadOwnedBytesElementReader(&data);
103   const int rv = reader->Init(net::CompletionCallback());
104   DCHECK_EQ(OK, rv);
105   element_readers_.push_back(reader);
106 
107   // Resume pending read.
108   if (!pending_chunked_read_callback_.is_null()) {
109     base::Closure callback = pending_chunked_read_callback_;
110     pending_chunked_read_callback_.Reset();
111     callback.Run();
112   }
113 }
114 
Reset()115 void UploadDataStream::Reset() {
116   weak_ptr_factory_.InvalidateWeakPtrs();
117   pending_chunked_read_callback_.Reset();
118   initialized_successfully_ = false;
119   read_failed_ = false;
120   current_position_ = 0;
121   total_size_ = 0;
122   element_index_ = 0;
123 }
124 
InitInternal(int start_index,const CompletionCallback & callback)125 int UploadDataStream::InitInternal(int start_index,
126                                    const CompletionCallback& callback) {
127   DCHECK(!initialized_successfully_);
128 
129   // Call Init() for all elements.
130   for (size_t i = start_index; i < element_readers_.size(); ++i) {
131     UploadElementReader* reader = element_readers_[i];
132     // When new_result is ERR_IO_PENDING, InitInternal() will be called
133     // with start_index == i + 1 when reader->Init() finishes.
134     const int result = reader->Init(
135         base::Bind(&UploadDataStream::ResumePendingInit,
136                    weak_ptr_factory_.GetWeakPtr(),
137                    i + 1,
138                    callback));
139     if (result != OK) {
140       DCHECK(result != ERR_IO_PENDING || !callback.is_null());
141       return result;
142     }
143   }
144 
145   // Finalize initialization.
146   if (!is_chunked_) {
147     uint64 total_size = 0;
148     for (size_t i = 0; i < element_readers_.size(); ++i) {
149       UploadElementReader* reader = element_readers_[i];
150       total_size += reader->GetContentLength();
151     }
152     total_size_ = total_size;
153   }
154   initialized_successfully_ = true;
155   return OK;
156 }
157 
ResumePendingInit(int start_index,const CompletionCallback & callback,int previous_result)158 void UploadDataStream::ResumePendingInit(int start_index,
159                                          const CompletionCallback& callback,
160                                          int previous_result) {
161   DCHECK(!initialized_successfully_);
162   DCHECK(!callback.is_null());
163   DCHECK_NE(ERR_IO_PENDING, previous_result);
164 
165   // Check the last result.
166   if (previous_result != OK) {
167     callback.Run(previous_result);
168     return;
169   }
170 
171   const int result = InitInternal(start_index, callback);
172   if (result != ERR_IO_PENDING)
173     callback.Run(result);
174 }
175 
ReadInternal(scoped_refptr<DrainableIOBuffer> buf,const CompletionCallback & callback)176 int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf,
177                                    const CompletionCallback& callback) {
178   DCHECK(initialized_successfully_);
179 
180   while (!read_failed_ && element_index_ < element_readers_.size()) {
181     UploadElementReader* reader = element_readers_[element_index_];
182 
183     if (reader->BytesRemaining() == 0) {
184       ++element_index_;
185       continue;
186     }
187 
188     if (buf->BytesRemaining() == 0)
189       break;
190 
191     int result = reader->Read(
192         buf.get(),
193         buf->BytesRemaining(),
194         base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead),
195                    weak_ptr_factory_.GetWeakPtr(),
196                    buf,
197                    callback));
198     if (result == ERR_IO_PENDING) {
199       DCHECK(!callback.is_null());
200       return ERR_IO_PENDING;
201     }
202     ProcessReadResult(buf, result);
203   }
204 
205   if (read_failed_) {
206     // Chunked transfers may only contain byte readers, so cannot have read
207     // failures.
208     DCHECK(!is_chunked_);
209 
210     // If an error occured during read operation, then pad with zero.
211     // Otherwise the server will hang waiting for the rest of the data.
212     const int num_bytes_to_fill =
213         std::min(static_cast<uint64>(buf->BytesRemaining()),
214                  size() - position() - buf->BytesConsumed());
215     DCHECK_LE(0, num_bytes_to_fill);
216     memset(buf->data(), 0, num_bytes_to_fill);
217     buf->DidConsume(num_bytes_to_fill);
218   }
219 
220   const int bytes_copied = buf->BytesConsumed();
221   current_position_ += bytes_copied;
222   DCHECK(is_chunked_ || total_size_ >= current_position_);
223 
224   if (is_chunked_ && !IsEOF() && bytes_copied == 0) {
225     DCHECK(!callback.is_null());
226     DCHECK(pending_chunked_read_callback_.is_null());
227     pending_chunked_read_callback_ =
228         base::Bind(&UploadDataStream::ResumePendingRead,
229                    weak_ptr_factory_.GetWeakPtr(),
230                    buf,
231                    callback,
232                    OK);
233     return ERR_IO_PENDING;
234   }
235 
236   // Returning 0 is allowed only when IsEOF() == true.
237   DCHECK(bytes_copied != 0 || IsEOF());
238   return bytes_copied;
239 }
240 
ResumePendingRead(scoped_refptr<DrainableIOBuffer> buf,const CompletionCallback & callback,int previous_result)241 void UploadDataStream::ResumePendingRead(scoped_refptr<DrainableIOBuffer> buf,
242                                          const CompletionCallback& callback,
243                                          int previous_result) {
244   DCHECK(!callback.is_null());
245 
246   ProcessReadResult(buf, previous_result);
247 
248   const int result = ReadInternal(buf, callback);
249   if (result != ERR_IO_PENDING)
250     callback.Run(result);
251 }
252 
ProcessReadResult(scoped_refptr<DrainableIOBuffer> buf,int result)253 void UploadDataStream::ProcessReadResult(scoped_refptr<DrainableIOBuffer> buf,
254                                          int result) {
255   DCHECK_NE(ERR_IO_PENDING, result);
256   DCHECK(!read_failed_);
257 
258   if (result >= 0)
259     buf->DidConsume(result);
260   else
261     read_failed_ = true;
262 }
263 
264 }  // namespace net
265