• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/spdy/bidirectional_stream_spdy_impl.h"
6 
7 #include <utility>
8 
9 #include "base/functional/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/task/single_thread_task_runner.h"
13 #include "base/time/time.h"
14 #include "base/timer/timer.h"
15 #include "net/http/bidirectional_stream_request_info.h"
16 #include "net/spdy/spdy_buffer.h"
17 #include "net/spdy/spdy_http_utils.h"
18 #include "net/spdy/spdy_stream.h"
19 #include "net/third_party/quiche/src/quiche/spdy/core/http2_header_block.h"
20 
21 namespace net {
22 
namespace {

// How long, in milliseconds, to hold back the read notification to
// |delegate_| after data has been received. Handing many small chunks of data
// to the caller creates measurable overhead, so data is buffered for this
// short time-span and reported with a single read notification.
constexpr int kBufferTimeMs = 1;

}  // namespace
31 
BidirectionalStreamSpdyImpl(const base::WeakPtr<SpdySession> & spdy_session,NetLogSource source_dependency)32 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl(
33     const base::WeakPtr<SpdySession>& spdy_session,
34     NetLogSource source_dependency)
35     : spdy_session_(spdy_session), source_dependency_(source_dependency) {}
36 
~BidirectionalStreamSpdyImpl()37 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() {
38   // Sends a RST to the remote if the stream is destroyed before it completes.
39   ResetStream();
40 }
41 
Start(const BidirectionalStreamRequestInfo * request_info,const NetLogWithSource & net_log,bool,BidirectionalStreamImpl::Delegate * delegate,std::unique_ptr<base::OneShotTimer> timer,const NetworkTrafficAnnotationTag & traffic_annotation)42 void BidirectionalStreamSpdyImpl::Start(
43     const BidirectionalStreamRequestInfo* request_info,
44     const NetLogWithSource& net_log,
45     bool /*send_request_headers_automatically*/,
46     BidirectionalStreamImpl::Delegate* delegate,
47     std::unique_ptr<base::OneShotTimer> timer,
48     const NetworkTrafficAnnotationTag& traffic_annotation) {
49   DCHECK(!stream_);
50   DCHECK(timer);
51 
52   delegate_ = delegate;
53   timer_ = std::move(timer);
54 
55   if (!spdy_session_) {
56     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
57         FROM_HERE,
58         base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
59                        weak_factory_.GetWeakPtr(), ERR_CONNECTION_CLOSED));
60     return;
61   }
62 
63   request_info_ = request_info;
64 
65   int rv = stream_request_.StartRequest(
66       SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url,
67       false /* no early data */, request_info_->priority,
68       request_info_->socket_tag, net_log,
69       base::BindOnce(&BidirectionalStreamSpdyImpl::OnStreamInitialized,
70                      weak_factory_.GetWeakPtr()),
71       traffic_annotation, request_info_->detect_broken_connection,
72       request_info_->heartbeat_interval);
73   if (rv != ERR_IO_PENDING)
74     OnStreamInitialized(rv);
75 }
76 
SendRequestHeaders()77 void BidirectionalStreamSpdyImpl::SendRequestHeaders() {
78   // Request headers will be sent automatically.
79   NOTREACHED();
80 }
81 
ReadData(IOBuffer * buf,int buf_len)82 int BidirectionalStreamSpdyImpl::ReadData(IOBuffer* buf, int buf_len) {
83   if (stream_)
84     DCHECK(!stream_->IsIdle());
85 
86   DCHECK(buf);
87   DCHECK(buf_len);
88   DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
89 
90   // If there is data buffered, complete the IO immediately.
91   if (!read_data_queue_.IsEmpty()) {
92     return read_data_queue_.Dequeue(buf->data(), buf_len);
93   } else if (stream_closed_) {
94     return closed_stream_status_;
95   }
96   // Read will complete asynchronously and Delegate::OnReadCompleted will be
97   // called upon completion.
98   read_buffer_ = buf;
99   read_buffer_len_ = buf_len;
100   return ERR_IO_PENDING;
101 }
102 
SendvData(const std::vector<scoped_refptr<IOBuffer>> & buffers,const std::vector<int> & lengths,bool end_stream)103 void BidirectionalStreamSpdyImpl::SendvData(
104     const std::vector<scoped_refptr<IOBuffer>>& buffers,
105     const std::vector<int>& lengths,
106     bool end_stream) {
107   DCHECK_EQ(buffers.size(), lengths.size());
108   DCHECK(!write_pending_);
109 
110   if (written_end_of_stream_) {
111     LOG(ERROR) << "Writing after end of stream is written.";
112     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
113         FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
114                                   weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
115     return;
116   }
117 
118   write_pending_ = true;
119   written_end_of_stream_ = end_stream;
120   if (MaybeHandleStreamClosedInSendData())
121     return;
122 
123   DCHECK(!stream_closed_);
124   int total_len = 0;
125   for (int len : lengths) {
126     total_len += len;
127   }
128 
129   if (buffers.size() == 1) {
130     pending_combined_buffer_ = buffers[0];
131   } else {
132     pending_combined_buffer_ = base::MakeRefCounted<net::IOBuffer>(total_len);
133     int len = 0;
134     // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames.
135     for (size_t i = 0; i < buffers.size(); ++i) {
136       memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(),
137              lengths[i]);
138       len += lengths[i];
139     }
140   }
141   stream_->SendData(pending_combined_buffer_.get(), total_len,
142                     end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
143 }
144 
GetProtocol() const145 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const {
146   return negotiated_protocol_;
147 }
148 
GetTotalReceivedBytes() const149 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const {
150   if (stream_closed_)
151     return closed_stream_received_bytes_;
152 
153   if (!stream_)
154     return 0;
155 
156   return stream_->raw_received_bytes();
157 }
158 
GetTotalSentBytes() const159 int64_t BidirectionalStreamSpdyImpl::GetTotalSentBytes() const {
160   if (stream_closed_)
161     return closed_stream_sent_bytes_;
162 
163   if (!stream_)
164     return 0;
165 
166   return stream_->raw_sent_bytes();
167 }
168 
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const169 bool BidirectionalStreamSpdyImpl::GetLoadTimingInfo(
170     LoadTimingInfo* load_timing_info) const {
171   if (stream_closed_) {
172     if (!closed_has_load_timing_info_)
173       return false;
174     *load_timing_info = closed_load_timing_info_;
175     return true;
176   }
177 
178   // If |stream_| isn't created or has ID 0, return false. This is to match
179   // the implementation in SpdyHttpStream.
180   if (!stream_ || stream_->stream_id() == 0)
181     return false;
182 
183   return stream_->GetLoadTimingInfo(load_timing_info);
184 }
185 
PopulateNetErrorDetails(NetErrorDetails * details)186 void BidirectionalStreamSpdyImpl::PopulateNetErrorDetails(
187     NetErrorDetails* details) {}
188 
OnHeadersSent()189 void BidirectionalStreamSpdyImpl::OnHeadersSent() {
190   DCHECK(stream_);
191 
192   negotiated_protocol_ = kProtoHTTP2;
193   if (delegate_)
194     delegate_->OnStreamReady(/*request_headers_sent=*/true);
195 }
196 
OnEarlyHintsReceived(const spdy::Http2HeaderBlock & headers)197 void BidirectionalStreamSpdyImpl::OnEarlyHintsReceived(
198     const spdy::Http2HeaderBlock& headers) {
199   DCHECK(stream_);
200   // TODO(crbug.com/671310): Plumb Early Hints to `delegate_` if needed.
201 }
202 
OnHeadersReceived(const spdy::Http2HeaderBlock & response_headers,const spdy::Http2HeaderBlock * pushed_request_headers)203 void BidirectionalStreamSpdyImpl::OnHeadersReceived(
204     const spdy::Http2HeaderBlock& response_headers,
205     const spdy::Http2HeaderBlock* pushed_request_headers) {
206   DCHECK(stream_);
207 
208   if (delegate_)
209     delegate_->OnHeadersReceived(response_headers);
210 }
211 
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)212 void BidirectionalStreamSpdyImpl::OnDataReceived(
213     std::unique_ptr<SpdyBuffer> buffer) {
214   DCHECK(stream_);
215   DCHECK(!stream_closed_);
216 
217   // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked
218   // by SpdyStream to indicate the end of stream.
219   if (!buffer)
220     return;
221 
222   // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust
223   // recv window size accordingly.
224   read_data_queue_.Enqueue(std::move(buffer));
225   if (read_buffer_) {
226     // Handing small chunks of data to the caller creates measurable overhead.
227     // So buffer data in short time-spans and send a single read notification.
228     ScheduleBufferedRead();
229   }
230 }
231 
OnDataSent()232 void BidirectionalStreamSpdyImpl::OnDataSent() {
233   DCHECK(write_pending_);
234 
235   pending_combined_buffer_ = nullptr;
236   write_pending_ = false;
237 
238   if (delegate_)
239     delegate_->OnDataSent();
240 }
241 
OnTrailers(const spdy::Http2HeaderBlock & trailers)242 void BidirectionalStreamSpdyImpl::OnTrailers(
243     const spdy::Http2HeaderBlock& trailers) {
244   DCHECK(stream_);
245   DCHECK(!stream_closed_);
246 
247   if (delegate_)
248     delegate_->OnTrailersReceived(trailers);
249 }
250 
OnClose(int status)251 void BidirectionalStreamSpdyImpl::OnClose(int status) {
252   DCHECK(stream_);
253 
254   stream_closed_ = true;
255   closed_stream_status_ = status;
256   closed_stream_received_bytes_ = stream_->raw_received_bytes();
257   closed_stream_sent_bytes_ = stream_->raw_sent_bytes();
258   closed_has_load_timing_info_ =
259       stream_->GetLoadTimingInfo(&closed_load_timing_info_);
260 
261   if (status != OK) {
262     NotifyError(status);
263     return;
264   }
265   ResetStream();
266   // Complete any remaining read, as all data has been buffered.
267   // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will
268   // do nothing.
269   timer_->Stop();
270 
271   // |this| might get destroyed after calling into |delegate_| in
272   // DoBufferedRead().
273   auto weak_this = weak_factory_.GetWeakPtr();
274   DoBufferedRead();
275   if (weak_this.get() && write_pending_)
276     OnDataSent();
277 }
278 
CanGreaseFrameType() const279 bool BidirectionalStreamSpdyImpl::CanGreaseFrameType() const {
280   return false;
281 }
282 
source_dependency() const283 NetLogSource BidirectionalStreamSpdyImpl::source_dependency() const {
284   return source_dependency_;
285 }
286 
SendRequestHeadersHelper()287 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() {
288   spdy::Http2HeaderBlock headers;
289   HttpRequestInfo http_request_info;
290   http_request_info.url = request_info_->url;
291   http_request_info.method = request_info_->method;
292   http_request_info.extra_headers = request_info_->extra_headers;
293 
294   CreateSpdyHeadersFromHttpRequest(http_request_info,
295                                    http_request_info.extra_headers, &headers);
296   written_end_of_stream_ = request_info_->end_stream_on_headers;
297   return stream_->SendRequestHeaders(std::move(headers),
298                                      request_info_->end_stream_on_headers
299                                          ? NO_MORE_DATA_TO_SEND
300                                          : MORE_DATA_TO_SEND);
301 }
302 
OnStreamInitialized(int rv)303 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) {
304   DCHECK_NE(ERR_IO_PENDING, rv);
305   if (rv == OK) {
306     stream_ = stream_request_.ReleaseStream();
307     stream_->SetDelegate(this);
308     rv = SendRequestHeadersHelper();
309     if (rv == OK) {
310       OnHeadersSent();
311       return;
312     } else if (rv == ERR_IO_PENDING) {
313       return;
314     }
315   }
316   NotifyError(rv);
317 }
318 
NotifyError(int rv)319 void BidirectionalStreamSpdyImpl::NotifyError(int rv) {
320   ResetStream();
321   write_pending_ = false;
322   if (delegate_) {
323     BidirectionalStreamImpl::Delegate* delegate = delegate_;
324     delegate_ = nullptr;
325     // Cancel any pending callback.
326     weak_factory_.InvalidateWeakPtrs();
327     delegate->OnFailed(rv);
328     // |this| can be null when returned from delegate.
329   }
330 }
331 
ResetStream()332 void BidirectionalStreamSpdyImpl::ResetStream() {
333   if (!stream_)
334     return;
335   if (!stream_->IsClosed()) {
336     // This sends a RST to the remote.
337     stream_->DetachDelegate();
338     DCHECK(!stream_);
339   } else {
340     // Stream is already closed, so it is not legal to call DetachDelegate.
341     stream_.reset();
342   }
343 }
344 
ScheduleBufferedRead()345 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() {
346   // If there is already a scheduled DoBufferedRead, don't issue
347   // another one. Mark that we have received more data and return.
348   if (timer_->IsRunning()) {
349     more_read_data_pending_ = true;
350     return;
351   }
352 
353   more_read_data_pending_ = false;
354   timer_->Start(FROM_HERE, base::Milliseconds(kBufferTimeMs),
355                 base::BindOnce(&BidirectionalStreamSpdyImpl::DoBufferedRead,
356                                weak_factory_.GetWeakPtr()));
357 }
358 
DoBufferedRead()359 void BidirectionalStreamSpdyImpl::DoBufferedRead() {
360   DCHECK(!timer_->IsRunning());
361   // Check to see that the stream has not errored out.
362   DCHECK(stream_ || stream_closed_);
363   DCHECK(!stream_closed_ || closed_stream_status_ == OK);
364 
365   // When |more_read_data_pending_| is true, it means that more data has arrived
366   // since started waiting. Wait a little longer and continue to buffer.
367   if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
368     ScheduleBufferedRead();
369     return;
370   }
371 
372   int rv = 0;
373   if (read_buffer_) {
374     rv = ReadData(read_buffer_.get(), read_buffer_len_);
375     DCHECK_NE(ERR_IO_PENDING, rv);
376     read_buffer_ = nullptr;
377     read_buffer_len_ = 0;
378     if (delegate_)
379       delegate_->OnDataRead(rv);
380   }
381 }
382 
ShouldWaitForMoreBufferedData() const383 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const {
384   if (stream_closed_)
385     return false;
386   DCHECK_GT(read_buffer_len_, 0);
387   return read_data_queue_.GetTotalSize() <
388          static_cast<size_t>(read_buffer_len_);
389 }
390 
MaybeHandleStreamClosedInSendData()391 bool BidirectionalStreamSpdyImpl::MaybeHandleStreamClosedInSendData() {
392   if (stream_)
393     return false;
394   // If |stream_| is closed without an error before client half closes,
395   // blackhole any pending write data. crbug.com/650438.
396   if (stream_closed_ && closed_stream_status_ == OK) {
397     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
398         FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::OnDataSent,
399                                   weak_factory_.GetWeakPtr()));
400     return true;
401   }
402   LOG(ERROR) << "Trying to send data after stream has been destroyed.";
403   base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
404       FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
405                                 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
406   return true;
407 }
408 
409 }  // namespace net
410