• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/quic/bidirectional_stream_quic_impl.h"
6 
7 #include <utility>
8 
9 #include "base/functional/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/memory/raw_ptr.h"
13 #include "base/task/single_thread_task_runner.h"
14 #include "base/timer/timer.h"
15 #include "net/http/bidirectional_stream_request_info.h"
16 #include "net/http/http_util.h"
17 #include "net/socket/next_proto.h"
18 #include "net/spdy/spdy_http_utils.h"
19 #include "net/third_party/quiche/src/quiche/quic/core/quic_connection.h"
20 #include "quic_http_stream.h"
21 
22 namespace net {
23 namespace {
24 // Sets a boolean to a value, and restores it to the previous value once
25 // the saver goes out of scope.
26 class ScopedBoolSaver {
27  public:
ScopedBoolSaver(bool * var,bool new_val)28   ScopedBoolSaver(bool* var, bool new_val) : var_(var), old_val_(*var) {
29     *var_ = new_val;
30   }
31 
~ScopedBoolSaver()32   ~ScopedBoolSaver() { *var_ = old_val_; }
33 
34  private:
35   raw_ptr<bool> var_;
36   bool old_val_;
37 };
38 }  // namespace
39 
BidirectionalStreamQuicImpl(std::unique_ptr<QuicChromiumClientSession::Handle> session)40 BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl(
41     std::unique_ptr<QuicChromiumClientSession::Handle> session)
42     : session_(std::move(session)) {}
43 
~BidirectionalStreamQuicImpl()44 BidirectionalStreamQuicImpl::~BidirectionalStreamQuicImpl() {
45   if (stream_) {
46     delegate_ = nullptr;
47     stream_->Reset(quic::QUIC_STREAM_CANCELLED);
48   }
49 }
50 
Start(const BidirectionalStreamRequestInfo * request_info,const NetLogWithSource & net_log,bool send_request_headers_automatically,BidirectionalStreamImpl::Delegate * delegate,std::unique_ptr<base::OneShotTimer> timer,const NetworkTrafficAnnotationTag & traffic_annotation)51 void BidirectionalStreamQuicImpl::Start(
52     const BidirectionalStreamRequestInfo* request_info,
53     const NetLogWithSource& net_log,
54     bool send_request_headers_automatically,
55     BidirectionalStreamImpl::Delegate* delegate,
56     std::unique_ptr<base::OneShotTimer> timer,
57     const NetworkTrafficAnnotationTag& traffic_annotation) {
58   ScopedBoolSaver saver(&may_invoke_callbacks_, false);
59   DCHECK(!stream_);
60   CHECK(delegate);
61   DLOG_IF(WARNING, !session_->IsConnected())
62       << "Trying to start request headers after session has been closed.";
63 
64   net_log.AddEventReferencingSource(
65       NetLogEventType::BIDIRECTIONAL_STREAM_BOUND_TO_QUIC_SESSION,
66       session_->net_log().source());
67 
68   send_request_headers_automatically_ = send_request_headers_automatically;
69   delegate_ = delegate;
70   request_info_ = request_info;
71 
72   // Only allow SAFE methods to use early data, unless overridden by the caller.
73   bool use_early_data = HttpUtil::IsMethodSafe(request_info_->method);
74   use_early_data |= request_info_->allow_early_data_override;
75 
76   int rv = session_->RequestStream(
77       !use_early_data,
78       base::BindOnce(&BidirectionalStreamQuicImpl::OnStreamReady,
79                      weak_factory_.GetWeakPtr()),
80       traffic_annotation);
81   if (rv == ERR_IO_PENDING)
82     return;
83 
84   if (rv != OK) {
85     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
86         FROM_HERE,
87         base::BindOnce(
88             &BidirectionalStreamQuicImpl::NotifyError,
89             weak_factory_.GetWeakPtr(),
90             session_->OneRttKeysAvailable() ? rv : ERR_QUIC_HANDSHAKE_FAILED));
91     return;
92   }
93 
94   base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
95       FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::OnStreamReady,
96                                 weak_factory_.GetWeakPtr(), rv));
97 }
98 
SendRequestHeaders()99 void BidirectionalStreamQuicImpl::SendRequestHeaders() {
100   ScopedBoolSaver saver(&may_invoke_callbacks_, false);
101   int rv = WriteHeaders();
102   if (rv < 0) {
103     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
104         FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
105                                   weak_factory_.GetWeakPtr(), rv));
106   }
107 }
108 
WriteHeaders()109 int BidirectionalStreamQuicImpl::WriteHeaders() {
110   DCHECK(!has_sent_headers_);
111 
112   quiche::HttpHeaderBlock headers;
113   HttpRequestInfo http_request_info;
114   http_request_info.url = request_info_->url;
115   http_request_info.method = request_info_->method;
116   http_request_info.extra_headers = request_info_->extra_headers;
117 
118   CreateSpdyHeadersFromHttpRequest(http_request_info, std::nullopt,
119                                    http_request_info.extra_headers, &headers);
120   int rv = stream_->WriteHeaders(std::move(headers),
121                                  request_info_->end_stream_on_headers, nullptr);
122   if (rv >= 0) {
123     headers_bytes_sent_ += rv;
124     has_sent_headers_ = true;
125   }
126   return rv;
127 }
128 
ReadData(IOBuffer * buffer,int buffer_len)129 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
130   ScopedBoolSaver saver(&may_invoke_callbacks_, false);
131   DCHECK(buffer);
132   DCHECK(buffer_len);
133 
134   int rv = stream_->ReadBody(
135       buffer, buffer_len,
136       base::BindOnce(&BidirectionalStreamQuicImpl::OnReadDataComplete,
137                      weak_factory_.GetWeakPtr()));
138   if (rv == ERR_IO_PENDING) {
139     read_buffer_ = buffer;
140     read_buffer_len_ = buffer_len;
141     return ERR_IO_PENDING;
142   }
143 
144   if (rv < 0)
145     return rv;
146 
147   // If the write side is closed, OnFinRead() will call
148   // BidirectionalStreamQuicImpl::OnClose().
149   if (stream_->IsDoneReading())
150     stream_->OnFinRead();
151 
152   return rv;
153 }
154 
SendvData(const std::vector<scoped_refptr<IOBuffer>> & buffers,const std::vector<int> & lengths,bool end_stream)155 void BidirectionalStreamQuicImpl::SendvData(
156     const std::vector<scoped_refptr<IOBuffer>>& buffers,
157     const std::vector<int>& lengths,
158     bool end_stream) {
159   ScopedBoolSaver saver(&may_invoke_callbacks_, false);
160   DCHECK_EQ(buffers.size(), lengths.size());
161 
162   if (!stream_->IsOpen()) {
163     LOG(ERROR) << "Trying to send data after stream has been closed.";
164     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
165         FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
166                                   weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
167     return;
168   }
169 
170   std::unique_ptr<quic::QuicConnection::ScopedPacketFlusher> bundler(
171       session_->CreatePacketBundler());
172   if (!has_sent_headers_) {
173     DCHECK(!send_request_headers_automatically_);
174     int rv = WriteHeaders();
175     if (rv < 0) {
176       base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
177           FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
178                                     weak_factory_.GetWeakPtr(), rv));
179       return;
180     }
181   }
182 
183   int rv = stream_->WritevStreamData(
184       buffers, lengths, end_stream,
185       base::BindOnce(&BidirectionalStreamQuicImpl::OnSendDataComplete,
186                      weak_factory_.GetWeakPtr()));
187 
188   if (rv != ERR_IO_PENDING) {
189     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
190         FROM_HERE,
191         base::BindOnce(&BidirectionalStreamQuicImpl::OnSendDataComplete,
192                        weak_factory_.GetWeakPtr(), rv));
193   }
194 }
195 
GetProtocol() const196 NextProto BidirectionalStreamQuicImpl::GetProtocol() const {
197   return negotiated_protocol_;
198 }
199 
GetTotalReceivedBytes() const200 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const {
201   if (stream_) {
202     DCHECK_LE(stream_->NumBytesConsumed(), stream_->stream_bytes_read());
203     // Only count the uniquely received bytes.
204     return stream_->NumBytesConsumed();
205   }
206   return closed_stream_received_bytes_;
207 }
208 
GetTotalSentBytes() const209 int64_t BidirectionalStreamQuicImpl::GetTotalSentBytes() const {
210   if (stream_) {
211     return stream_->stream_bytes_written();
212   }
213   return closed_stream_sent_bytes_;
214 }
215 
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const216 bool BidirectionalStreamQuicImpl::GetLoadTimingInfo(
217     LoadTimingInfo* load_timing_info) const {
218   bool is_first_stream = closed_is_first_stream_;
219   if (stream_)
220     is_first_stream = stream_->IsFirstStream();
221   if (is_first_stream) {
222     load_timing_info->socket_reused = false;
223     load_timing_info->connect_timing = connect_timing_;
224   } else {
225     load_timing_info->socket_reused = true;
226   }
227   return true;
228 }
229 
PopulateNetErrorDetails(NetErrorDetails * details)230 void BidirectionalStreamQuicImpl::PopulateNetErrorDetails(
231     NetErrorDetails* details) {
232   DCHECK(details);
233   details->connection_info =
234       QuicHttpStream::ConnectionInfoFromQuicVersion(session_->GetQuicVersion());
235   session_->PopulateNetErrorDetails(details);
236   if (session_->OneRttKeysAvailable() && stream_)
237     details->quic_connection_error = stream_->connection_error();
238 }
239 
OnStreamReady(int rv)240 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) {
241   DCHECK_NE(ERR_IO_PENDING, rv);
242   DCHECK(!stream_);
243   if (rv != OK) {
244     NotifyError(rv);
245     return;
246   }
247 
248   stream_ = session_->ReleaseStream();
249   DCHECK(stream_);
250 
251   if (!stream_->IsOpen()) {
252     NotifyError(ERR_CONNECTION_CLOSED);
253     return;
254   }
255 
256   base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
257       FROM_HERE,
258       base::BindOnce(&BidirectionalStreamQuicImpl::ReadInitialHeaders,
259                      weak_factory_.GetWeakPtr()));
260 
261   NotifyStreamReady();
262 }
263 
OnSendDataComplete(int rv)264 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) {
265   CHECK(may_invoke_callbacks_);
266   DCHECK_NE(ERR_IO_PENDING, rv);
267   if (rv < 0) {
268     NotifyError(rv);
269     return;
270   }
271 
272   if (delegate_)
273     delegate_->OnDataSent();
274 }
275 
OnReadInitialHeadersComplete(int rv)276 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) {
277   CHECK(may_invoke_callbacks_);
278   DCHECK_NE(ERR_IO_PENDING, rv);
279   if (rv < 0) {
280     NotifyError(rv);
281     return;
282   }
283 
284   headers_bytes_received_ += rv;
285   negotiated_protocol_ = kProtoQUIC;
286   connect_timing_ = session_->GetConnectTiming();
287   base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
288       FROM_HERE,
289       base::BindOnce(&BidirectionalStreamQuicImpl::ReadTrailingHeaders,
290                      weak_factory_.GetWeakPtr()));
291   if (delegate_)
292     delegate_->OnHeadersReceived(initial_headers_);
293 }
294 
ReadInitialHeaders()295 void BidirectionalStreamQuicImpl::ReadInitialHeaders() {
296   int rv = stream_->ReadInitialHeaders(
297       &initial_headers_,
298       base::BindOnce(&BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete,
299                      weak_factory_.GetWeakPtr()));
300 
301   if (rv != ERR_IO_PENDING)
302     OnReadInitialHeadersComplete(rv);
303 }
304 
ReadTrailingHeaders()305 void BidirectionalStreamQuicImpl::ReadTrailingHeaders() {
306   int rv = stream_->ReadTrailingHeaders(
307       &trailing_headers_,
308       base::BindOnce(
309           &BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete,
310           weak_factory_.GetWeakPtr()));
311 
312   if (rv != ERR_IO_PENDING)
313     OnReadTrailingHeadersComplete(rv);
314 }
315 
OnReadTrailingHeadersComplete(int rv)316 void BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete(int rv) {
317   CHECK(may_invoke_callbacks_);
318   DCHECK_NE(ERR_IO_PENDING, rv);
319   if (rv < 0) {
320     NotifyError(rv);
321     return;
322   }
323 
324   headers_bytes_received_ += rv;
325 
326   if (delegate_)
327     delegate_->OnTrailersReceived(trailing_headers_);
328 }
329 
OnReadDataComplete(int rv)330 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) {
331   CHECK(may_invoke_callbacks_);
332 
333   read_buffer_ = nullptr;
334   read_buffer_len_ = 0;
335 
336   // If the write side is closed, OnFinRead() will call
337   // BidirectionalStreamQuicImpl::OnClose().
338   if (stream_->IsDoneReading())
339     stream_->OnFinRead();
340 
341   if (!delegate_)
342     return;
343 
344   if (rv < 0)
345     NotifyError(rv);
346   else
347     delegate_->OnDataRead(rv);
348 }
349 
NotifyError(int error)350 void BidirectionalStreamQuicImpl::NotifyError(int error) {
351   NotifyErrorImpl(error, /*notify_delegate_later*/ false);
352 }
353 
NotifyErrorImpl(int error,bool notify_delegate_later)354 void BidirectionalStreamQuicImpl::NotifyErrorImpl(int error,
355                                                   bool notify_delegate_later) {
356   DCHECK_NE(OK, error);
357   DCHECK_NE(ERR_IO_PENDING, error);
358 
359   ResetStream();
360   if (delegate_) {
361     response_status_ = error;
362     BidirectionalStreamImpl::Delegate* delegate = delegate_;
363     delegate_ = nullptr;
364     // Cancel any pending callback.
365     weak_factory_.InvalidateWeakPtrs();
366     if (notify_delegate_later) {
367       base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
368           FROM_HERE,
369           base::BindOnce(&BidirectionalStreamQuicImpl::NotifyFailure,
370                          weak_factory_.GetWeakPtr(), delegate, error));
371     } else {
372       NotifyFailure(delegate, error);
373       // |this| might be destroyed at this point.
374     }
375   }
376 }
377 
NotifyFailure(BidirectionalStreamImpl::Delegate * delegate,int error)378 void BidirectionalStreamQuicImpl::NotifyFailure(
379     BidirectionalStreamImpl::Delegate* delegate,
380     int error) {
381   CHECK(may_invoke_callbacks_);
382   delegate->OnFailed(error);
383   // |this| might be destroyed at this point.
384 }
385 
NotifyStreamReady()386 void BidirectionalStreamQuicImpl::NotifyStreamReady() {
387   CHECK(may_invoke_callbacks_);
388   if (send_request_headers_automatically_) {
389     int rv = WriteHeaders();
390     if (rv < 0) {
391       base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
392           FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
393                                     weak_factory_.GetWeakPtr(), rv));
394       return;
395     }
396   }
397 
398   if (delegate_)
399     delegate_->OnStreamReady(has_sent_headers_);
400 }
401 
ResetStream()402 void BidirectionalStreamQuicImpl::ResetStream() {
403   if (!stream_)
404     return;
405   closed_stream_received_bytes_ = stream_->stream_bytes_read();
406   closed_stream_sent_bytes_ = stream_->stream_bytes_written();
407   closed_is_first_stream_ = stream_->IsFirstStream();
408 }
409 
410 }  // namespace net
411