• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/websockets/websocket_basic_stream_adapters.h"
6 
7 #include <algorithm>
8 #include <cstring>
9 #include <utility>
10 
11 #include "base/functional/bind.h"
12 #include "base/location.h"
13 #include "base/task/single_thread_task_runner.h"
14 #include "net/base/io_buffer.h"
15 #include "net/socket/client_socket_handle.h"
16 #include "net/socket/socket.h"
17 #include "net/spdy/spdy_buffer.h"
18 #include "net/third_party/quiche/src/quiche/quic/core/http/quic_spdy_stream.h"
19 #include "net/third_party/quiche/src/quiche/quic/core/http/spdy_utils.h"
20 #include "net/websockets/websocket_quic_spdy_stream.h"
21 
22 namespace net {
23 
WebSocketClientSocketHandleAdapter(std::unique_ptr<ClientSocketHandle> connection)24 WebSocketClientSocketHandleAdapter::WebSocketClientSocketHandleAdapter(
25     std::unique_ptr<ClientSocketHandle> connection)
26     : connection_(std::move(connection)) {}
27 
28 WebSocketClientSocketHandleAdapter::~WebSocketClientSocketHandleAdapter() =
29     default;
30 
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)31 int WebSocketClientSocketHandleAdapter::Read(IOBuffer* buf,
32                                              int buf_len,
33                                              CompletionOnceCallback callback) {
34   return connection_->socket()->Read(buf, buf_len, std::move(callback));
35 }
36 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)37 int WebSocketClientSocketHandleAdapter::Write(
38     IOBuffer* buf,
39     int buf_len,
40     CompletionOnceCallback callback,
41     const NetworkTrafficAnnotationTag& traffic_annotation) {
42   return connection_->socket()->Write(buf, buf_len, std::move(callback),
43                                       traffic_annotation);
44 }
45 
Disconnect()46 void WebSocketClientSocketHandleAdapter::Disconnect() {
47   connection_->socket()->Disconnect();
48 }
49 
is_initialized() const50 bool WebSocketClientSocketHandleAdapter::is_initialized() const {
51   return connection_->is_initialized();
52 }
53 
WebSocketSpdyStreamAdapter(base::WeakPtr<SpdyStream> stream,Delegate * delegate,NetLogWithSource net_log)54 WebSocketSpdyStreamAdapter::WebSocketSpdyStreamAdapter(
55     base::WeakPtr<SpdyStream> stream,
56     Delegate* delegate,
57     NetLogWithSource net_log)
58     : stream_(stream), delegate_(delegate), net_log_(net_log) {
59   stream_->SetDelegate(this);
60 }
61 
~WebSocketSpdyStreamAdapter()62 WebSocketSpdyStreamAdapter::~WebSocketSpdyStreamAdapter() {
63   if (stream_) {
64     // DetachDelegate() also cancels the stream.
65     stream_->DetachDelegate();
66   }
67 }
68 
DetachDelegate()69 void WebSocketSpdyStreamAdapter::DetachDelegate() {
70   delegate_ = nullptr;
71 }
72 
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)73 int WebSocketSpdyStreamAdapter::Read(IOBuffer* buf,
74                                      int buf_len,
75                                      CompletionOnceCallback callback) {
76   DCHECK(!read_callback_);
77   DCHECK_LT(0, buf_len);
78 
79   DCHECK(!read_buffer_);
80   read_buffer_ = buf;
81   // |read_length_| is size_t and |buf_len| is a non-negative int, therefore
82   // conversion is always valid.
83   DCHECK(!read_length_);
84   read_length_ = buf_len;
85 
86   if (!read_data_.IsEmpty())
87     return CopySavedReadDataIntoBuffer();
88 
89   if (!stream_)
90     return stream_error_;
91 
92   read_callback_ = std::move(callback);
93   return ERR_IO_PENDING;
94 }
95 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)96 int WebSocketSpdyStreamAdapter::Write(
97     IOBuffer* buf,
98     int buf_len,
99     CompletionOnceCallback callback,
100     const NetworkTrafficAnnotationTag& traffic_annotation) {
101   CHECK(headers_sent_);
102   DCHECK(!write_callback_);
103   DCHECK(callback);
104   DCHECK_LT(0, buf_len);
105 
106   if (!stream_)
107     return stream_error_;
108 
109   stream_->SendData(buf, buf_len, MORE_DATA_TO_SEND);
110   write_callback_ = std::move(callback);
111   write_length_ = buf_len;
112   return ERR_IO_PENDING;
113 }
114 
Disconnect()115 void WebSocketSpdyStreamAdapter::Disconnect() {
116   if (stream_) {
117     stream_->DetachDelegate();
118     stream_ = nullptr;
119   }
120 }
121 
is_initialized() const122 bool WebSocketSpdyStreamAdapter::is_initialized() const {
123   return true;
124 }
125 
126 // SpdyStream::Delegate methods.
OnHeadersSent()127 void WebSocketSpdyStreamAdapter::OnHeadersSent() {
128   headers_sent_ = true;
129   if (delegate_)
130     delegate_->OnHeadersSent();
131 }
132 
OnEarlyHintsReceived(const spdy::Http2HeaderBlock & headers)133 void WebSocketSpdyStreamAdapter::OnEarlyHintsReceived(
134     const spdy::Http2HeaderBlock& headers) {
135   // This callback should not be called for a WebSocket handshake.
136   NOTREACHED();
137 }
138 
OnHeadersReceived(const spdy::Http2HeaderBlock & response_headers,const spdy::Http2HeaderBlock * pushed_request_headers)139 void WebSocketSpdyStreamAdapter::OnHeadersReceived(
140     const spdy::Http2HeaderBlock& response_headers,
141     const spdy::Http2HeaderBlock* pushed_request_headers) {
142   if (delegate_)
143     delegate_->OnHeadersReceived(response_headers);
144 }
145 
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)146 void WebSocketSpdyStreamAdapter::OnDataReceived(
147     std::unique_ptr<SpdyBuffer> buffer) {
148   if (!buffer) {
149     // This is slightly wrong semantically, as it's still possible to write to
150     // the stream at this point. However, if the server closes the stream
151     // without waiting for a close frame from us, that means it is not
152     // interested in a clean shutdown. In which case we don't need to worry
153     // about sending any remaining data we might have buffered. This results in
154     // a call to OnClose() which then informs our delegate.
155     stream_->Close();
156     return;
157   }
158 
159   read_data_.Enqueue(std::move(buffer));
160   if (read_callback_)
161     std::move(read_callback_).Run(CopySavedReadDataIntoBuffer());
162 }
163 
OnDataSent()164 void WebSocketSpdyStreamAdapter::OnDataSent() {
165   DCHECK(write_callback_);
166 
167   std::move(write_callback_).Run(write_length_);
168 }
169 
OnTrailers(const spdy::Http2HeaderBlock & trailers)170 void WebSocketSpdyStreamAdapter::OnTrailers(
171     const spdy::Http2HeaderBlock& trailers) {}
172 
OnClose(int status)173 void WebSocketSpdyStreamAdapter::OnClose(int status) {
174   DCHECK_NE(ERR_IO_PENDING, status);
175   DCHECK_LE(status, 0);
176 
177   if (status == OK) {
178     status = ERR_CONNECTION_CLOSED;
179   }
180 
181   stream_error_ = status;
182   stream_ = nullptr;
183 
184   auto self = weak_factory_.GetWeakPtr();
185 
186   if (read_callback_) {
187     DCHECK(read_data_.IsEmpty());
188     // Might destroy |this|.
189     std::move(read_callback_).Run(status);
190     if (!self)
191       return;
192   }
193   if (write_callback_) {
194     // Might destroy |this|.
195     std::move(write_callback_).Run(status);
196     if (!self)
197       return;
198   }
199 
200   // Delay calling delegate_->OnClose() until all buffered data are read.
201   if (read_data_.IsEmpty() && delegate_) {
202     // Might destroy |this|.
203     delegate_->OnClose(status);
204   }
205 }
206 
CanGreaseFrameType() const207 bool WebSocketSpdyStreamAdapter::CanGreaseFrameType() const {
208   return false;
209 }
210 
source_dependency() const211 NetLogSource WebSocketSpdyStreamAdapter::source_dependency() const {
212   return net_log_.source();
213 }
214 
CopySavedReadDataIntoBuffer()215 int WebSocketSpdyStreamAdapter::CopySavedReadDataIntoBuffer() {
216   DCHECK(read_buffer_);
217   DCHECK(read_length_);
218   int rv = read_data_.Dequeue(read_buffer_->data(), read_length_);
219   read_buffer_ = nullptr;
220   read_length_ = 0u;
221 
222   // Stream has been destroyed earlier but delegate_->OnClose() call was
223   // delayed until all buffered data are read.  PostTask so that Read() can
224   // return beforehand.
225   if (!stream_ && delegate_ && read_data_.IsEmpty()) {
226     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
227         FROM_HERE,
228         base::BindOnce(&WebSocketSpdyStreamAdapter::CallDelegateOnClose,
229                        weak_factory_.GetWeakPtr()));
230   }
231 
232   return rv;
233 }
234 
CallDelegateOnClose()235 void WebSocketSpdyStreamAdapter::CallDelegateOnClose() {
236   if (delegate_)
237     delegate_->OnClose(stream_error_);
238 }
239 
WebSocketQuicStreamAdapter(WebSocketQuicSpdyStream * websocket_quic_spdy_stream,Delegate * delegate)240 WebSocketQuicStreamAdapter::WebSocketQuicStreamAdapter(
241     WebSocketQuicSpdyStream* websocket_quic_spdy_stream,
242     Delegate* delegate)
243     : websocket_quic_spdy_stream_(websocket_quic_spdy_stream),
244       delegate_(delegate) {
245   websocket_quic_spdy_stream_->set_delegate(this);
246 }
247 
~WebSocketQuicStreamAdapter()248 WebSocketQuicStreamAdapter::~WebSocketQuicStreamAdapter() {
249   if (websocket_quic_spdy_stream_) {
250     websocket_quic_spdy_stream_->set_delegate(nullptr);
251   }
252 }
253 
WriteHeaders(spdy::Http2HeaderBlock header_block,bool fin)254 size_t WebSocketQuicStreamAdapter::WriteHeaders(
255     spdy::Http2HeaderBlock header_block,
256     bool fin) {
257   return websocket_quic_spdy_stream_->WriteHeaders(std::move(header_block), fin,
258                                                    nullptr);
259 }
260 
261 // WebSocketBasicStream::Adapter methods.
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)262 int WebSocketQuicStreamAdapter::Read(IOBuffer* buf,
263                                      int buf_len,
264                                      CompletionOnceCallback callback) {
265   if (!websocket_quic_spdy_stream_) {
266     return ERR_UNEXPECTED;
267   }
268 
269   int rv = websocket_quic_spdy_stream_->Read(buf, buf_len);
270   if (rv != ERR_IO_PENDING) {
271     return rv;
272   }
273 
274   read_callback_ = std::move(callback);
275   read_buffer_ = buf;
276   read_length_ = buf_len;
277   return ERR_IO_PENDING;
278 }
279 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)280 int WebSocketQuicStreamAdapter::Write(
281     IOBuffer* buf,
282     int buf_len,
283     CompletionOnceCallback callback,
284     const NetworkTrafficAnnotationTag& traffic_annotation) {
285   // TODO(momoka): Write implementation.
286   return OK;
287 }
288 
Disconnect()289 void WebSocketQuicStreamAdapter::Disconnect() {
290   if (websocket_quic_spdy_stream_) {
291     websocket_quic_spdy_stream_->Reset(quic::QUIC_STREAM_CANCELLED);
292   }
293 }
294 
is_initialized() const295 bool WebSocketQuicStreamAdapter::is_initialized() const {
296   return true;
297 }
298 
299 // WebSocketQuicSpdyStream::Delegate methods.
300 
OnInitialHeadersComplete(bool fin,size_t frame_len,const quic::QuicHeaderList & quic_header_list)301 void WebSocketQuicStreamAdapter::OnInitialHeadersComplete(
302     bool fin,
303     size_t frame_len,
304     const quic::QuicHeaderList& quic_header_list) {
305   spdy::Http2HeaderBlock response_headers;
306   if (!quic::SpdyUtils::CopyAndValidateHeaders(quic_header_list, nullptr,
307                                                &response_headers)) {
308     DLOG(ERROR) << "Failed to parse header list: "
309                 << quic_header_list.DebugString();
310     websocket_quic_spdy_stream_->ConsumeHeaderList();
311     websocket_quic_spdy_stream_->Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
312     return;
313   }
314   websocket_quic_spdy_stream_->ConsumeHeaderList();
315   delegate_->OnHeadersReceived(response_headers);
316 }
317 
OnBodyAvailable()318 void WebSocketQuicStreamAdapter::OnBodyAvailable() {
319   if (!websocket_quic_spdy_stream_->FinishedReadingHeaders()) {
320     // Buffer the data in the sequencer until the headers have been read.
321     return;
322   }
323 
324   if (!websocket_quic_spdy_stream_->HasBytesToRead()) {
325     return;
326   }
327 
328   if (!read_callback_) {
329     // Wait for Read() to be called.
330     return;
331   }
332 
333   DCHECK(read_buffer_);
334   DCHECK_GT(read_length_, 0);
335 
336   int rv = websocket_quic_spdy_stream_->Read(read_buffer_, read_length_);
337 
338   if (rv == ERR_IO_PENDING) {
339     return;
340   }
341 
342   read_buffer_ = nullptr;
343   read_length_ = 0;
344   std::move(read_callback_).Run(rv);
345 }
346 
ClearStream()347 void WebSocketQuicStreamAdapter::ClearStream() {
348   if (websocket_quic_spdy_stream_) {
349     websocket_quic_spdy_stream_ = nullptr;
350   }
351 }
352 
353 }  // namespace net
354