1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/bidirectional_stream_quic_impl.h"
6
7 #include <utility>
8
9 #include "base/functional/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/memory/raw_ptr.h"
13 #include "base/task/single_thread_task_runner.h"
14 #include "base/timer/timer.h"
15 #include "net/http/bidirectional_stream_request_info.h"
16 #include "net/http/http_util.h"
17 #include "net/socket/next_proto.h"
18 #include "net/spdy/spdy_http_utils.h"
19 #include "net/third_party/quiche/src/quiche/quic/core/quic_connection.h"
20 #include "quic_http_stream.h"
21
22 namespace net {
23 namespace {
24 // Sets a boolean to a value, and restores it to the previous value once
25 // the saver goes out of scope.
26 class ScopedBoolSaver {
27 public:
ScopedBoolSaver(bool * var,bool new_val)28 ScopedBoolSaver(bool* var, bool new_val) : var_(var), old_val_(*var) {
29 *var_ = new_val;
30 }
31
~ScopedBoolSaver()32 ~ScopedBoolSaver() { *var_ = old_val_; }
33
34 private:
35 raw_ptr<bool> var_;
36 bool old_val_;
37 };
38 } // namespace
39
BidirectionalStreamQuicImpl(std::unique_ptr<QuicChromiumClientSession::Handle> session)40 BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl(
41 std::unique_ptr<QuicChromiumClientSession::Handle> session)
42 : session_(std::move(session)) {}
43
~BidirectionalStreamQuicImpl()44 BidirectionalStreamQuicImpl::~BidirectionalStreamQuicImpl() {
45 if (stream_) {
46 delegate_ = nullptr;
47 stream_->Reset(quic::QUIC_STREAM_CANCELLED);
48 }
49 }
50
Start(const BidirectionalStreamRequestInfo * request_info,const NetLogWithSource & net_log,bool send_request_headers_automatically,BidirectionalStreamImpl::Delegate * delegate,std::unique_ptr<base::OneShotTimer> timer,const NetworkTrafficAnnotationTag & traffic_annotation)51 void BidirectionalStreamQuicImpl::Start(
52 const BidirectionalStreamRequestInfo* request_info,
53 const NetLogWithSource& net_log,
54 bool send_request_headers_automatically,
55 BidirectionalStreamImpl::Delegate* delegate,
56 std::unique_ptr<base::OneShotTimer> timer,
57 const NetworkTrafficAnnotationTag& traffic_annotation) {
58 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
59 DCHECK(!stream_);
60 CHECK(delegate);
61 DLOG_IF(WARNING, !session_->IsConnected())
62 << "Trying to start request headers after session has been closed.";
63
64 net_log.AddEventReferencingSource(
65 NetLogEventType::BIDIRECTIONAL_STREAM_BOUND_TO_QUIC_SESSION,
66 session_->net_log().source());
67
68 send_request_headers_automatically_ = send_request_headers_automatically;
69 delegate_ = delegate;
70 request_info_ = request_info;
71
72 // Only allow SAFE methods to use early data, unless overridden by the caller.
73 bool use_early_data = HttpUtil::IsMethodSafe(request_info_->method);
74 use_early_data |= request_info_->allow_early_data_override;
75
76 int rv = session_->RequestStream(
77 !use_early_data,
78 base::BindOnce(&BidirectionalStreamQuicImpl::OnStreamReady,
79 weak_factory_.GetWeakPtr()),
80 traffic_annotation);
81 if (rv == ERR_IO_PENDING)
82 return;
83
84 if (rv != OK) {
85 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
86 FROM_HERE,
87 base::BindOnce(
88 &BidirectionalStreamQuicImpl::NotifyError,
89 weak_factory_.GetWeakPtr(),
90 session_->OneRttKeysAvailable() ? rv : ERR_QUIC_HANDSHAKE_FAILED));
91 return;
92 }
93
94 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
95 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::OnStreamReady,
96 weak_factory_.GetWeakPtr(), rv));
97 }
98
SendRequestHeaders()99 void BidirectionalStreamQuicImpl::SendRequestHeaders() {
100 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
101 int rv = WriteHeaders();
102 if (rv < 0) {
103 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
104 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
105 weak_factory_.GetWeakPtr(), rv));
106 }
107 }
108
WriteHeaders()109 int BidirectionalStreamQuicImpl::WriteHeaders() {
110 DCHECK(!has_sent_headers_);
111
112 quiche::HttpHeaderBlock headers;
113 HttpRequestInfo http_request_info;
114 http_request_info.url = request_info_->url;
115 http_request_info.method = request_info_->method;
116 http_request_info.extra_headers = request_info_->extra_headers;
117
118 CreateSpdyHeadersFromHttpRequest(http_request_info, std::nullopt,
119 http_request_info.extra_headers, &headers);
120 int rv = stream_->WriteHeaders(std::move(headers),
121 request_info_->end_stream_on_headers, nullptr);
122 if (rv >= 0) {
123 headers_bytes_sent_ += rv;
124 has_sent_headers_ = true;
125 }
126 return rv;
127 }
128
ReadData(IOBuffer * buffer,int buffer_len)129 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
130 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
131 DCHECK(buffer);
132 DCHECK(buffer_len);
133
134 int rv = stream_->ReadBody(
135 buffer, buffer_len,
136 base::BindOnce(&BidirectionalStreamQuicImpl::OnReadDataComplete,
137 weak_factory_.GetWeakPtr()));
138 if (rv == ERR_IO_PENDING) {
139 read_buffer_ = buffer;
140 read_buffer_len_ = buffer_len;
141 return ERR_IO_PENDING;
142 }
143
144 if (rv < 0)
145 return rv;
146
147 // If the write side is closed, OnFinRead() will call
148 // BidirectionalStreamQuicImpl::OnClose().
149 if (stream_->IsDoneReading())
150 stream_->OnFinRead();
151
152 return rv;
153 }
154
SendvData(const std::vector<scoped_refptr<IOBuffer>> & buffers,const std::vector<int> & lengths,bool end_stream)155 void BidirectionalStreamQuicImpl::SendvData(
156 const std::vector<scoped_refptr<IOBuffer>>& buffers,
157 const std::vector<int>& lengths,
158 bool end_stream) {
159 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
160 DCHECK_EQ(buffers.size(), lengths.size());
161
162 if (!stream_->IsOpen()) {
163 LOG(ERROR) << "Trying to send data after stream has been closed.";
164 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
165 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
166 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
167 return;
168 }
169
170 std::unique_ptr<quic::QuicConnection::ScopedPacketFlusher> bundler(
171 session_->CreatePacketBundler());
172 if (!has_sent_headers_) {
173 DCHECK(!send_request_headers_automatically_);
174 int rv = WriteHeaders();
175 if (rv < 0) {
176 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
177 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
178 weak_factory_.GetWeakPtr(), rv));
179 return;
180 }
181 }
182
183 int rv = stream_->WritevStreamData(
184 buffers, lengths, end_stream,
185 base::BindOnce(&BidirectionalStreamQuicImpl::OnSendDataComplete,
186 weak_factory_.GetWeakPtr()));
187
188 if (rv != ERR_IO_PENDING) {
189 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
190 FROM_HERE,
191 base::BindOnce(&BidirectionalStreamQuicImpl::OnSendDataComplete,
192 weak_factory_.GetWeakPtr(), rv));
193 }
194 }
195
GetProtocol() const196 NextProto BidirectionalStreamQuicImpl::GetProtocol() const {
197 return negotiated_protocol_;
198 }
199
GetTotalReceivedBytes() const200 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const {
201 if (stream_) {
202 DCHECK_LE(stream_->NumBytesConsumed(), stream_->stream_bytes_read());
203 // Only count the uniquely received bytes.
204 return stream_->NumBytesConsumed();
205 }
206 return closed_stream_received_bytes_;
207 }
208
GetTotalSentBytes() const209 int64_t BidirectionalStreamQuicImpl::GetTotalSentBytes() const {
210 if (stream_) {
211 return stream_->stream_bytes_written();
212 }
213 return closed_stream_sent_bytes_;
214 }
215
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const216 bool BidirectionalStreamQuicImpl::GetLoadTimingInfo(
217 LoadTimingInfo* load_timing_info) const {
218 bool is_first_stream = closed_is_first_stream_;
219 if (stream_)
220 is_first_stream = stream_->IsFirstStream();
221 if (is_first_stream) {
222 load_timing_info->socket_reused = false;
223 load_timing_info->connect_timing = connect_timing_;
224 } else {
225 load_timing_info->socket_reused = true;
226 }
227 return true;
228 }
229
PopulateNetErrorDetails(NetErrorDetails * details)230 void BidirectionalStreamQuicImpl::PopulateNetErrorDetails(
231 NetErrorDetails* details) {
232 DCHECK(details);
233 details->connection_info =
234 QuicHttpStream::ConnectionInfoFromQuicVersion(session_->GetQuicVersion());
235 session_->PopulateNetErrorDetails(details);
236 if (session_->OneRttKeysAvailable() && stream_)
237 details->quic_connection_error = stream_->connection_error();
238 }
239
OnStreamReady(int rv)240 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) {
241 DCHECK_NE(ERR_IO_PENDING, rv);
242 DCHECK(!stream_);
243 if (rv != OK) {
244 NotifyError(rv);
245 return;
246 }
247
248 stream_ = session_->ReleaseStream();
249 DCHECK(stream_);
250
251 if (!stream_->IsOpen()) {
252 NotifyError(ERR_CONNECTION_CLOSED);
253 return;
254 }
255
256 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
257 FROM_HERE,
258 base::BindOnce(&BidirectionalStreamQuicImpl::ReadInitialHeaders,
259 weak_factory_.GetWeakPtr()));
260
261 NotifyStreamReady();
262 }
263
OnSendDataComplete(int rv)264 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) {
265 CHECK(may_invoke_callbacks_);
266 DCHECK_NE(ERR_IO_PENDING, rv);
267 if (rv < 0) {
268 NotifyError(rv);
269 return;
270 }
271
272 if (delegate_)
273 delegate_->OnDataSent();
274 }
275
OnReadInitialHeadersComplete(int rv)276 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) {
277 CHECK(may_invoke_callbacks_);
278 DCHECK_NE(ERR_IO_PENDING, rv);
279 if (rv < 0) {
280 NotifyError(rv);
281 return;
282 }
283
284 headers_bytes_received_ += rv;
285 negotiated_protocol_ = kProtoQUIC;
286 connect_timing_ = session_->GetConnectTiming();
287 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
288 FROM_HERE,
289 base::BindOnce(&BidirectionalStreamQuicImpl::ReadTrailingHeaders,
290 weak_factory_.GetWeakPtr()));
291 if (delegate_)
292 delegate_->OnHeadersReceived(initial_headers_);
293 }
294
ReadInitialHeaders()295 void BidirectionalStreamQuicImpl::ReadInitialHeaders() {
296 int rv = stream_->ReadInitialHeaders(
297 &initial_headers_,
298 base::BindOnce(&BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete,
299 weak_factory_.GetWeakPtr()));
300
301 if (rv != ERR_IO_PENDING)
302 OnReadInitialHeadersComplete(rv);
303 }
304
ReadTrailingHeaders()305 void BidirectionalStreamQuicImpl::ReadTrailingHeaders() {
306 int rv = stream_->ReadTrailingHeaders(
307 &trailing_headers_,
308 base::BindOnce(
309 &BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete,
310 weak_factory_.GetWeakPtr()));
311
312 if (rv != ERR_IO_PENDING)
313 OnReadTrailingHeadersComplete(rv);
314 }
315
OnReadTrailingHeadersComplete(int rv)316 void BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete(int rv) {
317 CHECK(may_invoke_callbacks_);
318 DCHECK_NE(ERR_IO_PENDING, rv);
319 if (rv < 0) {
320 NotifyError(rv);
321 return;
322 }
323
324 headers_bytes_received_ += rv;
325
326 if (delegate_)
327 delegate_->OnTrailersReceived(trailing_headers_);
328 }
329
OnReadDataComplete(int rv)330 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) {
331 CHECK(may_invoke_callbacks_);
332
333 read_buffer_ = nullptr;
334 read_buffer_len_ = 0;
335
336 // If the write side is closed, OnFinRead() will call
337 // BidirectionalStreamQuicImpl::OnClose().
338 if (stream_->IsDoneReading())
339 stream_->OnFinRead();
340
341 if (!delegate_)
342 return;
343
344 if (rv < 0)
345 NotifyError(rv);
346 else
347 delegate_->OnDataRead(rv);
348 }
349
NotifyError(int error)350 void BidirectionalStreamQuicImpl::NotifyError(int error) {
351 NotifyErrorImpl(error, /*notify_delegate_later*/ false);
352 }
353
NotifyErrorImpl(int error,bool notify_delegate_later)354 void BidirectionalStreamQuicImpl::NotifyErrorImpl(int error,
355 bool notify_delegate_later) {
356 DCHECK_NE(OK, error);
357 DCHECK_NE(ERR_IO_PENDING, error);
358
359 ResetStream();
360 if (delegate_) {
361 response_status_ = error;
362 BidirectionalStreamImpl::Delegate* delegate = delegate_;
363 delegate_ = nullptr;
364 // Cancel any pending callback.
365 weak_factory_.InvalidateWeakPtrs();
366 if (notify_delegate_later) {
367 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
368 FROM_HERE,
369 base::BindOnce(&BidirectionalStreamQuicImpl::NotifyFailure,
370 weak_factory_.GetWeakPtr(), delegate, error));
371 } else {
372 NotifyFailure(delegate, error);
373 // |this| might be destroyed at this point.
374 }
375 }
376 }
377
NotifyFailure(BidirectionalStreamImpl::Delegate * delegate,int error)378 void BidirectionalStreamQuicImpl::NotifyFailure(
379 BidirectionalStreamImpl::Delegate* delegate,
380 int error) {
381 CHECK(may_invoke_callbacks_);
382 delegate->OnFailed(error);
383 // |this| might be destroyed at this point.
384 }
385
NotifyStreamReady()386 void BidirectionalStreamQuicImpl::NotifyStreamReady() {
387 CHECK(may_invoke_callbacks_);
388 if (send_request_headers_automatically_) {
389 int rv = WriteHeaders();
390 if (rv < 0) {
391 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
392 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
393 weak_factory_.GetWeakPtr(), rv));
394 return;
395 }
396 }
397
398 if (delegate_)
399 delegate_->OnStreamReady(has_sent_headers_);
400 }
401
ResetStream()402 void BidirectionalStreamQuicImpl::ResetStream() {
403 if (!stream_)
404 return;
405 closed_stream_received_bytes_ = stream_->stream_bytes_read();
406 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
407 closed_is_first_stream_ = stream_->IsFirstStream();
408 }
409
410 } // namespace net
411