1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_basic_stream_adapters.h"
6
7 #include <algorithm>
8 #include <cstring>
9 #include <utility>
10
11 #include "base/functional/bind.h"
12 #include "base/location.h"
13 #include "base/task/single_thread_task_runner.h"
14 #include "net/base/io_buffer.h"
15 #include "net/socket/client_socket_handle.h"
16 #include "net/socket/socket.h"
17 #include "net/spdy/spdy_buffer.h"
18 #include "net/third_party/quiche/src/quiche/quic/core/http/quic_spdy_stream.h"
19 #include "net/third_party/quiche/src/quiche/quic/core/http/spdy_utils.h"
20 #include "net/websockets/websocket_quic_spdy_stream.h"
21
22 namespace net {
23
WebSocketClientSocketHandleAdapter(std::unique_ptr<ClientSocketHandle> connection)24 WebSocketClientSocketHandleAdapter::WebSocketClientSocketHandleAdapter(
25 std::unique_ptr<ClientSocketHandle> connection)
26 : connection_(std::move(connection)) {}
27
28 WebSocketClientSocketHandleAdapter::~WebSocketClientSocketHandleAdapter() =
29 default;
30
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)31 int WebSocketClientSocketHandleAdapter::Read(IOBuffer* buf,
32 int buf_len,
33 CompletionOnceCallback callback) {
34 return connection_->socket()->Read(buf, buf_len, std::move(callback));
35 }
36
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)37 int WebSocketClientSocketHandleAdapter::Write(
38 IOBuffer* buf,
39 int buf_len,
40 CompletionOnceCallback callback,
41 const NetworkTrafficAnnotationTag& traffic_annotation) {
42 return connection_->socket()->Write(buf, buf_len, std::move(callback),
43 traffic_annotation);
44 }
45
Disconnect()46 void WebSocketClientSocketHandleAdapter::Disconnect() {
47 connection_->socket()->Disconnect();
48 }
49
is_initialized() const50 bool WebSocketClientSocketHandleAdapter::is_initialized() const {
51 return connection_->is_initialized();
52 }
53
WebSocketSpdyStreamAdapter(base::WeakPtr<SpdyStream> stream,Delegate * delegate,NetLogWithSource net_log)54 WebSocketSpdyStreamAdapter::WebSocketSpdyStreamAdapter(
55 base::WeakPtr<SpdyStream> stream,
56 Delegate* delegate,
57 NetLogWithSource net_log)
58 : stream_(stream), delegate_(delegate), net_log_(net_log) {
59 stream_->SetDelegate(this);
60 }
61
~WebSocketSpdyStreamAdapter()62 WebSocketSpdyStreamAdapter::~WebSocketSpdyStreamAdapter() {
63 if (stream_) {
64 // DetachDelegate() also cancels the stream.
65 stream_->DetachDelegate();
66 }
67 }
68
DetachDelegate()69 void WebSocketSpdyStreamAdapter::DetachDelegate() {
70 delegate_ = nullptr;
71 }
72
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)73 int WebSocketSpdyStreamAdapter::Read(IOBuffer* buf,
74 int buf_len,
75 CompletionOnceCallback callback) {
76 DCHECK(!read_callback_);
77 DCHECK_LT(0, buf_len);
78
79 DCHECK(!read_buffer_);
80 read_buffer_ = buf;
81 // |read_length_| is size_t and |buf_len| is a non-negative int, therefore
82 // conversion is always valid.
83 DCHECK(!read_length_);
84 read_length_ = buf_len;
85
86 if (!read_data_.IsEmpty())
87 return CopySavedReadDataIntoBuffer();
88
89 if (!stream_)
90 return stream_error_;
91
92 read_callback_ = std::move(callback);
93 return ERR_IO_PENDING;
94 }
95
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)96 int WebSocketSpdyStreamAdapter::Write(
97 IOBuffer* buf,
98 int buf_len,
99 CompletionOnceCallback callback,
100 const NetworkTrafficAnnotationTag& traffic_annotation) {
101 CHECK(headers_sent_);
102 DCHECK(!write_callback_);
103 DCHECK(callback);
104 DCHECK_LT(0, buf_len);
105
106 if (!stream_)
107 return stream_error_;
108
109 stream_->SendData(buf, buf_len, MORE_DATA_TO_SEND);
110 write_callback_ = std::move(callback);
111 write_length_ = buf_len;
112 return ERR_IO_PENDING;
113 }
114
Disconnect()115 void WebSocketSpdyStreamAdapter::Disconnect() {
116 if (stream_) {
117 stream_->DetachDelegate();
118 stream_ = nullptr;
119 }
120 }
121
is_initialized() const122 bool WebSocketSpdyStreamAdapter::is_initialized() const {
123 return true;
124 }
125
126 // SpdyStream::Delegate methods.
OnHeadersSent()127 void WebSocketSpdyStreamAdapter::OnHeadersSent() {
128 headers_sent_ = true;
129 if (delegate_)
130 delegate_->OnHeadersSent();
131 }
132
OnEarlyHintsReceived(const spdy::Http2HeaderBlock & headers)133 void WebSocketSpdyStreamAdapter::OnEarlyHintsReceived(
134 const spdy::Http2HeaderBlock& headers) {
135 // This callback should not be called for a WebSocket handshake.
136 NOTREACHED();
137 }
138
OnHeadersReceived(const spdy::Http2HeaderBlock & response_headers,const spdy::Http2HeaderBlock * pushed_request_headers)139 void WebSocketSpdyStreamAdapter::OnHeadersReceived(
140 const spdy::Http2HeaderBlock& response_headers,
141 const spdy::Http2HeaderBlock* pushed_request_headers) {
142 if (delegate_)
143 delegate_->OnHeadersReceived(response_headers);
144 }
145
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)146 void WebSocketSpdyStreamAdapter::OnDataReceived(
147 std::unique_ptr<SpdyBuffer> buffer) {
148 if (!buffer) {
149 // This is slightly wrong semantically, as it's still possible to write to
150 // the stream at this point. However, if the server closes the stream
151 // without waiting for a close frame from us, that means it is not
152 // interested in a clean shutdown. In which case we don't need to worry
153 // about sending any remaining data we might have buffered. This results in
154 // a call to OnClose() which then informs our delegate.
155 stream_->Close();
156 return;
157 }
158
159 read_data_.Enqueue(std::move(buffer));
160 if (read_callback_)
161 std::move(read_callback_).Run(CopySavedReadDataIntoBuffer());
162 }
163
OnDataSent()164 void WebSocketSpdyStreamAdapter::OnDataSent() {
165 DCHECK(write_callback_);
166
167 std::move(write_callback_).Run(write_length_);
168 }
169
OnTrailers(const spdy::Http2HeaderBlock & trailers)170 void WebSocketSpdyStreamAdapter::OnTrailers(
171 const spdy::Http2HeaderBlock& trailers) {}
172
OnClose(int status)173 void WebSocketSpdyStreamAdapter::OnClose(int status) {
174 DCHECK_NE(ERR_IO_PENDING, status);
175 DCHECK_LE(status, 0);
176
177 if (status == OK) {
178 status = ERR_CONNECTION_CLOSED;
179 }
180
181 stream_error_ = status;
182 stream_ = nullptr;
183
184 auto self = weak_factory_.GetWeakPtr();
185
186 if (read_callback_) {
187 DCHECK(read_data_.IsEmpty());
188 // Might destroy |this|.
189 std::move(read_callback_).Run(status);
190 if (!self)
191 return;
192 }
193 if (write_callback_) {
194 // Might destroy |this|.
195 std::move(write_callback_).Run(status);
196 if (!self)
197 return;
198 }
199
200 // Delay calling delegate_->OnClose() until all buffered data are read.
201 if (read_data_.IsEmpty() && delegate_) {
202 // Might destroy |this|.
203 delegate_->OnClose(status);
204 }
205 }
206
CanGreaseFrameType() const207 bool WebSocketSpdyStreamAdapter::CanGreaseFrameType() const {
208 return false;
209 }
210
source_dependency() const211 NetLogSource WebSocketSpdyStreamAdapter::source_dependency() const {
212 return net_log_.source();
213 }
214
CopySavedReadDataIntoBuffer()215 int WebSocketSpdyStreamAdapter::CopySavedReadDataIntoBuffer() {
216 DCHECK(read_buffer_);
217 DCHECK(read_length_);
218 int rv = read_data_.Dequeue(read_buffer_->data(), read_length_);
219 read_buffer_ = nullptr;
220 read_length_ = 0u;
221
222 // Stream has been destroyed earlier but delegate_->OnClose() call was
223 // delayed until all buffered data are read. PostTask so that Read() can
224 // return beforehand.
225 if (!stream_ && delegate_ && read_data_.IsEmpty()) {
226 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
227 FROM_HERE,
228 base::BindOnce(&WebSocketSpdyStreamAdapter::CallDelegateOnClose,
229 weak_factory_.GetWeakPtr()));
230 }
231
232 return rv;
233 }
234
CallDelegateOnClose()235 void WebSocketSpdyStreamAdapter::CallDelegateOnClose() {
236 if (delegate_)
237 delegate_->OnClose(stream_error_);
238 }
239
WebSocketQuicStreamAdapter(WebSocketQuicSpdyStream * websocket_quic_spdy_stream,Delegate * delegate)240 WebSocketQuicStreamAdapter::WebSocketQuicStreamAdapter(
241 WebSocketQuicSpdyStream* websocket_quic_spdy_stream,
242 Delegate* delegate)
243 : websocket_quic_spdy_stream_(websocket_quic_spdy_stream),
244 delegate_(delegate) {
245 websocket_quic_spdy_stream_->set_delegate(this);
246 }
247
~WebSocketQuicStreamAdapter()248 WebSocketQuicStreamAdapter::~WebSocketQuicStreamAdapter() {
249 if (websocket_quic_spdy_stream_) {
250 websocket_quic_spdy_stream_->set_delegate(nullptr);
251 }
252 }
253
WriteHeaders(spdy::Http2HeaderBlock header_block,bool fin)254 size_t WebSocketQuicStreamAdapter::WriteHeaders(
255 spdy::Http2HeaderBlock header_block,
256 bool fin) {
257 return websocket_quic_spdy_stream_->WriteHeaders(std::move(header_block), fin,
258 nullptr);
259 }
260
261 // WebSocketBasicStream::Adapter methods.
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)262 int WebSocketQuicStreamAdapter::Read(IOBuffer* buf,
263 int buf_len,
264 CompletionOnceCallback callback) {
265 if (!websocket_quic_spdy_stream_) {
266 return ERR_UNEXPECTED;
267 }
268
269 int rv = websocket_quic_spdy_stream_->Read(buf, buf_len);
270 if (rv != ERR_IO_PENDING) {
271 return rv;
272 }
273
274 read_callback_ = std::move(callback);
275 read_buffer_ = buf;
276 read_length_ = buf_len;
277 return ERR_IO_PENDING;
278 }
279
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)280 int WebSocketQuicStreamAdapter::Write(
281 IOBuffer* buf,
282 int buf_len,
283 CompletionOnceCallback callback,
284 const NetworkTrafficAnnotationTag& traffic_annotation) {
285 // TODO(momoka): Write implementation.
286 return OK;
287 }
288
Disconnect()289 void WebSocketQuicStreamAdapter::Disconnect() {
290 if (websocket_quic_spdy_stream_) {
291 websocket_quic_spdy_stream_->Reset(quic::QUIC_STREAM_CANCELLED);
292 }
293 }
294
is_initialized() const295 bool WebSocketQuicStreamAdapter::is_initialized() const {
296 return true;
297 }
298
299 // WebSocketQuicSpdyStream::Delegate methods.
300
OnInitialHeadersComplete(bool fin,size_t frame_len,const quic::QuicHeaderList & quic_header_list)301 void WebSocketQuicStreamAdapter::OnInitialHeadersComplete(
302 bool fin,
303 size_t frame_len,
304 const quic::QuicHeaderList& quic_header_list) {
305 spdy::Http2HeaderBlock response_headers;
306 if (!quic::SpdyUtils::CopyAndValidateHeaders(quic_header_list, nullptr,
307 &response_headers)) {
308 DLOG(ERROR) << "Failed to parse header list: "
309 << quic_header_list.DebugString();
310 websocket_quic_spdy_stream_->ConsumeHeaderList();
311 websocket_quic_spdy_stream_->Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
312 return;
313 }
314 websocket_quic_spdy_stream_->ConsumeHeaderList();
315 delegate_->OnHeadersReceived(response_headers);
316 }
317
OnBodyAvailable()318 void WebSocketQuicStreamAdapter::OnBodyAvailable() {
319 if (!websocket_quic_spdy_stream_->FinishedReadingHeaders()) {
320 // Buffer the data in the sequencer until the headers have been read.
321 return;
322 }
323
324 if (!websocket_quic_spdy_stream_->HasBytesToRead()) {
325 return;
326 }
327
328 if (!read_callback_) {
329 // Wait for Read() to be called.
330 return;
331 }
332
333 DCHECK(read_buffer_);
334 DCHECK_GT(read_length_, 0);
335
336 int rv = websocket_quic_spdy_stream_->Read(read_buffer_, read_length_);
337
338 if (rv == ERR_IO_PENDING) {
339 return;
340 }
341
342 read_buffer_ = nullptr;
343 read_length_ = 0;
344 std::move(read_callback_).Run(rv);
345 }
346
ClearStream()347 void WebSocketQuicStreamAdapter::ClearStream() {
348 if (websocket_quic_spdy_stream_) {
349 websocket_quic_spdy_stream_ = nullptr;
350 }
351 }
352
353 } // namespace net
354