1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_basic_stream_adapters.h"
6
7 #include <cstring>
8 #include <ostream>
9 #include <utility>
10
11 #include "base/check.h"
12 #include "base/check_op.h"
13 #include "base/functional/bind.h"
14 #include "base/functional/callback.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/notreached.h"
19 #include "base/task/single_thread_task_runner.h"
20 #include "net/base/io_buffer.h"
21 #include "net/socket/client_socket_handle.h"
22 #include "net/socket/stream_socket.h"
23 #include "net/socket/stream_socket_handle.h"
24 #include "net/spdy/spdy_buffer.h"
25 #include "net/third_party/quiche/src/quiche/quic/core/http/quic_header_list.h"
26 #include "net/third_party/quiche/src/quiche/quic/core/http/spdy_utils.h"
27 #include "net/third_party/quiche/src/quiche/quic/core/quic_ack_listener_interface.h"
28 #include "net/third_party/quiche/src/quiche/quic/core/quic_error_codes.h"
29 #include "net/websockets/websocket_quic_spdy_stream.h"
30
31 namespace net {
32 struct NetworkTrafficAnnotationTag;
33
WebSocketClientSocketHandleAdapter(std::unique_ptr<StreamSocketHandle> connection)34 WebSocketClientSocketHandleAdapter::WebSocketClientSocketHandleAdapter(
35 std::unique_ptr<StreamSocketHandle> connection)
36 : connection_(std::move(connection)) {}
37
38 WebSocketClientSocketHandleAdapter::~WebSocketClientSocketHandleAdapter() =
39 default;
40
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)41 int WebSocketClientSocketHandleAdapter::Read(IOBuffer* buf,
42 int buf_len,
43 CompletionOnceCallback callback) {
44 return connection_->socket()->Read(buf, buf_len, std::move(callback));
45 }
46
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)47 int WebSocketClientSocketHandleAdapter::Write(
48 IOBuffer* buf,
49 int buf_len,
50 CompletionOnceCallback callback,
51 const NetworkTrafficAnnotationTag& traffic_annotation) {
52 return connection_->socket()->Write(buf, buf_len, std::move(callback),
53 traffic_annotation);
54 }
55
Disconnect()56 void WebSocketClientSocketHandleAdapter::Disconnect() {
57 connection_->socket()->Disconnect();
58 }
59
is_initialized() const60 bool WebSocketClientSocketHandleAdapter::is_initialized() const {
61 return connection_->is_initialized();
62 }
63
WebSocketSpdyStreamAdapter(base::WeakPtr<SpdyStream> stream,Delegate * delegate,NetLogWithSource net_log)64 WebSocketSpdyStreamAdapter::WebSocketSpdyStreamAdapter(
65 base::WeakPtr<SpdyStream> stream,
66 Delegate* delegate,
67 NetLogWithSource net_log)
68 : stream_(stream), delegate_(delegate), net_log_(net_log) {
69 stream_->SetDelegate(this);
70 }
71
~WebSocketSpdyStreamAdapter()72 WebSocketSpdyStreamAdapter::~WebSocketSpdyStreamAdapter() {
73 if (stream_) {
74 // DetachDelegate() also cancels the stream.
75 stream_->DetachDelegate();
76 }
77 }
78
DetachDelegate()79 void WebSocketSpdyStreamAdapter::DetachDelegate() {
80 delegate_ = nullptr;
81 }
82
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)83 int WebSocketSpdyStreamAdapter::Read(IOBuffer* buf,
84 int buf_len,
85 CompletionOnceCallback callback) {
86 DCHECK(!read_callback_);
87 DCHECK_LT(0, buf_len);
88
89 DCHECK(!read_buffer_);
90 read_buffer_ = buf;
91 // |read_length_| is size_t and |buf_len| is a non-negative int, therefore
92 // conversion is always valid.
93 DCHECK(!read_length_);
94 read_length_ = buf_len;
95
96 if (!read_data_.IsEmpty())
97 return CopySavedReadDataIntoBuffer();
98
99 if (!stream_)
100 return stream_error_;
101
102 read_callback_ = std::move(callback);
103 return ERR_IO_PENDING;
104 }
105
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)106 int WebSocketSpdyStreamAdapter::Write(
107 IOBuffer* buf,
108 int buf_len,
109 CompletionOnceCallback callback,
110 const NetworkTrafficAnnotationTag& traffic_annotation) {
111 CHECK(headers_sent_);
112 DCHECK(!write_callback_);
113 DCHECK(callback);
114 DCHECK_LT(0, buf_len);
115
116 if (!stream_)
117 return stream_error_;
118
119 stream_->SendData(buf, buf_len, MORE_DATA_TO_SEND);
120 write_callback_ = std::move(callback);
121 write_length_ = buf_len;
122 return ERR_IO_PENDING;
123 }
124
Disconnect()125 void WebSocketSpdyStreamAdapter::Disconnect() {
126 if (stream_) {
127 stream_->DetachDelegate();
128 stream_ = nullptr;
129 }
130 }
131
is_initialized() const132 bool WebSocketSpdyStreamAdapter::is_initialized() const {
133 return true;
134 }
135
136 // SpdyStream::Delegate methods.
OnHeadersSent()137 void WebSocketSpdyStreamAdapter::OnHeadersSent() {
138 headers_sent_ = true;
139 if (delegate_)
140 delegate_->OnHeadersSent();
141 }
142
OnEarlyHintsReceived(const quiche::HttpHeaderBlock & headers)143 void WebSocketSpdyStreamAdapter::OnEarlyHintsReceived(
144 const quiche::HttpHeaderBlock& headers) {
145 // This callback should not be called for a WebSocket handshake.
146 NOTREACHED();
147 }
148
OnHeadersReceived(const quiche::HttpHeaderBlock & response_headers)149 void WebSocketSpdyStreamAdapter::OnHeadersReceived(
150 const quiche::HttpHeaderBlock& response_headers) {
151 if (delegate_)
152 delegate_->OnHeadersReceived(response_headers);
153 }
154
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)155 void WebSocketSpdyStreamAdapter::OnDataReceived(
156 std::unique_ptr<SpdyBuffer> buffer) {
157 if (!buffer) {
158 // This is slightly wrong semantically, as it's still possible to write to
159 // the stream at this point. However, if the server closes the stream
160 // without waiting for a close frame from us, that means it is not
161 // interested in a clean shutdown. In which case we don't need to worry
162 // about sending any remaining data we might have buffered. This results in
163 // a call to OnClose() which then informs our delegate.
164 stream_->Close();
165 return;
166 }
167
168 read_data_.Enqueue(std::move(buffer));
169 if (read_callback_)
170 std::move(read_callback_).Run(CopySavedReadDataIntoBuffer());
171 }
172
OnDataSent()173 void WebSocketSpdyStreamAdapter::OnDataSent() {
174 DCHECK(write_callback_);
175
176 std::move(write_callback_).Run(write_length_);
177 }
178
OnTrailers(const quiche::HttpHeaderBlock & trailers)179 void WebSocketSpdyStreamAdapter::OnTrailers(
180 const quiche::HttpHeaderBlock& trailers) {}
181
OnClose(int status)182 void WebSocketSpdyStreamAdapter::OnClose(int status) {
183 DCHECK_NE(ERR_IO_PENDING, status);
184 DCHECK_LE(status, 0);
185
186 if (status == OK) {
187 status = ERR_CONNECTION_CLOSED;
188 }
189
190 stream_error_ = status;
191 stream_ = nullptr;
192
193 auto self = weak_factory_.GetWeakPtr();
194
195 if (read_callback_) {
196 DCHECK(read_data_.IsEmpty());
197 // Might destroy |this|.
198 std::move(read_callback_).Run(status);
199 if (!self)
200 return;
201 }
202 if (write_callback_) {
203 // Might destroy |this|.
204 std::move(write_callback_).Run(status);
205 if (!self)
206 return;
207 }
208
209 // Delay calling delegate_->OnClose() until all buffered data are read.
210 if (read_data_.IsEmpty() && delegate_) {
211 // Might destroy |this|.
212 delegate_->OnClose(status);
213 }
214 }
215
CanGreaseFrameType() const216 bool WebSocketSpdyStreamAdapter::CanGreaseFrameType() const {
217 return false;
218 }
219
source_dependency() const220 NetLogSource WebSocketSpdyStreamAdapter::source_dependency() const {
221 return net_log_.source();
222 }
223
CopySavedReadDataIntoBuffer()224 int WebSocketSpdyStreamAdapter::CopySavedReadDataIntoBuffer() {
225 DCHECK(read_buffer_);
226 DCHECK(read_length_);
227 int rv = read_data_.Dequeue(read_buffer_->data(), read_length_);
228 read_buffer_ = nullptr;
229 read_length_ = 0u;
230
231 // Stream has been destroyed earlier but delegate_->OnClose() call was
232 // delayed until all buffered data are read. PostTask so that Read() can
233 // return beforehand.
234 if (!stream_ && delegate_ && read_data_.IsEmpty()) {
235 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
236 FROM_HERE,
237 base::BindOnce(&WebSocketSpdyStreamAdapter::CallDelegateOnClose,
238 weak_factory_.GetWeakPtr()));
239 }
240
241 return rv;
242 }
243
CallDelegateOnClose()244 void WebSocketSpdyStreamAdapter::CallDelegateOnClose() {
245 if (delegate_)
246 delegate_->OnClose(stream_error_);
247 }
248
WebSocketQuicStreamAdapter(WebSocketQuicSpdyStream * websocket_quic_spdy_stream,Delegate * delegate)249 WebSocketQuicStreamAdapter::WebSocketQuicStreamAdapter(
250 WebSocketQuicSpdyStream* websocket_quic_spdy_stream,
251 Delegate* delegate)
252 : websocket_quic_spdy_stream_(websocket_quic_spdy_stream),
253 delegate_(delegate) {
254 websocket_quic_spdy_stream_->set_delegate(this);
255 }
256
~WebSocketQuicStreamAdapter()257 WebSocketQuicStreamAdapter::~WebSocketQuicStreamAdapter() {
258 if (websocket_quic_spdy_stream_) {
259 websocket_quic_spdy_stream_->set_delegate(nullptr);
260 }
261 }
262
WriteHeaders(quiche::HttpHeaderBlock header_block,bool fin)263 size_t WebSocketQuicStreamAdapter::WriteHeaders(
264 quiche::HttpHeaderBlock header_block,
265 bool fin) {
266 return websocket_quic_spdy_stream_->WriteHeaders(std::move(header_block), fin,
267 nullptr);
268 }
269
270 // WebSocketBasicStream::Adapter methods.
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)271 int WebSocketQuicStreamAdapter::Read(IOBuffer* buf,
272 int buf_len,
273 CompletionOnceCallback callback) {
274 if (!websocket_quic_spdy_stream_) {
275 return ERR_UNEXPECTED;
276 }
277
278 int rv = websocket_quic_spdy_stream_->Read(buf, buf_len);
279 if (rv != ERR_IO_PENDING) {
280 return rv;
281 }
282
283 read_callback_ = std::move(callback);
284 read_buffer_ = buf;
285 read_length_ = buf_len;
286 return ERR_IO_PENDING;
287 }
288
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)289 int WebSocketQuicStreamAdapter::Write(
290 IOBuffer* buf,
291 int buf_len,
292 CompletionOnceCallback callback,
293 const NetworkTrafficAnnotationTag& traffic_annotation) {
294 // TODO(momoka): Write implementation.
295 return OK;
296 }
297
Disconnect()298 void WebSocketQuicStreamAdapter::Disconnect() {
299 if (websocket_quic_spdy_stream_) {
300 websocket_quic_spdy_stream_->Reset(quic::QUIC_STREAM_CANCELLED);
301 }
302 }
303
is_initialized() const304 bool WebSocketQuicStreamAdapter::is_initialized() const {
305 return true;
306 }
307
308 // WebSocketQuicSpdyStream::Delegate methods.
309
OnInitialHeadersComplete(bool fin,size_t frame_len,const quic::QuicHeaderList & quic_header_list)310 void WebSocketQuicStreamAdapter::OnInitialHeadersComplete(
311 bool fin,
312 size_t frame_len,
313 const quic::QuicHeaderList& quic_header_list) {
314 quiche::HttpHeaderBlock response_headers;
315 if (!quic::SpdyUtils::CopyAndValidateHeaders(quic_header_list, nullptr,
316 &response_headers)) {
317 DLOG(ERROR) << "Failed to parse header list: "
318 << quic_header_list.DebugString();
319 websocket_quic_spdy_stream_->ConsumeHeaderList();
320 websocket_quic_spdy_stream_->Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
321 return;
322 }
323 websocket_quic_spdy_stream_->ConsumeHeaderList();
324 delegate_->OnHeadersReceived(response_headers);
325 }
326
OnBodyAvailable()327 void WebSocketQuicStreamAdapter::OnBodyAvailable() {
328 if (!websocket_quic_spdy_stream_->FinishedReadingHeaders()) {
329 // Buffer the data in the sequencer until the headers have been read.
330 return;
331 }
332
333 if (!websocket_quic_spdy_stream_->HasBytesToRead()) {
334 return;
335 }
336
337 if (!read_callback_) {
338 // Wait for Read() to be called.
339 return;
340 }
341
342 DCHECK(read_buffer_);
343 DCHECK_GT(read_length_, 0);
344
345 int rv = websocket_quic_spdy_stream_->Read(read_buffer_, read_length_);
346
347 if (rv == ERR_IO_PENDING) {
348 return;
349 }
350
351 read_buffer_ = nullptr;
352 read_length_ = 0;
353 std::move(read_callback_).Run(rv);
354 }
355
ClearStream()356 void WebSocketQuicStreamAdapter::ClearStream() {
357 if (websocket_quic_spdy_stream_) {
358 websocket_quic_spdy_stream_ = nullptr;
359 }
360 }
361
362 } // namespace net
363