1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/byte_stream.h"
6
7 #include <deque>
8 #include <set>
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/sequenced_task_runner.h"
15
16 namespace content {
17 namespace {
18
// FIFO of (buffer, byte count) pairs. Used in this file for the writer's
// pending input, the reader's available contents, and the batch handed from
// writer to reader in a single posted task.
typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
    ContentVector;

// Forward declaration; ByteStreamWriterImpl stores a pointer to its reader
// peer before the reader class is defined.
class ByteStreamReaderImpl;
23
// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
// cleared in an object destructor and accessed to check for object
// existence.  We can't use weak pointers because they're tightly tied to
// threads rather than task runners.
// TODO(rdsmith): A better solution would be extending weak pointers
// to support SequencedTaskRunners.
//
// In this file each stream endpoint holds a reference to its own flag (set in
// the constructor, cleared in the destructor) and to its peer's flag, which
// is bound into tasks posted to the peer so that those tasks can bail out if
// the peer has already been destroyed.
struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
 public:
  // A freshly created flag reports "alive".
  LifetimeFlag() : is_alive(true) { }

  // True while the object guarded by this flag exists.
  bool is_alive;

 protected:
  // Destruction goes through RefCountedThreadSafe only.
  friend class base::RefCountedThreadSafe<LifetimeFlag>;
  virtual ~LifetimeFlag() { }

 private:
  DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
};
42
// For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
// SetPeer may happen anywhere; all other operations on each class must
// happen in the context of their SequencedTaskRunner.
//
// Writer (input) end of a byte stream. Buffers written data locally and hands
// it to the reader peer by posting ByteStreamReaderImpl::TransferData to the
// peer's task runner.
class ByteStreamWriterImpl : public ByteStreamWriter {
 public:
  // |lifetime_flag| must be non-NULL; it is marked alive by the constructor
  // and cleared by the destructor. |buffer_size| is the limit that
  // GetTotalBufferedBytes() is compared against.
  ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
                       scoped_refptr<LifetimeFlag> lifetime_flag,
                       size_t buffer_size);
  virtual ~ByteStreamWriterImpl();

  // Must be called before any operations are performed.
  void SetPeer(ByteStreamReaderImpl* peer,
               scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
               scoped_refptr<LifetimeFlag> peer_lifetime_flag);

  // Overridden from ByteStreamWriter.
  // Write() returns false if accepting |byte_count| would overflow the
  // size_t byte accounting (the data is dropped) or if, after queuing, the
  // total buffered byte count exceeds the buffer size.
  virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
                     size_t byte_count) OVERRIDE;
  // Sends any locally buffered data to the peer.
  virtual void Flush() OVERRIDE;
  // Sends any locally buffered data plus |status| to the peer, marking the
  // source as complete.
  virtual void Close(int status) OVERRIDE;
  // |source_callback| is run when the buffered byte count drops from above
  // the limit to at or below it.
  virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
  // Bytes held locally plus bytes sent to the peer and not yet reported as
  // consumed.
  virtual size_t GetTotalBufferedBytes() const OVERRIDE;

  // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
  // Static so that it can run after |*target| is gone; |*target| is only
  // touched when |lifetime_flag| reports alive.
  static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
                           ByteStreamWriterImpl* target,
                           size_t bytes_consumed);

 private:
  // Called from UpdateWindow when object existence has been validated.
  void UpdateWindowInternal(size_t bytes_consumed);

  // Moves everything in |input_contents_| into a task posted to the peer.
  // |complete| and |status| are forwarded to the peer unchanged.
  void PostToPeer(bool complete, int status);

  // Limit against which GetTotalBufferedBytes() is compared.
  const size_t total_buffer_size_;

  // All data objects in this class are only valid to access on
  // this task runner except as otherwise noted.
  scoped_refptr<base::SequencedTaskRunner> my_task_runner_;

  // True while this object is alive.
  scoped_refptr<LifetimeFlag> my_lifetime_flag_;

  // Run when buffered data drops back to or below |total_buffer_size_|.
  base::Closure space_available_callback_;
  // Data written but not yet posted to the peer, and its total byte count.
  ContentVector input_contents_;
  size_t input_contents_size_;

  // ** Peer information.

  scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;

  // How much we've sent to the output that for flow control purposes we
  // must assume hasn't been read yet.
  size_t output_size_used_;

  // Only valid to access on peer_task_runner_.
  scoped_refptr<LifetimeFlag> peer_lifetime_flag_;

  // Only valid to access on peer_task_runner_ if
  // |*peer_lifetime_flag_ == true|
  ByteStreamReaderImpl* peer_;
};
105
// Reader (output) end of a byte stream. Receives batches of buffers from the
// writer peer via TransferData and reports consumption back to the writer by
// posting ByteStreamWriterImpl::UpdateWindow to the peer's task runner.
class ByteStreamReaderImpl : public ByteStreamReader {
 public:
  // |lifetime_flag| must be non-NULL; it is marked alive by the constructor
  // and cleared by the destructor. |buffer_size| is used to decide how much
  // consumption accumulates before the writer is told about it.
  ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
                       scoped_refptr<LifetimeFlag> lifetime_flag,
                       size_t buffer_size);
  virtual ~ByteStreamReaderImpl();

  // Must be called before any operations are performed.
  void SetPeer(ByteStreamWriterImpl* peer,
               scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
               scoped_refptr<LifetimeFlag> peer_lifetime_flag);

  // Overridden from ByteStreamReader.
  // Read() hands out the oldest available buffer (STREAM_HAS_DATA); with no
  // data it returns STREAM_COMPLETE once the final status has arrived and
  // STREAM_EMPTY otherwise.
  virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
                           size_t* length) OVERRIDE;
  // Only valid once the final status has been received.
  virtual int GetStatus() const OVERRIDE;
  // |sink_callback| is run when data arrives into an empty reader or when the
  // source reports completion.
  virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;

  // PostTask target from |ByteStreamWriterImpl::Write| and
  // |ByteStreamWriterImpl::Close|.
  // Receive data from our peer.
  // static because it may be called after the object it is targeting
  // has been destroyed.  It may not access |*target|
  // if |*object_lifetime_flag| is false.
  static void TransferData(
      scoped_refptr<LifetimeFlag> object_lifetime_flag,
      ByteStreamReaderImpl* target,
      scoped_ptr<ContentVector> transfer_buffer,
      size_t transfer_buffer_bytes,
      bool source_complete,
      int status);

 private:
  // Called from TransferData once object existence has been validated.
  void TransferDataInternal(
      scoped_ptr<ContentVector> transfer_buffer,
      size_t transfer_buffer_bytes,
      bool source_complete,
      int status);

  // Posts a window update to the writer if enough consumption has
  // accumulated since the last report.
  void MaybeUpdateInput();

  const size_t total_buffer_size_;

  scoped_refptr<base::SequencedTaskRunner> my_task_runner_;

  // True while this object is alive.
  scoped_refptr<LifetimeFlag> my_lifetime_flag_;

  // Buffers received from the writer and not yet handed out by Read().
  ContentVector available_contents_;

  // |status_| is meaningful only once |received_status_| is true.
  bool received_status_;
  int status_;

  base::Closure data_available_callback_;

  // Time of last point at which data in stream transitioned from full
  // to non-full.  Nulled when a callback is sent.
  // NOTE(review): no definition visible in this file reads or writes this
  // member -- confirm whether it is still needed.
  base::Time last_non_full_time_;

  // ** Peer information

  scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;

  // How much has been removed from this class that we haven't told
  // the input about yet.
  size_t unreported_consumed_bytes_;

  // Only valid to access on peer_task_runner_.
  scoped_refptr<LifetimeFlag> peer_lifetime_flag_;

  // Only valid to access on peer_task_runner_ if
  // |*peer_lifetime_flag_ == true|
  ByteStreamWriterImpl* peer_;
};
181
ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,scoped_refptr<LifetimeFlag> lifetime_flag,size_t buffer_size)182 ByteStreamWriterImpl::ByteStreamWriterImpl(
183 scoped_refptr<base::SequencedTaskRunner> task_runner,
184 scoped_refptr<LifetimeFlag> lifetime_flag,
185 size_t buffer_size)
186 : total_buffer_size_(buffer_size),
187 my_task_runner_(task_runner),
188 my_lifetime_flag_(lifetime_flag),
189 input_contents_size_(0),
190 output_size_used_(0),
191 peer_(NULL) {
192 DCHECK(my_lifetime_flag_.get());
193 my_lifetime_flag_->is_alive = true;
194 }
195
~ByteStreamWriterImpl()196 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
197 // No RunsTasksOnCurrentThread() check to allow deleting a created writer
198 // before we start using it. Once started, should be deleted on the specified
199 // task runner.
200 my_lifetime_flag_->is_alive = false;
201 }
202
SetPeer(ByteStreamReaderImpl * peer,scoped_refptr<base::SequencedTaskRunner> peer_task_runner,scoped_refptr<LifetimeFlag> peer_lifetime_flag)203 void ByteStreamWriterImpl::SetPeer(
204 ByteStreamReaderImpl* peer,
205 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
206 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
207 peer_ = peer;
208 peer_task_runner_ = peer_task_runner;
209 peer_lifetime_flag_ = peer_lifetime_flag;
210 }
211
Write(scoped_refptr<net::IOBuffer> buffer,size_t byte_count)212 bool ByteStreamWriterImpl::Write(
213 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
214 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
215
216 // Check overflow.
217 //
218 // TODO(tyoshino): Discuss with content/browser/download developer and if
219 // they're fine with, set smaller limit and make it configurable.
220 size_t space_limit = std::numeric_limits<size_t>::max() -
221 GetTotalBufferedBytes();
222 if (byte_count > space_limit) {
223 // TODO(tyoshino): Tell the user that Write() failed.
224 // Ignore input.
225 return false;
226 }
227
228 input_contents_.push_back(std::make_pair(buffer, byte_count));
229 input_contents_size_ += byte_count;
230
231 // Arbitrarily, we buffer to a third of the total size before sending.
232 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
233 PostToPeer(false, 0);
234
235 return GetTotalBufferedBytes() <= total_buffer_size_;
236 }
237
Flush()238 void ByteStreamWriterImpl::Flush() {
239 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
240 if (input_contents_size_ > 0)
241 PostToPeer(false, 0);
242 }
243
Close(int status)244 void ByteStreamWriterImpl::Close(int status) {
245 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
246 PostToPeer(true, status);
247 }
248
RegisterCallback(const base::Closure & source_callback)249 void ByteStreamWriterImpl::RegisterCallback(
250 const base::Closure& source_callback) {
251 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
252 space_available_callback_ = source_callback;
253 }
254
GetTotalBufferedBytes() const255 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
256 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
257 // This sum doesn't overflow since Write() fails if this sum is going to
258 // overflow.
259 return input_contents_size_ + output_size_used_;
260 }
261
262 // static
UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,ByteStreamWriterImpl * target,size_t bytes_consumed)263 void ByteStreamWriterImpl::UpdateWindow(
264 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
265 size_t bytes_consumed) {
266 // If the target object isn't alive anymore, we do nothing.
267 if (!lifetime_flag->is_alive) return;
268
269 target->UpdateWindowInternal(bytes_consumed);
270 }
271
UpdateWindowInternal(size_t bytes_consumed)272 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
273 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
274
275 bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
276
277 DCHECK_GE(output_size_used_, bytes_consumed);
278 output_size_used_ -= bytes_consumed;
279
280 // Callback if we were above the limit and we're now <= to it.
281 bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
282
283 if (no_longer_above_limit && was_above_limit &&
284 !space_available_callback_.is_null())
285 space_available_callback_.Run();
286 }
287
PostToPeer(bool complete,int status)288 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
289 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
290 // Valid contexts in which to call.
291 DCHECK(complete || 0 != input_contents_size_);
292
293 scoped_ptr<ContentVector> transfer_buffer;
294 size_t buffer_size = 0;
295 if (0 != input_contents_size_) {
296 transfer_buffer.reset(new ContentVector);
297 transfer_buffer->swap(input_contents_);
298 buffer_size = input_contents_size_;
299 output_size_used_ += input_contents_size_;
300 input_contents_size_ = 0;
301 }
302 peer_task_runner_->PostTask(
303 FROM_HERE, base::Bind(
304 &ByteStreamReaderImpl::TransferData,
305 peer_lifetime_flag_,
306 peer_,
307 base::Passed(&transfer_buffer),
308 buffer_size,
309 complete,
310 status));
311 }
312
ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,scoped_refptr<LifetimeFlag> lifetime_flag,size_t buffer_size)313 ByteStreamReaderImpl::ByteStreamReaderImpl(
314 scoped_refptr<base::SequencedTaskRunner> task_runner,
315 scoped_refptr<LifetimeFlag> lifetime_flag,
316 size_t buffer_size)
317 : total_buffer_size_(buffer_size),
318 my_task_runner_(task_runner),
319 my_lifetime_flag_(lifetime_flag),
320 received_status_(false),
321 status_(0),
322 unreported_consumed_bytes_(0),
323 peer_(NULL) {
324 DCHECK(my_lifetime_flag_.get());
325 my_lifetime_flag_->is_alive = true;
326 }
327
~ByteStreamReaderImpl()328 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
329 // No RunsTasksOnCurrentThread() check to allow deleting a created writer
330 // before we start using it. Once started, should be deleted on the specified
331 // task runner.
332 my_lifetime_flag_->is_alive = false;
333 }
334
SetPeer(ByteStreamWriterImpl * peer,scoped_refptr<base::SequencedTaskRunner> peer_task_runner,scoped_refptr<LifetimeFlag> peer_lifetime_flag)335 void ByteStreamReaderImpl::SetPeer(
336 ByteStreamWriterImpl* peer,
337 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
338 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
339 peer_ = peer;
340 peer_task_runner_ = peer_task_runner;
341 peer_lifetime_flag_ = peer_lifetime_flag;
342 }
343
344 ByteStreamReaderImpl::StreamState
Read(scoped_refptr<net::IOBuffer> * data,size_t * length)345 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
346 size_t* length) {
347 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
348
349 if (available_contents_.size()) {
350 *data = available_contents_.front().first;
351 *length = available_contents_.front().second;
352 available_contents_.pop_front();
353 unreported_consumed_bytes_ += *length;
354
355 MaybeUpdateInput();
356 return STREAM_HAS_DATA;
357 }
358 if (received_status_) {
359 return STREAM_COMPLETE;
360 }
361 return STREAM_EMPTY;
362 }
363
GetStatus() const364 int ByteStreamReaderImpl::GetStatus() const {
365 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
366 DCHECK(received_status_);
367 return status_;
368 }
369
RegisterCallback(const base::Closure & sink_callback)370 void ByteStreamReaderImpl::RegisterCallback(
371 const base::Closure& sink_callback) {
372 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
373
374 data_available_callback_ = sink_callback;
375 }
376
377 // static
TransferData(scoped_refptr<LifetimeFlag> object_lifetime_flag,ByteStreamReaderImpl * target,scoped_ptr<ContentVector> transfer_buffer,size_t buffer_size,bool source_complete,int status)378 void ByteStreamReaderImpl::TransferData(
379 scoped_refptr<LifetimeFlag> object_lifetime_flag,
380 ByteStreamReaderImpl* target,
381 scoped_ptr<ContentVector> transfer_buffer,
382 size_t buffer_size,
383 bool source_complete,
384 int status) {
385 // If our target is no longer alive, do nothing.
386 if (!object_lifetime_flag->is_alive) return;
387
388 target->TransferDataInternal(
389 transfer_buffer.Pass(), buffer_size, source_complete, status);
390 }
391
TransferDataInternal(scoped_ptr<ContentVector> transfer_buffer,size_t buffer_size,bool source_complete,int status)392 void ByteStreamReaderImpl::TransferDataInternal(
393 scoped_ptr<ContentVector> transfer_buffer,
394 size_t buffer_size,
395 bool source_complete,
396 int status) {
397 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
398
399 bool was_empty = available_contents_.empty();
400
401 if (transfer_buffer) {
402 available_contents_.insert(available_contents_.end(),
403 transfer_buffer->begin(),
404 transfer_buffer->end());
405 }
406
407 if (source_complete) {
408 received_status_ = true;
409 status_ = status;
410 }
411
412 // Callback on transition from empty to non-empty, or
413 // source complete.
414 if (((was_empty && !available_contents_.empty()) ||
415 source_complete) &&
416 !data_available_callback_.is_null())
417 data_available_callback_.Run();
418 }
419
420 // Decide whether or not to send the input a window update.
421 // Currently we do that whenever we've got unreported consumption
422 // greater than 1/3 of total size.
MaybeUpdateInput()423 void ByteStreamReaderImpl::MaybeUpdateInput() {
424 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
425
426 if (unreported_consumed_bytes_ <=
427 total_buffer_size_ / kFractionReadBeforeWindowUpdate)
428 return;
429
430 peer_task_runner_->PostTask(
431 FROM_HERE, base::Bind(
432 &ByteStreamWriterImpl::UpdateWindow,
433 peer_lifetime_flag_,
434 peer_,
435 unreported_consumed_bytes_));
436 unreported_consumed_bytes_ = 0;
437 }
438
439 } // namespace
440
// Divisors applied to the total buffer size. The writer sends buffered data
// to the reader once more than 1/kFractionBufferBeforeSending of the buffer
// has accumulated; the reader reports consumption to the writer once more
// than 1/kFractionReadBeforeWindowUpdate of the buffer has been read.
const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
443
// Out-of-line definition of the interface destructor; nothing to clean up.
ByteStreamReader::~ByteStreamReader() { }
445
// Out-of-line definition of the interface destructor; nothing to clean up.
ByteStreamWriter::~ByteStreamWriter() { }
447
CreateByteStream(scoped_refptr<base::SequencedTaskRunner> input_task_runner,scoped_refptr<base::SequencedTaskRunner> output_task_runner,size_t buffer_size,scoped_ptr<ByteStreamWriter> * input,scoped_ptr<ByteStreamReader> * output)448 void CreateByteStream(
449 scoped_refptr<base::SequencedTaskRunner> input_task_runner,
450 scoped_refptr<base::SequencedTaskRunner> output_task_runner,
451 size_t buffer_size,
452 scoped_ptr<ByteStreamWriter>* input,
453 scoped_ptr<ByteStreamReader>* output) {
454 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
455 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
456
457 ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
458 input_task_runner, input_flag, buffer_size);
459 ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
460 output_task_runner, output_flag, buffer_size);
461
462 in->SetPeer(out, output_task_runner, output_flag);
463 out->SetPeer(in, input_task_runner, input_flag);
464 input->reset(in);
465 output->reset(out);
466 }
467
468 } // namespace content
469