• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "content/browser/byte_stream.h"
6 
7 #include <deque>
8 #include <set>
9 #include <utility>
10 
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/sequenced_task_runner.h"
15 
16 namespace content {
17 namespace {
18 
19 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
20 ContentVector;
21 
22 class ByteStreamReaderImpl;
23 
24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
25 // cleared in an object destructor and accessed to check for object
26 // existence.  We can't use weak pointers because they're tightly tied to
27 // threads rather than task runners.
28 // TODO(rdsmith): A better solution would be extending weak pointers
29 // to support SequencedTaskRunners.
30 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
31  public:
LifetimeFlagcontent::__anon5f99e2120111::LifetimeFlag32   LifetimeFlag() : is_alive(true) { }
33   bool is_alive;
34 
35  protected:
36   friend class base::RefCountedThreadSafe<LifetimeFlag>;
~LifetimeFlagcontent::__anon5f99e2120111::LifetimeFlag37   virtual ~LifetimeFlag() { }
38 
39  private:
40   DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
41 };
42 
43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
44 // SetPeer may happen anywhere; all other operations on each class must
45 // happen in the context of their SequencedTaskRunner.
46 class ByteStreamWriterImpl : public ByteStreamWriter {
47  public:
48   ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
49                        scoped_refptr<LifetimeFlag> lifetime_flag,
50                        size_t buffer_size);
51   virtual ~ByteStreamWriterImpl();
52 
53   // Must be called before any operations are performed.
54   void SetPeer(ByteStreamReaderImpl* peer,
55                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
56                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
57 
58   // Overridden from ByteStreamWriter.
59   virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
60                      size_t byte_count) OVERRIDE;
61   virtual void Flush() OVERRIDE;
62   virtual void Close(int status) OVERRIDE;
63   virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
64   virtual size_t GetTotalBufferedBytes() const OVERRIDE;
65 
66   // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
67   static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
68                            ByteStreamWriterImpl* target,
69                            size_t bytes_consumed);
70 
71  private:
72   // Called from UpdateWindow when object existence has been validated.
73   void UpdateWindowInternal(size_t bytes_consumed);
74 
75   void PostToPeer(bool complete, int status);
76 
77   const size_t total_buffer_size_;
78 
79   // All data objects in this class are only valid to access on
80   // this task runner except as otherwise noted.
81   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
82 
83   // True while this object is alive.
84   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
85 
86   base::Closure space_available_callback_;
87   ContentVector input_contents_;
88   size_t input_contents_size_;
89 
90   // ** Peer information.
91 
92   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
93 
94   // How much we've sent to the output that for flow control purposes we
95   // must assume hasn't been read yet.
96   size_t output_size_used_;
97 
98   // Only valid to access on peer_task_runner_.
99   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
100 
101   // Only valid to access on peer_task_runner_ if
102   // |*peer_lifetime_flag_ == true|
103   ByteStreamReaderImpl* peer_;
104 };
105 
106 class ByteStreamReaderImpl : public ByteStreamReader {
107  public:
108   ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
109                        scoped_refptr<LifetimeFlag> lifetime_flag,
110                        size_t buffer_size);
111   virtual ~ByteStreamReaderImpl();
112 
113   // Must be called before any operations are performed.
114   void SetPeer(ByteStreamWriterImpl* peer,
115                scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
116                scoped_refptr<LifetimeFlag> peer_lifetime_flag);
117 
118   // Overridden from ByteStreamReader.
119   virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
120                            size_t* length) OVERRIDE;
121   virtual int GetStatus() const OVERRIDE;
122   virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
123 
124   // PostTask target from |ByteStreamWriterImpl::Write| and
125   // |ByteStreamWriterImpl::Close|.
126   // Receive data from our peer.
127   // static because it may be called after the object it is targeting
128   // has been destroyed.  It may not access |*target|
129   // if |*object_lifetime_flag| is false.
130   static void TransferData(
131       scoped_refptr<LifetimeFlag> object_lifetime_flag,
132       ByteStreamReaderImpl* target,
133       scoped_ptr<ContentVector> transfer_buffer,
134       size_t transfer_buffer_bytes,
135       bool source_complete,
136       int status);
137 
138  private:
139   // Called from TransferData once object existence has been validated.
140   void TransferDataInternal(
141       scoped_ptr<ContentVector> transfer_buffer,
142       size_t transfer_buffer_bytes,
143       bool source_complete,
144       int status);
145 
146   void MaybeUpdateInput();
147 
148   const size_t total_buffer_size_;
149 
150   scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
151 
152   // True while this object is alive.
153   scoped_refptr<LifetimeFlag> my_lifetime_flag_;
154 
155   ContentVector available_contents_;
156 
157   bool received_status_;
158   int status_;
159 
160   base::Closure data_available_callback_;
161 
162   // Time of last point at which data in stream transitioned from full
163   // to non-full.  Nulled when a callback is sent.
164   base::Time last_non_full_time_;
165 
166   // ** Peer information
167 
168   scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
169 
170   // How much has been removed from this class that we haven't told
171   // the input about yet.
172   size_t unreported_consumed_bytes_;
173 
174   // Only valid to access on peer_task_runner_.
175   scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
176 
177   // Only valid to access on peer_task_runner_ if
178   // |*peer_lifetime_flag_ == true|
179   ByteStreamWriterImpl* peer_;
180 };
181 
ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,scoped_refptr<LifetimeFlag> lifetime_flag,size_t buffer_size)182 ByteStreamWriterImpl::ByteStreamWriterImpl(
183     scoped_refptr<base::SequencedTaskRunner> task_runner,
184     scoped_refptr<LifetimeFlag> lifetime_flag,
185     size_t buffer_size)
186     : total_buffer_size_(buffer_size),
187       my_task_runner_(task_runner),
188       my_lifetime_flag_(lifetime_flag),
189       input_contents_size_(0),
190       output_size_used_(0),
191       peer_(NULL) {
192   DCHECK(my_lifetime_flag_.get());
193   my_lifetime_flag_->is_alive = true;
194 }
195 
~ByteStreamWriterImpl()196 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
197   // No RunsTasksOnCurrentThread() check to allow deleting a created writer
198   // before we start using it. Once started, should be deleted on the specified
199   // task runner.
200   my_lifetime_flag_->is_alive = false;
201 }
202 
SetPeer(ByteStreamReaderImpl * peer,scoped_refptr<base::SequencedTaskRunner> peer_task_runner,scoped_refptr<LifetimeFlag> peer_lifetime_flag)203 void ByteStreamWriterImpl::SetPeer(
204     ByteStreamReaderImpl* peer,
205     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
206     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
207   peer_ = peer;
208   peer_task_runner_ = peer_task_runner;
209   peer_lifetime_flag_ = peer_lifetime_flag;
210 }
211 
Write(scoped_refptr<net::IOBuffer> buffer,size_t byte_count)212 bool ByteStreamWriterImpl::Write(
213     scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
214   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
215 
216   // Check overflow.
217   //
218   // TODO(tyoshino): Discuss with content/browser/download developer and if
219   // they're fine with, set smaller limit and make it configurable.
220   size_t space_limit = std::numeric_limits<size_t>::max() -
221       GetTotalBufferedBytes();
222   if (byte_count > space_limit) {
223     // TODO(tyoshino): Tell the user that Write() failed.
224     // Ignore input.
225     return false;
226   }
227 
228   input_contents_.push_back(std::make_pair(buffer, byte_count));
229   input_contents_size_ += byte_count;
230 
231   // Arbitrarily, we buffer to a third of the total size before sending.
232   if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
233     PostToPeer(false, 0);
234 
235   return GetTotalBufferedBytes() <= total_buffer_size_;
236 }
237 
Flush()238 void ByteStreamWriterImpl::Flush() {
239   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
240   if (input_contents_size_ > 0)
241     PostToPeer(false, 0);
242 }
243 
Close(int status)244 void ByteStreamWriterImpl::Close(int status) {
245   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
246   PostToPeer(true, status);
247 }
248 
RegisterCallback(const base::Closure & source_callback)249 void ByteStreamWriterImpl::RegisterCallback(
250     const base::Closure& source_callback) {
251   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
252   space_available_callback_ = source_callback;
253 }
254 
GetTotalBufferedBytes() const255 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
256   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
257   // This sum doesn't overflow since Write() fails if this sum is going to
258   // overflow.
259   return input_contents_size_ + output_size_used_;
260 }
261 
262 // static
UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,ByteStreamWriterImpl * target,size_t bytes_consumed)263 void ByteStreamWriterImpl::UpdateWindow(
264     scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
265     size_t bytes_consumed) {
266   // If the target object isn't alive anymore, we do nothing.
267   if (!lifetime_flag->is_alive) return;
268 
269   target->UpdateWindowInternal(bytes_consumed);
270 }
271 
UpdateWindowInternal(size_t bytes_consumed)272 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
273   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
274 
275   bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
276 
277   DCHECK_GE(output_size_used_, bytes_consumed);
278   output_size_used_ -= bytes_consumed;
279 
280   // Callback if we were above the limit and we're now <= to it.
281   bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
282 
283   if (no_longer_above_limit && was_above_limit &&
284       !space_available_callback_.is_null())
285     space_available_callback_.Run();
286 }
287 
PostToPeer(bool complete,int status)288 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
289   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
290   // Valid contexts in which to call.
291   DCHECK(complete || 0 != input_contents_size_);
292 
293   scoped_ptr<ContentVector> transfer_buffer;
294   size_t buffer_size = 0;
295   if (0 != input_contents_size_) {
296     transfer_buffer.reset(new ContentVector);
297     transfer_buffer->swap(input_contents_);
298     buffer_size = input_contents_size_;
299     output_size_used_ += input_contents_size_;
300     input_contents_size_ = 0;
301   }
302   peer_task_runner_->PostTask(
303       FROM_HERE, base::Bind(
304           &ByteStreamReaderImpl::TransferData,
305           peer_lifetime_flag_,
306           peer_,
307           base::Passed(&transfer_buffer),
308           buffer_size,
309           complete,
310           status));
311 }
312 
ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,scoped_refptr<LifetimeFlag> lifetime_flag,size_t buffer_size)313 ByteStreamReaderImpl::ByteStreamReaderImpl(
314     scoped_refptr<base::SequencedTaskRunner> task_runner,
315     scoped_refptr<LifetimeFlag> lifetime_flag,
316     size_t buffer_size)
317     : total_buffer_size_(buffer_size),
318       my_task_runner_(task_runner),
319       my_lifetime_flag_(lifetime_flag),
320       received_status_(false),
321       status_(0),
322       unreported_consumed_bytes_(0),
323       peer_(NULL) {
324   DCHECK(my_lifetime_flag_.get());
325   my_lifetime_flag_->is_alive = true;
326 }
327 
~ByteStreamReaderImpl()328 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
329   // No RunsTasksOnCurrentThread() check to allow deleting a created writer
330   // before we start using it. Once started, should be deleted on the specified
331   // task runner.
332   my_lifetime_flag_->is_alive = false;
333 }
334 
SetPeer(ByteStreamWriterImpl * peer,scoped_refptr<base::SequencedTaskRunner> peer_task_runner,scoped_refptr<LifetimeFlag> peer_lifetime_flag)335 void ByteStreamReaderImpl::SetPeer(
336     ByteStreamWriterImpl* peer,
337     scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
338     scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
339   peer_ = peer;
340   peer_task_runner_ = peer_task_runner;
341   peer_lifetime_flag_ = peer_lifetime_flag;
342 }
343 
344 ByteStreamReaderImpl::StreamState
Read(scoped_refptr<net::IOBuffer> * data,size_t * length)345 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
346                            size_t* length) {
347   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
348 
349   if (available_contents_.size()) {
350     *data = available_contents_.front().first;
351     *length = available_contents_.front().second;
352     available_contents_.pop_front();
353     unreported_consumed_bytes_ += *length;
354 
355     MaybeUpdateInput();
356     return STREAM_HAS_DATA;
357   }
358   if (received_status_) {
359     return STREAM_COMPLETE;
360   }
361   return STREAM_EMPTY;
362 }
363 
GetStatus() const364 int ByteStreamReaderImpl::GetStatus() const {
365   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
366   DCHECK(received_status_);
367   return status_;
368 }
369 
RegisterCallback(const base::Closure & sink_callback)370 void ByteStreamReaderImpl::RegisterCallback(
371     const base::Closure& sink_callback) {
372   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
373 
374   data_available_callback_ = sink_callback;
375 }
376 
377 // static
TransferData(scoped_refptr<LifetimeFlag> object_lifetime_flag,ByteStreamReaderImpl * target,scoped_ptr<ContentVector> transfer_buffer,size_t buffer_size,bool source_complete,int status)378 void ByteStreamReaderImpl::TransferData(
379     scoped_refptr<LifetimeFlag> object_lifetime_flag,
380     ByteStreamReaderImpl* target,
381     scoped_ptr<ContentVector> transfer_buffer,
382     size_t buffer_size,
383     bool source_complete,
384     int status) {
385   // If our target is no longer alive, do nothing.
386   if (!object_lifetime_flag->is_alive) return;
387 
388   target->TransferDataInternal(
389       transfer_buffer.Pass(), buffer_size, source_complete, status);
390 }
391 
TransferDataInternal(scoped_ptr<ContentVector> transfer_buffer,size_t buffer_size,bool source_complete,int status)392 void ByteStreamReaderImpl::TransferDataInternal(
393     scoped_ptr<ContentVector> transfer_buffer,
394     size_t buffer_size,
395     bool source_complete,
396     int status) {
397   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
398 
399   bool was_empty = available_contents_.empty();
400 
401   if (transfer_buffer) {
402     available_contents_.insert(available_contents_.end(),
403                                transfer_buffer->begin(),
404                                transfer_buffer->end());
405   }
406 
407   if (source_complete) {
408     received_status_ = true;
409     status_ = status;
410   }
411 
412   // Callback on transition from empty to non-empty, or
413   // source complete.
414   if (((was_empty && !available_contents_.empty()) ||
415        source_complete) &&
416       !data_available_callback_.is_null())
417     data_available_callback_.Run();
418 }
419 
420 // Decide whether or not to send the input a window update.
421 // Currently we do that whenever we've got unreported consumption
422 // greater than 1/3 of total size.
MaybeUpdateInput()423 void ByteStreamReaderImpl::MaybeUpdateInput() {
424   DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
425 
426   if (unreported_consumed_bytes_ <=
427       total_buffer_size_ / kFractionReadBeforeWindowUpdate)
428     return;
429 
430   peer_task_runner_->PostTask(
431       FROM_HERE, base::Bind(
432           &ByteStreamWriterImpl::UpdateWindow,
433           peer_lifetime_flag_,
434           peer_,
435           unreported_consumed_bytes_));
436   unreported_consumed_bytes_ = 0;
437 }
438 
439 }  // namespace
440 
441 const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
442 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
443 
~ByteStreamReader()444 ByteStreamReader::~ByteStreamReader() { }
445 
~ByteStreamWriter()446 ByteStreamWriter::~ByteStreamWriter() { }
447 
CreateByteStream(scoped_refptr<base::SequencedTaskRunner> input_task_runner,scoped_refptr<base::SequencedTaskRunner> output_task_runner,size_t buffer_size,scoped_ptr<ByteStreamWriter> * input,scoped_ptr<ByteStreamReader> * output)448 void CreateByteStream(
449     scoped_refptr<base::SequencedTaskRunner> input_task_runner,
450     scoped_refptr<base::SequencedTaskRunner> output_task_runner,
451     size_t buffer_size,
452     scoped_ptr<ByteStreamWriter>* input,
453     scoped_ptr<ByteStreamReader>* output) {
454   scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
455   scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
456 
457   ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
458       input_task_runner, input_flag, buffer_size);
459   ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
460       output_task_runner, output_flag, buffer_size);
461 
462   in->SetPeer(out, output_task_runner, output_flag);
463   out->SetPeer(in, input_task_runner, input_flag);
464   input->reset(in);
465   output->reset(out);
466 }
467 
468 }  // namespace content
469