• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "content/browser/streams/stream.h"
6 
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "base/values.h"
11 #include "content/browser/streams/stream_handle_impl.h"
12 #include "content/browser/streams/stream_read_observer.h"
13 #include "content/browser/streams/stream_registry.h"
14 #include "content/browser/streams/stream_write_observer.h"
15 #include "net/base/io_buffer.h"
16 #include "net/http/http_response_headers.h"
17 
namespace {

// Buffer-size threshold handed to CreateByteStream() when a Stream is
// constructed: 40 chunks of 32 KiB, i.e. 1,310,720 bytes (about 1MB). Per the
// original note, this is the point at which the connection starts being
// throttled.
const size_t kDeferSizeThreshold = 40 * 32 * 1024;

}  // namespace
22 
23 namespace content {
24 
Stream(StreamRegistry * registry,StreamWriteObserver * write_observer,const GURL & url)25 Stream::Stream(StreamRegistry* registry,
26                StreamWriteObserver* write_observer,
27                const GURL& url)
28     : can_add_data_(true),
29       url_(url),
30       data_length_(0),
31       data_bytes_read_(0),
32       last_total_buffered_bytes_(0),
33       registry_(registry),
34       read_observer_(NULL),
35       write_observer_(write_observer),
36       stream_handle_(NULL),
37       weak_ptr_factory_(this) {
38   CreateByteStream(base::MessageLoopProxy::current(),
39                    base::MessageLoopProxy::current(),
40                    kDeferSizeThreshold,
41                    &writer_,
42                    &reader_);
43 
44   // Setup callback for writing.
45   writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
46                                        weak_ptr_factory_.GetWeakPtr()));
47   reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
48                                        weak_ptr_factory_.GetWeakPtr()));
49 
50   registry_->RegisterStream(this);
51 }
52 
~Stream()53 Stream::~Stream() {
54 }
55 
SetReadObserver(StreamReadObserver * observer)56 bool Stream::SetReadObserver(StreamReadObserver* observer) {
57   if (read_observer_)
58     return false;
59   read_observer_ = observer;
60   return true;
61 }
62 
RemoveReadObserver(StreamReadObserver * observer)63 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
64   DCHECK(observer == read_observer_);
65   read_observer_ = NULL;
66 }
67 
RemoveWriteObserver(StreamWriteObserver * observer)68 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
69   DCHECK(observer == write_observer_);
70   write_observer_ = NULL;
71 }
72 
Abort()73 void Stream::Abort() {
74   // Clear all buffer. It's safe to clear reader_ here since the same thread
75   // is used for both input and output operation.
76   writer_.reset();
77   reader_.reset();
78   ClearBuffer();
79   can_add_data_ = false;
80   registry_->UnregisterStream(url());
81 }
82 
AddData(scoped_refptr<net::IOBuffer> buffer,size_t size)83 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
84   if (!writer_.get())
85     return;
86 
87   size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
88   if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
89     Abort();
90     return;
91   }
92 
93   // Now it's guaranteed that this doesn't overflow. This must be done before
94   // Write() since GetTotalBufferedBytes() may return different value after
95   // Write() call, so if we use the new value, information in this instance and
96   // one in |registry_| become inconsistent.
97   last_total_buffered_bytes_ = current_buffered_bytes + size;
98 
99   can_add_data_ = writer_->Write(buffer, size);
100 }
101 
AddData(const char * data,size_t size)102 void Stream::AddData(const char* data, size_t size) {
103   if (!writer_.get())
104     return;
105 
106   scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
107   memcpy(io_buffer->data(), data, size);
108   AddData(io_buffer, size);
109 }
110 
Finalize()111 void Stream::Finalize() {
112   if (!writer_.get())
113     return;
114 
115   writer_->Close(0);
116   writer_.reset();
117 
118   // Continue asynchronously.
119   base::MessageLoopProxy::current()->PostTask(
120       FROM_HERE,
121       base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
122 }
123 
ReadRawData(net::IOBuffer * buf,int buf_size,int * bytes_read)124 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
125                                         int buf_size,
126                                         int* bytes_read) {
127   DCHECK(buf);
128   DCHECK(bytes_read);
129 
130   *bytes_read = 0;
131   if (!data_.get()) {
132     DCHECK(!data_length_);
133     DCHECK(!data_bytes_read_);
134 
135     if (!reader_.get())
136       return STREAM_ABORTED;
137 
138     ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
139     switch (state) {
140       case ByteStreamReader::STREAM_HAS_DATA:
141         break;
142       case ByteStreamReader::STREAM_COMPLETE:
143         registry_->UnregisterStream(url());
144         return STREAM_COMPLETE;
145       case ByteStreamReader::STREAM_EMPTY:
146         return STREAM_EMPTY;
147     }
148   }
149 
150   const size_t remaining_bytes = data_length_ - data_bytes_read_;
151   size_t to_read =
152       static_cast<size_t>(buf_size) < remaining_bytes ?
153       buf_size : remaining_bytes;
154   memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
155   data_bytes_read_ += to_read;
156   if (data_bytes_read_ >= data_length_)
157     ClearBuffer();
158 
159   *bytes_read = to_read;
160   return STREAM_HAS_DATA;
161 }
162 
CreateHandle(const GURL & original_url,const std::string & mime_type,scoped_refptr<net::HttpResponseHeaders> response_headers)163 scoped_ptr<StreamHandle> Stream::CreateHandle(
164     const GURL& original_url,
165     const std::string& mime_type,
166     scoped_refptr<net::HttpResponseHeaders> response_headers) {
167   CHECK(!stream_handle_);
168   stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
169                                         original_url,
170                                         mime_type,
171                                         response_headers);
172   return scoped_ptr<StreamHandle>(stream_handle_).Pass();
173 }
174 
CloseHandle()175 void Stream::CloseHandle() {
176   // Prevent deletion until this function ends.
177   scoped_refptr<Stream> ref(this);
178 
179   CHECK(stream_handle_);
180   stream_handle_ = NULL;
181   registry_->UnregisterStream(url());
182   if (write_observer_)
183     write_observer_->OnClose(this);
184 }
185 
OnSpaceAvailable()186 void Stream::OnSpaceAvailable() {
187   can_add_data_ = true;
188   if (write_observer_)
189     write_observer_->OnSpaceAvailable(this);
190 }
191 
OnDataAvailable()192 void Stream::OnDataAvailable() {
193   if (read_observer_)
194     read_observer_->OnDataAvailable(this);
195 }
196 
ClearBuffer()197 void Stream::ClearBuffer() {
198   data_ = NULL;
199   data_length_ = 0;
200   data_bytes_read_ = 0;
201 }
202 
203 }  // namespace content
204