1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_stream_spdy_impl.h"
6
7 #include <utility>
8
9 #include "base/functional/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/task/single_thread_task_runner.h"
13 #include "base/time/time.h"
14 #include "base/timer/timer.h"
15 #include "net/http/bidirectional_stream_request_info.h"
16 #include "net/spdy/spdy_buffer.h"
17 #include "net/spdy/spdy_http_utils.h"
18 #include "net/spdy/spdy_stream.h"
19 #include "net/third_party/quiche/src/quiche/spdy/core/http2_header_block.h"
20
21 namespace net {
22
namespace {

// Delay, in milliseconds, between the arrival of response data and the read
// notification delivered to |delegate_|. Surfacing many tiny chunks to the
// caller has measurable overhead, so incoming data is accumulated over this
// short interval and reported with a single notification.
constexpr int kBufferTimeMs = 1;

}  // namespace
31
BidirectionalStreamSpdyImpl(const base::WeakPtr<SpdySession> & spdy_session,NetLogSource source_dependency)32 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl(
33 const base::WeakPtr<SpdySession>& spdy_session,
34 NetLogSource source_dependency)
35 : spdy_session_(spdy_session), source_dependency_(source_dependency) {}
36
~BidirectionalStreamSpdyImpl()37 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() {
38 // Sends a RST to the remote if the stream is destroyed before it completes.
39 ResetStream();
40 }
41
Start(const BidirectionalStreamRequestInfo * request_info,const NetLogWithSource & net_log,bool,BidirectionalStreamImpl::Delegate * delegate,std::unique_ptr<base::OneShotTimer> timer,const NetworkTrafficAnnotationTag & traffic_annotation)42 void BidirectionalStreamSpdyImpl::Start(
43 const BidirectionalStreamRequestInfo* request_info,
44 const NetLogWithSource& net_log,
45 bool /*send_request_headers_automatically*/,
46 BidirectionalStreamImpl::Delegate* delegate,
47 std::unique_ptr<base::OneShotTimer> timer,
48 const NetworkTrafficAnnotationTag& traffic_annotation) {
49 DCHECK(!stream_);
50 DCHECK(timer);
51
52 delegate_ = delegate;
53 timer_ = std::move(timer);
54
55 if (!spdy_session_) {
56 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
57 FROM_HERE,
58 base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
59 weak_factory_.GetWeakPtr(), ERR_CONNECTION_CLOSED));
60 return;
61 }
62
63 request_info_ = request_info;
64
65 int rv = stream_request_.StartRequest(
66 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url,
67 false /* no early data */, request_info_->priority,
68 request_info_->socket_tag, net_log,
69 base::BindOnce(&BidirectionalStreamSpdyImpl::OnStreamInitialized,
70 weak_factory_.GetWeakPtr()),
71 traffic_annotation, request_info_->detect_broken_connection,
72 request_info_->heartbeat_interval);
73 if (rv != ERR_IO_PENDING)
74 OnStreamInitialized(rv);
75 }
76
SendRequestHeaders()77 void BidirectionalStreamSpdyImpl::SendRequestHeaders() {
78 // Request headers will be sent automatically.
79 NOTREACHED();
80 }
81
ReadData(IOBuffer * buf,int buf_len)82 int BidirectionalStreamSpdyImpl::ReadData(IOBuffer* buf, int buf_len) {
83 if (stream_)
84 DCHECK(!stream_->IsIdle());
85
86 DCHECK(buf);
87 DCHECK(buf_len);
88 DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
89
90 // If there is data buffered, complete the IO immediately.
91 if (!read_data_queue_.IsEmpty()) {
92 return read_data_queue_.Dequeue(buf->data(), buf_len);
93 } else if (stream_closed_) {
94 return closed_stream_status_;
95 }
96 // Read will complete asynchronously and Delegate::OnReadCompleted will be
97 // called upon completion.
98 read_buffer_ = buf;
99 read_buffer_len_ = buf_len;
100 return ERR_IO_PENDING;
101 }
102
SendvData(const std::vector<scoped_refptr<IOBuffer>> & buffers,const std::vector<int> & lengths,bool end_stream)103 void BidirectionalStreamSpdyImpl::SendvData(
104 const std::vector<scoped_refptr<IOBuffer>>& buffers,
105 const std::vector<int>& lengths,
106 bool end_stream) {
107 DCHECK_EQ(buffers.size(), lengths.size());
108 DCHECK(!write_pending_);
109
110 if (written_end_of_stream_) {
111 LOG(ERROR) << "Writing after end of stream is written.";
112 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
113 FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
114 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
115 return;
116 }
117
118 write_pending_ = true;
119 written_end_of_stream_ = end_stream;
120 if (MaybeHandleStreamClosedInSendData())
121 return;
122
123 DCHECK(!stream_closed_);
124 int total_len = 0;
125 for (int len : lengths) {
126 total_len += len;
127 }
128
129 if (buffers.size() == 1) {
130 pending_combined_buffer_ = buffers[0];
131 } else {
132 pending_combined_buffer_ = base::MakeRefCounted<net::IOBuffer>(total_len);
133 int len = 0;
134 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames.
135 for (size_t i = 0; i < buffers.size(); ++i) {
136 memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(),
137 lengths[i]);
138 len += lengths[i];
139 }
140 }
141 stream_->SendData(pending_combined_buffer_.get(), total_len,
142 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
143 }
144
GetProtocol() const145 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const {
146 return negotiated_protocol_;
147 }
148
GetTotalReceivedBytes() const149 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const {
150 if (stream_closed_)
151 return closed_stream_received_bytes_;
152
153 if (!stream_)
154 return 0;
155
156 return stream_->raw_received_bytes();
157 }
158
GetTotalSentBytes() const159 int64_t BidirectionalStreamSpdyImpl::GetTotalSentBytes() const {
160 if (stream_closed_)
161 return closed_stream_sent_bytes_;
162
163 if (!stream_)
164 return 0;
165
166 return stream_->raw_sent_bytes();
167 }
168
GetLoadTimingInfo(LoadTimingInfo * load_timing_info) const169 bool BidirectionalStreamSpdyImpl::GetLoadTimingInfo(
170 LoadTimingInfo* load_timing_info) const {
171 if (stream_closed_) {
172 if (!closed_has_load_timing_info_)
173 return false;
174 *load_timing_info = closed_load_timing_info_;
175 return true;
176 }
177
178 // If |stream_| isn't created or has ID 0, return false. This is to match
179 // the implementation in SpdyHttpStream.
180 if (!stream_ || stream_->stream_id() == 0)
181 return false;
182
183 return stream_->GetLoadTimingInfo(load_timing_info);
184 }
185
PopulateNetErrorDetails(NetErrorDetails * details)186 void BidirectionalStreamSpdyImpl::PopulateNetErrorDetails(
187 NetErrorDetails* details) {}
188
OnHeadersSent()189 void BidirectionalStreamSpdyImpl::OnHeadersSent() {
190 DCHECK(stream_);
191
192 negotiated_protocol_ = kProtoHTTP2;
193 if (delegate_)
194 delegate_->OnStreamReady(/*request_headers_sent=*/true);
195 }
196
OnEarlyHintsReceived(const spdy::Http2HeaderBlock & headers)197 void BidirectionalStreamSpdyImpl::OnEarlyHintsReceived(
198 const spdy::Http2HeaderBlock& headers) {
199 DCHECK(stream_);
200 // TODO(crbug.com/671310): Plumb Early Hints to `delegate_` if needed.
201 }
202
OnHeadersReceived(const spdy::Http2HeaderBlock & response_headers,const spdy::Http2HeaderBlock * pushed_request_headers)203 void BidirectionalStreamSpdyImpl::OnHeadersReceived(
204 const spdy::Http2HeaderBlock& response_headers,
205 const spdy::Http2HeaderBlock* pushed_request_headers) {
206 DCHECK(stream_);
207
208 if (delegate_)
209 delegate_->OnHeadersReceived(response_headers);
210 }
211
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)212 void BidirectionalStreamSpdyImpl::OnDataReceived(
213 std::unique_ptr<SpdyBuffer> buffer) {
214 DCHECK(stream_);
215 DCHECK(!stream_closed_);
216
217 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked
218 // by SpdyStream to indicate the end of stream.
219 if (!buffer)
220 return;
221
222 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust
223 // recv window size accordingly.
224 read_data_queue_.Enqueue(std::move(buffer));
225 if (read_buffer_) {
226 // Handing small chunks of data to the caller creates measurable overhead.
227 // So buffer data in short time-spans and send a single read notification.
228 ScheduleBufferedRead();
229 }
230 }
231
OnDataSent()232 void BidirectionalStreamSpdyImpl::OnDataSent() {
233 DCHECK(write_pending_);
234
235 pending_combined_buffer_ = nullptr;
236 write_pending_ = false;
237
238 if (delegate_)
239 delegate_->OnDataSent();
240 }
241
OnTrailers(const spdy::Http2HeaderBlock & trailers)242 void BidirectionalStreamSpdyImpl::OnTrailers(
243 const spdy::Http2HeaderBlock& trailers) {
244 DCHECK(stream_);
245 DCHECK(!stream_closed_);
246
247 if (delegate_)
248 delegate_->OnTrailersReceived(trailers);
249 }
250
OnClose(int status)251 void BidirectionalStreamSpdyImpl::OnClose(int status) {
252 DCHECK(stream_);
253
254 stream_closed_ = true;
255 closed_stream_status_ = status;
256 closed_stream_received_bytes_ = stream_->raw_received_bytes();
257 closed_stream_sent_bytes_ = stream_->raw_sent_bytes();
258 closed_has_load_timing_info_ =
259 stream_->GetLoadTimingInfo(&closed_load_timing_info_);
260
261 if (status != OK) {
262 NotifyError(status);
263 return;
264 }
265 ResetStream();
266 // Complete any remaining read, as all data has been buffered.
267 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will
268 // do nothing.
269 timer_->Stop();
270
271 // |this| might get destroyed after calling into |delegate_| in
272 // DoBufferedRead().
273 auto weak_this = weak_factory_.GetWeakPtr();
274 DoBufferedRead();
275 if (weak_this.get() && write_pending_)
276 OnDataSent();
277 }
278
CanGreaseFrameType() const279 bool BidirectionalStreamSpdyImpl::CanGreaseFrameType() const {
280 return false;
281 }
282
source_dependency() const283 NetLogSource BidirectionalStreamSpdyImpl::source_dependency() const {
284 return source_dependency_;
285 }
286
SendRequestHeadersHelper()287 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() {
288 spdy::Http2HeaderBlock headers;
289 HttpRequestInfo http_request_info;
290 http_request_info.url = request_info_->url;
291 http_request_info.method = request_info_->method;
292 http_request_info.extra_headers = request_info_->extra_headers;
293
294 CreateSpdyHeadersFromHttpRequest(http_request_info,
295 http_request_info.extra_headers, &headers);
296 written_end_of_stream_ = request_info_->end_stream_on_headers;
297 return stream_->SendRequestHeaders(std::move(headers),
298 request_info_->end_stream_on_headers
299 ? NO_MORE_DATA_TO_SEND
300 : MORE_DATA_TO_SEND);
301 }
302
OnStreamInitialized(int rv)303 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) {
304 DCHECK_NE(ERR_IO_PENDING, rv);
305 if (rv == OK) {
306 stream_ = stream_request_.ReleaseStream();
307 stream_->SetDelegate(this);
308 rv = SendRequestHeadersHelper();
309 if (rv == OK) {
310 OnHeadersSent();
311 return;
312 } else if (rv == ERR_IO_PENDING) {
313 return;
314 }
315 }
316 NotifyError(rv);
317 }
318
NotifyError(int rv)319 void BidirectionalStreamSpdyImpl::NotifyError(int rv) {
320 ResetStream();
321 write_pending_ = false;
322 if (delegate_) {
323 BidirectionalStreamImpl::Delegate* delegate = delegate_;
324 delegate_ = nullptr;
325 // Cancel any pending callback.
326 weak_factory_.InvalidateWeakPtrs();
327 delegate->OnFailed(rv);
328 // |this| can be null when returned from delegate.
329 }
330 }
331
ResetStream()332 void BidirectionalStreamSpdyImpl::ResetStream() {
333 if (!stream_)
334 return;
335 if (!stream_->IsClosed()) {
336 // This sends a RST to the remote.
337 stream_->DetachDelegate();
338 DCHECK(!stream_);
339 } else {
340 // Stream is already closed, so it is not legal to call DetachDelegate.
341 stream_.reset();
342 }
343 }
344
ScheduleBufferedRead()345 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() {
346 // If there is already a scheduled DoBufferedRead, don't issue
347 // another one. Mark that we have received more data and return.
348 if (timer_->IsRunning()) {
349 more_read_data_pending_ = true;
350 return;
351 }
352
353 more_read_data_pending_ = false;
354 timer_->Start(FROM_HERE, base::Milliseconds(kBufferTimeMs),
355 base::BindOnce(&BidirectionalStreamSpdyImpl::DoBufferedRead,
356 weak_factory_.GetWeakPtr()));
357 }
358
DoBufferedRead()359 void BidirectionalStreamSpdyImpl::DoBufferedRead() {
360 DCHECK(!timer_->IsRunning());
361 // Check to see that the stream has not errored out.
362 DCHECK(stream_ || stream_closed_);
363 DCHECK(!stream_closed_ || closed_stream_status_ == OK);
364
365 // When |more_read_data_pending_| is true, it means that more data has arrived
366 // since started waiting. Wait a little longer and continue to buffer.
367 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
368 ScheduleBufferedRead();
369 return;
370 }
371
372 int rv = 0;
373 if (read_buffer_) {
374 rv = ReadData(read_buffer_.get(), read_buffer_len_);
375 DCHECK_NE(ERR_IO_PENDING, rv);
376 read_buffer_ = nullptr;
377 read_buffer_len_ = 0;
378 if (delegate_)
379 delegate_->OnDataRead(rv);
380 }
381 }
382
ShouldWaitForMoreBufferedData() const383 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const {
384 if (stream_closed_)
385 return false;
386 DCHECK_GT(read_buffer_len_, 0);
387 return read_data_queue_.GetTotalSize() <
388 static_cast<size_t>(read_buffer_len_);
389 }
390
MaybeHandleStreamClosedInSendData()391 bool BidirectionalStreamSpdyImpl::MaybeHandleStreamClosedInSendData() {
392 if (stream_)
393 return false;
394 // If |stream_| is closed without an error before client half closes,
395 // blackhole any pending write data. crbug.com/650438.
396 if (stream_closed_ && closed_stream_status_ == OK) {
397 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
398 FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::OnDataSent,
399 weak_factory_.GetWeakPtr()));
400 return true;
401 }
402 LOG(ERROR) << "Trying to send data after stream has been destroyed.";
403 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
404 FROM_HERE, base::BindOnce(&BidirectionalStreamSpdyImpl::NotifyError,
405 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
406 return true;
407 }
408
409 } // namespace net
410