• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifdef UNSAFE_BUFFERS_BUILD
6 // TODO(crbug.com/40284755): Remove this and spanify to fix the errors.
7 #pragma allow_unsafe_buffers
8 #endif
9 
10 #include "net/spdy/bidirectional_stream_spdy_impl.h"
11 
12 #include <utility>
13 
14 #include "base/functional/bind.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/task/single_thread_task_runner.h"
18 #include "base/time/time.h"
19 #include "base/timer/timer.h"
20 #include "net/http/bidirectional_stream_request_info.h"
21 #include "net/spdy/spdy_buffer.h"
22 #include "net/spdy/spdy_http_utils.h"
23 #include "net/spdy/spdy_stream.h"
24 
25 namespace net {
26 
namespace {

// Delay, in milliseconds, before |delegate_| is notified of received data.
// Delivering many small chunks to the caller has measurable overhead, so
// incoming data is buffered for a short time-span and reported with a single
// read notification.
constexpr int kBufferTimeMs = 1;

}  // namespace
35 
BidirectionalStreamSpdyImpl(const base::WeakPtr<SpdySession> & spdy_session,NetLogSource source_dependency)36 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl(
37     const base::WeakPtr<SpdySession>& spdy_session,
38     NetLogSource source_dependency)
39     : spdy_session_(spdy_session), source_dependency_(source_dependency) {}
40 
~BidirectionalStreamSpdyImpl()41 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() {
42   // Sends a RST to the remote if the stream is destroyed before it completes.
43   ResetStream();
44 }
45 
Start(const BidirectionalStreamRequestInfo * request_info,const NetLogWithSource & net_log,bool,BidirectionalStreamImpl::Delegate * delegate,std::unique_ptr<base::OneShotTimer> timer,const NetworkTrafficAnnotationTag & traffic_annotation)46 void BidirectionalStreamSpdyImpl::Start(
47     const BidirectionalStreamRequestInfo* request_info,
48     const NetLogWithSource& net_log,
49     bool /*send_request_headers_automatically*/,
50     BidirectionalStreamImpl::Delegate* delegate,
51     std::unique_ptr<base::OneShotTimer> timer,
52     const NetworkTrafficAnnotationTag& traffic_annotation) {
53   DCHECK(!stream_);
54   DCHECK(timer);
55 
56   delegate_ = delegate;
57   timer_ = std::move(timer);
58 
59   if (!spdy_session_) {
60     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
61         FROM_HERE,
62         base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
63                        weak_factory_.GetWeakPtr(), ERR_CONNECTION_CLOSED));
64     return;
65   }
66 
67   request_info_ = request_info;
68 
69   int rv = stream_request_.StartRequest(
70       SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url,
71       false /* no early data */, request_info_->priority,
72       request_info_->socket_tag, net_log,
73       base::BindOnce(&BidirectionalStreamSpdyImpl::OnStreamInitialized,
74                      weak_factory_.GetWeakPtr()),
75       traffic_annotation, request_info_->detect_broken_connection,
76       request_info_->heartbeat_interval);
77   if (rv != ERR_IO_PENDING)
78     OnStreamInitialized(rv);
79 }
80 
SendRequestHeaders()81 void BidirectionalStreamSpdyImpl::SendRequestHeaders() {
82   // Request headers will be sent automatically.
83   NOTREACHED();
84 }
85 
ReadData(IOBuffer * buf,int buf_len)86 int BidirectionalStreamSpdyImpl::ReadData(IOBuffer* buf, int buf_len) {
87   if (stream_)
88     DCHECK(!stream_->IsIdle());
89 
90   DCHECK(buf);
91   DCHECK(buf_len);
92   DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
93 
94   // If there is data buffered, complete the IO immediately.
95   if (!read_data_queue_.IsEmpty()) {
96     return read_data_queue_.Dequeue(buf->data(), buf_len);
97   } else if (stream_closed_) {
98     return closed_stream_status_;
99   }
100   // Read will complete asynchronously and Delegate::OnReadCompleted will be
101   // called upon completion.
102   read_buffer_ = buf;
103   read_buffer_len_ = buf_len;
104   return ERR_IO_PENDING;
105 }
106 
SendvData(const std::vector<scoped_refptr<IOBuffer>> & buffers,const std::vector<int> & lengths,bool end_stream)107 void BidirectionalStreamSpdyImpl::SendvData(
108     const std::vector<scoped_refptr<IOBuffer>>& buffers,
109     const std::vector<int>& lengths,
110     bool end_stream) {
111   DCHECK_EQ(buffers.size(), lengths.size());
112   DCHECK(!write_pending_);
113 
114   if (written_end_of_stream_) {
115     LOG(ERROR) << "Writing after end of stream is written.";
116     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
117         FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
118                                   weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
119     return;
120   }
121 
122   write_pending_ = true;
123   written_end_of_stream_ = end_stream;
124   if (MaybeHandleStreamClosedInSendData())
125     return;
126 
127   DCHECK(!stream_closed_);
128   int total_len = 0;
129   for (int len : lengths) {
130     total_len += len;
131   }
132 
133   if (buffers.size() == 1) {
134     pending_combined_buffer_ = buffers[0];
135   } else {
136     pending_combined_buffer_ =
137         base::MakeRefCounted<net::IOBufferWithSize>(total_len);
138     int len = 0;
139     // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames.
140     for (size_t i = 0; i < buffers.size(); ++i) {
141       memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(),
142              lengths[i]);
143       len += lengths[i];
144     }
145   }
146   stream_->SendData(pending_combined_buffer_.get(), total_len,
147                     end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
148 }
149 
GetProtocol() const150 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const {
151   return negotiated_protocol_;
152 }
153 
GetTotalReceivedBytes() const154 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const {
155   if (stream_closed_)
156     return closed_stream_received_bytes_;
157 
158   if (!stream_)
159     return 0;
160 
161   return stream_->raw_received_bytes();
162 }
163 
GetTotalSentBytes() const164 int64_t BidirectionalStreamSpdyImpl::GetTotalSentBytes() const {
165   if (stream_closed_)
166     return closed_stream_sent_bytes_;
167 
168   if (!stream_)
169     return 0;
170 
171   return stream_->raw_sent_bytes();
172 }
173 
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const174 bool BidirectionalStreamSpdyImpl::GetLoadTimingInfo(
175     LoadTimingInfo* load_timing_info) const {
176   if (stream_closed_) {
177     if (!closed_has_load_timing_info_)
178       return false;
179     *load_timing_info = closed_load_timing_info_;
180     return true;
181   }
182 
183   // If |stream_| isn't created or has ID 0, return false. This is to match
184   // the implementation in SpdyHttpStream.
185   if (!stream_ || stream_->stream_id() == 0)
186     return false;
187 
188   return stream_->GetLoadTimingInfo(load_timing_info);
189 }
190 
PopulateNetErrorDetails(NetErrorDetails * details)191 void BidirectionalStreamSpdyImpl::PopulateNetErrorDetails(
192     NetErrorDetails* details) {}
193 
OnHeadersSent()194 void BidirectionalStreamSpdyImpl::OnHeadersSent() {
195   DCHECK(stream_);
196 
197   negotiated_protocol_ = kProtoHTTP2;
198   if (delegate_)
199     delegate_->OnStreamReady(/*request_headers_sent=*/true);
200 }
201 
OnEarlyHintsReceived(const quiche::HttpHeaderBlock & headers)202 void BidirectionalStreamSpdyImpl::OnEarlyHintsReceived(
203     const quiche::HttpHeaderBlock& headers) {
204   DCHECK(stream_);
205   // TODO(crbug.com/40496584): Plumb Early Hints to `delegate_` if needed.
206 }
207 
OnHeadersReceived(const quiche::HttpHeaderBlock & response_headers)208 void BidirectionalStreamSpdyImpl::OnHeadersReceived(
209     const quiche::HttpHeaderBlock& response_headers) {
210   DCHECK(stream_);
211 
212   if (delegate_)
213     delegate_->OnHeadersReceived(response_headers);
214 }
215 
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)216 void BidirectionalStreamSpdyImpl::OnDataReceived(
217     std::unique_ptr<SpdyBuffer> buffer) {
218   DCHECK(stream_);
219   DCHECK(!stream_closed_);
220 
221   // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked
222   // by SpdyStream to indicate the end of stream.
223   if (!buffer)
224     return;
225 
226   // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust
227   // recv window size accordingly.
228   read_data_queue_.Enqueue(std::move(buffer));
229   if (read_buffer_) {
230     // Handing small chunks of data to the caller creates measurable overhead.
231     // So buffer data in short time-spans and send a single read notification.
232     ScheduleBufferedRead();
233   }
234 }
235 
OnDataSent()236 void BidirectionalStreamSpdyImpl::OnDataSent() {
237   DCHECK(write_pending_);
238 
239   pending_combined_buffer_ = nullptr;
240   write_pending_ = false;
241 
242   if (delegate_)
243     delegate_->OnDataSent();
244 }
245 
OnTrailers(const quiche::HttpHeaderBlock & trailers)246 void BidirectionalStreamSpdyImpl::OnTrailers(
247     const quiche::HttpHeaderBlock& trailers) {
248   DCHECK(stream_);
249   DCHECK(!stream_closed_);
250 
251   if (delegate_)
252     delegate_->OnTrailersReceived(trailers);
253 }
254 
OnClose(int status)255 void BidirectionalStreamSpdyImpl::OnClose(int status) {
256   DCHECK(stream_);
257 
258   stream_closed_ = true;
259   closed_stream_status_ = status;
260   closed_stream_received_bytes_ = stream_->raw_received_bytes();
261   closed_stream_sent_bytes_ = stream_->raw_sent_bytes();
262   closed_has_load_timing_info_ =
263       stream_->GetLoadTimingInfo(&closed_load_timing_info_);
264 
265   if (status != OK) {
266     NotifyError(status);
267     return;
268   }
269   ResetStream();
270   // Complete any remaining read, as all data has been buffered.
271   // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will
272   // do nothing.
273   timer_->Stop();
274 
275   // |this| might get destroyed after calling into |delegate_| in
276   // DoBufferedRead().
277   auto weak_this = weak_factory_.GetWeakPtr();
278   DoBufferedRead();
279   if (weak_this.get() && write_pending_)
280     OnDataSent();
281 }
282 
CanGreaseFrameType() const283 bool BidirectionalStreamSpdyImpl::CanGreaseFrameType() const {
284   return false;
285 }
286 
source_dependency() const287 NetLogSource BidirectionalStreamSpdyImpl::source_dependency() const {
288   return source_dependency_;
289 }
290 
SendRequestHeadersHelper()291 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() {
292   quiche::HttpHeaderBlock headers;
293   HttpRequestInfo http_request_info;
294   http_request_info.url = request_info_->url;
295   http_request_info.method = request_info_->method;
296   http_request_info.extra_headers = request_info_->extra_headers;
297 
298   CreateSpdyHeadersFromHttpRequest(http_request_info, std::nullopt,
299                                    http_request_info.extra_headers, &headers);
300   written_end_of_stream_ = request_info_->end_stream_on_headers;
301   return stream_->SendRequestHeaders(std::move(headers),
302                                      request_info_->end_stream_on_headers
303                                          ? NO_MORE_DATA_TO_SEND
304                                          : MORE_DATA_TO_SEND);
305 }
306 
OnStreamInitialized(int rv)307 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) {
308   DCHECK_NE(ERR_IO_PENDING, rv);
309   if (rv == OK) {
310     stream_ = stream_request_.ReleaseStream();
311     stream_->SetDelegate(this);
312     rv = SendRequestHeadersHelper();
313     if (rv == OK) {
314       OnHeadersSent();
315       return;
316     } else if (rv == ERR_IO_PENDING) {
317       return;
318     }
319   }
320   NotifyError(rv);
321 }
322 
NotifyError(int rv)323 void BidirectionalStreamSpdyImpl::NotifyError(int rv) {
324   ResetStream();
325   write_pending_ = false;
326   if (delegate_) {
327     BidirectionalStreamImpl::Delegate* delegate = delegate_;
328     delegate_ = nullptr;
329     // Cancel any pending callback.
330     weak_factory_.InvalidateWeakPtrs();
331     delegate->OnFailed(rv);
332     // |this| can be null when returned from delegate.
333   }
334 }
335 
ResetStream()336 void BidirectionalStreamSpdyImpl::ResetStream() {
337   if (!stream_)
338     return;
339   if (!stream_->IsClosed()) {
340     // This sends a RST to the remote.
341     stream_->DetachDelegate();
342     DCHECK(!stream_);
343   } else {
344     // Stream is already closed, so it is not legal to call DetachDelegate.
345     stream_.reset();
346   }
347 }
348 
ScheduleBufferedRead()349 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() {
350   // If there is already a scheduled DoBufferedRead, don't issue
351   // another one. Mark that we have received more data and return.
352   if (timer_->IsRunning()) {
353     more_read_data_pending_ = true;
354     return;
355   }
356 
357   more_read_data_pending_ = false;
358   timer_->Start(FROM_HERE, base::Milliseconds(kBufferTimeMs),
359                 base::BindOnce(&BidirectionalStreamSpdyImpl::DoBufferedRead,
360                                weak_factory_.GetWeakPtr()));
361 }
362 
DoBufferedRead()363 void BidirectionalStreamSpdyImpl::DoBufferedRead() {
364   DCHECK(!timer_->IsRunning());
365   // Check to see that the stream has not errored out.
366   DCHECK(stream_ || stream_closed_);
367   DCHECK(!stream_closed_ || closed_stream_status_ == OK);
368 
369   // When |more_read_data_pending_| is true, it means that more data has arrived
370   // since started waiting. Wait a little longer and continue to buffer.
371   if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
372     ScheduleBufferedRead();
373     return;
374   }
375 
376   int rv = 0;
377   if (read_buffer_) {
378     rv = ReadData(read_buffer_.get(), read_buffer_len_);
379     DCHECK_NE(ERR_IO_PENDING, rv);
380     read_buffer_ = nullptr;
381     read_buffer_len_ = 0;
382     if (delegate_)
383       delegate_->OnDataRead(rv);
384   }
385 }
386 
ShouldWaitForMoreBufferedData() const387 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const {
388   if (stream_closed_)
389     return false;
390   DCHECK_GT(read_buffer_len_, 0);
391   return read_data_queue_.GetTotalSize() <
392          static_cast<size_t>(read_buffer_len_);
393 }
394 
MaybeHandleStreamClosedInSendData()395 bool BidirectionalStreamSpdyImpl::MaybeHandleStreamClosedInSendData() {
396   if (stream_)
397     return false;
398   // If |stream_| is closed without an error before client half closes,
399   // blackhole any pending write data. crbug.com/650438.
400   if (stream_closed_ && closed_stream_status_ == OK) {
401     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
402         FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::OnDataSent,
403                                   weak_factory_.GetWeakPtr()));
404     return true;
405   }
406   LOG(ERROR) << "Trying to send data after stream has been destroyed.";
407   base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
408       FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
409                                 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
410   return true;
411 }
412 
413 }  // namespace net
414