1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifdef UNSAFE_BUFFERS_BUILD
6 // TODO(crbug.com/40284755): Remove this and spanify to fix the errors.
7 #pragma allow_unsafe_buffers
8 #endif
9
10 #include "net/spdy/bidirectional_stream_spdy_impl.h"
11
12 #include <utility>
13
14 #include "base/functional/bind.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/task/single_thread_task_runner.h"
18 #include "base/time/time.h"
19 #include "base/timer/timer.h"
20 #include "net/http/bidirectional_stream_request_info.h"
21 #include "net/spdy/spdy_buffer.h"
22 #include "net/spdy/spdy_http_utils.h"
23 #include "net/spdy/spdy_stream.h"
24
25 namespace net {
26
namespace {

// Delay, in milliseconds, before |delegate_| is told that data was received.
// Delivering many small chunks to the caller has measurable overhead, so
// incoming data is buffered for a short time-span and reported with a single
// read notification.
constexpr int kBufferTimeMs = 1;

}  // namespace
35
BidirectionalStreamSpdyImpl(const base::WeakPtr<SpdySession> & spdy_session,NetLogSource source_dependency)36 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl(
37 const base::WeakPtr<SpdySession>& spdy_session,
38 NetLogSource source_dependency)
39 : spdy_session_(spdy_session), source_dependency_(source_dependency) {}
40
~BidirectionalStreamSpdyImpl()41 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() {
42 // Sends a RST to the remote if the stream is destroyed before it completes.
43 ResetStream();
44 }
45
Start(const BidirectionalStreamRequestInfo * request_info,const NetLogWithSource & net_log,bool,BidirectionalStreamImpl::Delegate * delegate,std::unique_ptr<base::OneShotTimer> timer,const NetworkTrafficAnnotationTag & traffic_annotation)46 void BidirectionalStreamSpdyImpl::Start(
47 const BidirectionalStreamRequestInfo* request_info,
48 const NetLogWithSource& net_log,
49 bool /*send_request_headers_automatically*/,
50 BidirectionalStreamImpl::Delegate* delegate,
51 std::unique_ptr<base::OneShotTimer> timer,
52 const NetworkTrafficAnnotationTag& traffic_annotation) {
53 DCHECK(!stream_);
54 DCHECK(timer);
55
56 delegate_ = delegate;
57 timer_ = std::move(timer);
58
59 if (!spdy_session_) {
60 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
61 FROM_HERE,
62 base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
63 weak_factory_.GetWeakPtr(), ERR_CONNECTION_CLOSED));
64 return;
65 }
66
67 request_info_ = request_info;
68
69 int rv = stream_request_.StartRequest(
70 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url,
71 false /* no early data */, request_info_->priority,
72 request_info_->socket_tag, net_log,
73 base::BindOnce(&BidirectionalStreamSpdyImpl::OnStreamInitialized,
74 weak_factory_.GetWeakPtr()),
75 traffic_annotation, request_info_->detect_broken_connection,
76 request_info_->heartbeat_interval);
77 if (rv != ERR_IO_PENDING)
78 OnStreamInitialized(rv);
79 }
80
SendRequestHeaders()81 void BidirectionalStreamSpdyImpl::SendRequestHeaders() {
82 // Request headers will be sent automatically.
83 NOTREACHED();
84 }
85
ReadData(IOBuffer * buf,int buf_len)86 int BidirectionalStreamSpdyImpl::ReadData(IOBuffer* buf, int buf_len) {
87 if (stream_)
88 DCHECK(!stream_->IsIdle());
89
90 DCHECK(buf);
91 DCHECK(buf_len);
92 DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
93
94 // If there is data buffered, complete the IO immediately.
95 if (!read_data_queue_.IsEmpty()) {
96 return read_data_queue_.Dequeue(buf->data(), buf_len);
97 } else if (stream_closed_) {
98 return closed_stream_status_;
99 }
100 // Read will complete asynchronously and Delegate::OnReadCompleted will be
101 // called upon completion.
102 read_buffer_ = buf;
103 read_buffer_len_ = buf_len;
104 return ERR_IO_PENDING;
105 }
106
SendvData(const std::vector<scoped_refptr<IOBuffer>> & buffers,const std::vector<int> & lengths,bool end_stream)107 void BidirectionalStreamSpdyImpl::SendvData(
108 const std::vector<scoped_refptr<IOBuffer>>& buffers,
109 const std::vector<int>& lengths,
110 bool end_stream) {
111 DCHECK_EQ(buffers.size(), lengths.size());
112 DCHECK(!write_pending_);
113
114 if (written_end_of_stream_) {
115 LOG(ERROR) << "Writing after end of stream is written.";
116 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
117 FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
118 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
119 return;
120 }
121
122 write_pending_ = true;
123 written_end_of_stream_ = end_stream;
124 if (MaybeHandleStreamClosedInSendData())
125 return;
126
127 DCHECK(!stream_closed_);
128 int total_len = 0;
129 for (int len : lengths) {
130 total_len += len;
131 }
132
133 if (buffers.size() == 1) {
134 pending_combined_buffer_ = buffers[0];
135 } else {
136 pending_combined_buffer_ =
137 base::MakeRefCounted<net::IOBufferWithSize>(total_len);
138 int len = 0;
139 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames.
140 for (size_t i = 0; i < buffers.size(); ++i) {
141 memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(),
142 lengths[i]);
143 len += lengths[i];
144 }
145 }
146 stream_->SendData(pending_combined_buffer_.get(), total_len,
147 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
148 }
149
GetProtocol() const150 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const {
151 return negotiated_protocol_;
152 }
153
GetTotalReceivedBytes() const154 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const {
155 if (stream_closed_)
156 return closed_stream_received_bytes_;
157
158 if (!stream_)
159 return 0;
160
161 return stream_->raw_received_bytes();
162 }
163
GetTotalSentBytes() const164 int64_t BidirectionalStreamSpdyImpl::GetTotalSentBytes() const {
165 if (stream_closed_)
166 return closed_stream_sent_bytes_;
167
168 if (!stream_)
169 return 0;
170
171 return stream_->raw_sent_bytes();
172 }
173
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const174 bool BidirectionalStreamSpdyImpl::GetLoadTimingInfo(
175 LoadTimingInfo* load_timing_info) const {
176 if (stream_closed_) {
177 if (!closed_has_load_timing_info_)
178 return false;
179 *load_timing_info = closed_load_timing_info_;
180 return true;
181 }
182
183 // If |stream_| isn't created or has ID 0, return false. This is to match
184 // the implementation in SpdyHttpStream.
185 if (!stream_ || stream_->stream_id() == 0)
186 return false;
187
188 return stream_->GetLoadTimingInfo(load_timing_info);
189 }
190
PopulateNetErrorDetails(NetErrorDetails * details)191 void BidirectionalStreamSpdyImpl::PopulateNetErrorDetails(
192 NetErrorDetails* details) {}
193
OnHeadersSent()194 void BidirectionalStreamSpdyImpl::OnHeadersSent() {
195 DCHECK(stream_);
196
197 negotiated_protocol_ = kProtoHTTP2;
198 if (delegate_)
199 delegate_->OnStreamReady(/*request_headers_sent=*/true);
200 }
201
OnEarlyHintsReceived(const quiche::HttpHeaderBlock & headers)202 void BidirectionalStreamSpdyImpl::OnEarlyHintsReceived(
203 const quiche::HttpHeaderBlock& headers) {
204 DCHECK(stream_);
205 // TODO(crbug.com/40496584): Plumb Early Hints to `delegate_` if needed.
206 }
207
OnHeadersReceived(const quiche::HttpHeaderBlock & response_headers)208 void BidirectionalStreamSpdyImpl::OnHeadersReceived(
209 const quiche::HttpHeaderBlock& response_headers) {
210 DCHECK(stream_);
211
212 if (delegate_)
213 delegate_->OnHeadersReceived(response_headers);
214 }
215
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)216 void BidirectionalStreamSpdyImpl::OnDataReceived(
217 std::unique_ptr<SpdyBuffer> buffer) {
218 DCHECK(stream_);
219 DCHECK(!stream_closed_);
220
221 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked
222 // by SpdyStream to indicate the end of stream.
223 if (!buffer)
224 return;
225
226 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust
227 // recv window size accordingly.
228 read_data_queue_.Enqueue(std::move(buffer));
229 if (read_buffer_) {
230 // Handing small chunks of data to the caller creates measurable overhead.
231 // So buffer data in short time-spans and send a single read notification.
232 ScheduleBufferedRead();
233 }
234 }
235
OnDataSent()236 void BidirectionalStreamSpdyImpl::OnDataSent() {
237 DCHECK(write_pending_);
238
239 pending_combined_buffer_ = nullptr;
240 write_pending_ = false;
241
242 if (delegate_)
243 delegate_->OnDataSent();
244 }
245
OnTrailers(const quiche::HttpHeaderBlock & trailers)246 void BidirectionalStreamSpdyImpl::OnTrailers(
247 const quiche::HttpHeaderBlock& trailers) {
248 DCHECK(stream_);
249 DCHECK(!stream_closed_);
250
251 if (delegate_)
252 delegate_->OnTrailersReceived(trailers);
253 }
254
OnClose(int status)255 void BidirectionalStreamSpdyImpl::OnClose(int status) {
256 DCHECK(stream_);
257
258 stream_closed_ = true;
259 closed_stream_status_ = status;
260 closed_stream_received_bytes_ = stream_->raw_received_bytes();
261 closed_stream_sent_bytes_ = stream_->raw_sent_bytes();
262 closed_has_load_timing_info_ =
263 stream_->GetLoadTimingInfo(&closed_load_timing_info_);
264
265 if (status != OK) {
266 NotifyError(status);
267 return;
268 }
269 ResetStream();
270 // Complete any remaining read, as all data has been buffered.
271 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will
272 // do nothing.
273 timer_->Stop();
274
275 // |this| might get destroyed after calling into |delegate_| in
276 // DoBufferedRead().
277 auto weak_this = weak_factory_.GetWeakPtr();
278 DoBufferedRead();
279 if (weak_this.get() && write_pending_)
280 OnDataSent();
281 }
282
CanGreaseFrameType() const283 bool BidirectionalStreamSpdyImpl::CanGreaseFrameType() const {
284 return false;
285 }
286
source_dependency() const287 NetLogSource BidirectionalStreamSpdyImpl::source_dependency() const {
288 return source_dependency_;
289 }
290
SendRequestHeadersHelper()291 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() {
292 quiche::HttpHeaderBlock headers;
293 HttpRequestInfo http_request_info;
294 http_request_info.url = request_info_->url;
295 http_request_info.method = request_info_->method;
296 http_request_info.extra_headers = request_info_->extra_headers;
297
298 CreateSpdyHeadersFromHttpRequest(http_request_info, std::nullopt,
299 http_request_info.extra_headers, &headers);
300 written_end_of_stream_ = request_info_->end_stream_on_headers;
301 return stream_->SendRequestHeaders(std::move(headers),
302 request_info_->end_stream_on_headers
303 ? NO_MORE_DATA_TO_SEND
304 : MORE_DATA_TO_SEND);
305 }
306
OnStreamInitialized(int rv)307 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) {
308 DCHECK_NE(ERR_IO_PENDING, rv);
309 if (rv == OK) {
310 stream_ = stream_request_.ReleaseStream();
311 stream_->SetDelegate(this);
312 rv = SendRequestHeadersHelper();
313 if (rv == OK) {
314 OnHeadersSent();
315 return;
316 } else if (rv == ERR_IO_PENDING) {
317 return;
318 }
319 }
320 NotifyError(rv);
321 }
322
NotifyError(int rv)323 void BidirectionalStreamSpdyImpl::NotifyError(int rv) {
324 ResetStream();
325 write_pending_ = false;
326 if (delegate_) {
327 BidirectionalStreamImpl::Delegate* delegate = delegate_;
328 delegate_ = nullptr;
329 // Cancel any pending callback.
330 weak_factory_.InvalidateWeakPtrs();
331 delegate->OnFailed(rv);
332 // |this| can be null when returned from delegate.
333 }
334 }
335
ResetStream()336 void BidirectionalStreamSpdyImpl::ResetStream() {
337 if (!stream_)
338 return;
339 if (!stream_->IsClosed()) {
340 // This sends a RST to the remote.
341 stream_->DetachDelegate();
342 DCHECK(!stream_);
343 } else {
344 // Stream is already closed, so it is not legal to call DetachDelegate.
345 stream_.reset();
346 }
347 }
348
ScheduleBufferedRead()349 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() {
350 // If there is already a scheduled DoBufferedRead, don't issue
351 // another one. Mark that we have received more data and return.
352 if (timer_->IsRunning()) {
353 more_read_data_pending_ = true;
354 return;
355 }
356
357 more_read_data_pending_ = false;
358 timer_->Start(FROM_HERE, base::Milliseconds(kBufferTimeMs),
359 base::BindOnce(&BidirectionalStreamSpdyImpl::DoBufferedRead,
360 weak_factory_.GetWeakPtr()));
361 }
362
DoBufferedRead()363 void BidirectionalStreamSpdyImpl::DoBufferedRead() {
364 DCHECK(!timer_->IsRunning());
365 // Check to see that the stream has not errored out.
366 DCHECK(stream_ || stream_closed_);
367 DCHECK(!stream_closed_ || closed_stream_status_ == OK);
368
369 // When |more_read_data_pending_| is true, it means that more data has arrived
370 // since started waiting. Wait a little longer and continue to buffer.
371 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
372 ScheduleBufferedRead();
373 return;
374 }
375
376 int rv = 0;
377 if (read_buffer_) {
378 rv = ReadData(read_buffer_.get(), read_buffer_len_);
379 DCHECK_NE(ERR_IO_PENDING, rv);
380 read_buffer_ = nullptr;
381 read_buffer_len_ = 0;
382 if (delegate_)
383 delegate_->OnDataRead(rv);
384 }
385 }
386
ShouldWaitForMoreBufferedData() const387 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const {
388 if (stream_closed_)
389 return false;
390 DCHECK_GT(read_buffer_len_, 0);
391 return read_data_queue_.GetTotalSize() <
392 static_cast<size_t>(read_buffer_len_);
393 }
394
MaybeHandleStreamClosedInSendData()395 bool BidirectionalStreamSpdyImpl::MaybeHandleStreamClosedInSendData() {
396 if (stream_)
397 return false;
398 // If |stream_| is closed without an error before client half closes,
399 // blackhole any pending write data. crbug.com/650438.
400 if (stream_closed_ && closed_stream_status_ == OK) {
401 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
402 FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::OnDataSent,
403 weak_factory_.GetWeakPtr()));
404 return true;
405 }
406 LOG(ERROR) << "Trying to send data after stream has been destroyed.";
407 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
408 FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
409 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
410 return true;
411 }
412
413 } // namespace net
414