• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/websockets/websocket_basic_stream_adapters.h"
6 
7 #include <cstring>
8 #include <ostream>
9 #include <utility>
10 
11 #include "base/check.h"
12 #include "base/check_op.h"
13 #include "base/functional/bind.h"
14 #include "base/functional/callback.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/notreached.h"
19 #include "base/task/single_thread_task_runner.h"
20 #include "net/base/io_buffer.h"
21 #include "net/socket/client_socket_handle.h"
22 #include "net/socket/stream_socket.h"
23 #include "net/socket/stream_socket_handle.h"
24 #include "net/spdy/spdy_buffer.h"
25 #include "net/third_party/quiche/src/quiche/quic/core/http/quic_header_list.h"
26 #include "net/third_party/quiche/src/quiche/quic/core/http/spdy_utils.h"
27 #include "net/third_party/quiche/src/quiche/quic/core/quic_ack_listener_interface.h"
28 #include "net/third_party/quiche/src/quiche/quic/core/quic_error_codes.h"
29 #include "net/websockets/websocket_quic_spdy_stream.h"
30 
31 namespace net {
32 struct NetworkTrafficAnnotationTag;
33 
WebSocketClientSocketHandleAdapter(std::unique_ptr<StreamSocketHandle> connection)34 WebSocketClientSocketHandleAdapter::WebSocketClientSocketHandleAdapter(
35     std::unique_ptr<StreamSocketHandle> connection)
36     : connection_(std::move(connection)) {}
37 
38 WebSocketClientSocketHandleAdapter::~WebSocketClientSocketHandleAdapter() =
39     default;
40 
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)41 int WebSocketClientSocketHandleAdapter::Read(IOBuffer* buf,
42                                              int buf_len,
43                                              CompletionOnceCallback callback) {
44   return connection_->socket()->Read(buf, buf_len, std::move(callback));
45 }
46 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)47 int WebSocketClientSocketHandleAdapter::Write(
48     IOBuffer* buf,
49     int buf_len,
50     CompletionOnceCallback callback,
51     const NetworkTrafficAnnotationTag& traffic_annotation) {
52   return connection_->socket()->Write(buf, buf_len, std::move(callback),
53                                       traffic_annotation);
54 }
55 
Disconnect()56 void WebSocketClientSocketHandleAdapter::Disconnect() {
57   connection_->socket()->Disconnect();
58 }
59 
is_initialized() const60 bool WebSocketClientSocketHandleAdapter::is_initialized() const {
61   return connection_->is_initialized();
62 }
63 
WebSocketSpdyStreamAdapter(base::WeakPtr<SpdyStream> stream,Delegate * delegate,NetLogWithSource net_log)64 WebSocketSpdyStreamAdapter::WebSocketSpdyStreamAdapter(
65     base::WeakPtr<SpdyStream> stream,
66     Delegate* delegate,
67     NetLogWithSource net_log)
68     : stream_(stream), delegate_(delegate), net_log_(net_log) {
69   stream_->SetDelegate(this);
70 }
71 
~WebSocketSpdyStreamAdapter()72 WebSocketSpdyStreamAdapter::~WebSocketSpdyStreamAdapter() {
73   if (stream_) {
74     // DetachDelegate() also cancels the stream.
75     stream_->DetachDelegate();
76   }
77 }
78 
DetachDelegate()79 void WebSocketSpdyStreamAdapter::DetachDelegate() {
80   delegate_ = nullptr;
81 }
82 
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)83 int WebSocketSpdyStreamAdapter::Read(IOBuffer* buf,
84                                      int buf_len,
85                                      CompletionOnceCallback callback) {
86   DCHECK(!read_callback_);
87   DCHECK_LT(0, buf_len);
88 
89   DCHECK(!read_buffer_);
90   read_buffer_ = buf;
91   // |read_length_| is size_t and |buf_len| is a non-negative int, therefore
92   // conversion is always valid.
93   DCHECK(!read_length_);
94   read_length_ = buf_len;
95 
96   if (!read_data_.IsEmpty())
97     return CopySavedReadDataIntoBuffer();
98 
99   if (!stream_)
100     return stream_error_;
101 
102   read_callback_ = std::move(callback);
103   return ERR_IO_PENDING;
104 }
105 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)106 int WebSocketSpdyStreamAdapter::Write(
107     IOBuffer* buf,
108     int buf_len,
109     CompletionOnceCallback callback,
110     const NetworkTrafficAnnotationTag& traffic_annotation) {
111   CHECK(headers_sent_);
112   DCHECK(!write_callback_);
113   DCHECK(callback);
114   DCHECK_LT(0, buf_len);
115 
116   if (!stream_)
117     return stream_error_;
118 
119   stream_->SendData(buf, buf_len, MORE_DATA_TO_SEND);
120   write_callback_ = std::move(callback);
121   write_length_ = buf_len;
122   return ERR_IO_PENDING;
123 }
124 
Disconnect()125 void WebSocketSpdyStreamAdapter::Disconnect() {
126   if (stream_) {
127     stream_->DetachDelegate();
128     stream_ = nullptr;
129   }
130 }
131 
is_initialized() const132 bool WebSocketSpdyStreamAdapter::is_initialized() const {
133   return true;
134 }
135 
136 // SpdyStream::Delegate methods.
OnHeadersSent()137 void WebSocketSpdyStreamAdapter::OnHeadersSent() {
138   headers_sent_ = true;
139   if (delegate_)
140     delegate_->OnHeadersSent();
141 }
142 
OnEarlyHintsReceived(const quiche::HttpHeaderBlock & headers)143 void WebSocketSpdyStreamAdapter::OnEarlyHintsReceived(
144     const quiche::HttpHeaderBlock& headers) {
145   // This callback should not be called for a WebSocket handshake.
146   NOTREACHED();
147 }
148 
OnHeadersReceived(const quiche::HttpHeaderBlock & response_headers)149 void WebSocketSpdyStreamAdapter::OnHeadersReceived(
150     const quiche::HttpHeaderBlock& response_headers) {
151   if (delegate_)
152     delegate_->OnHeadersReceived(response_headers);
153 }
154 
OnDataReceived(std::unique_ptr<SpdyBuffer> buffer)155 void WebSocketSpdyStreamAdapter::OnDataReceived(
156     std::unique_ptr<SpdyBuffer> buffer) {
157   if (!buffer) {
158     // This is slightly wrong semantically, as it's still possible to write to
159     // the stream at this point. However, if the server closes the stream
160     // without waiting for a close frame from us, that means it is not
161     // interested in a clean shutdown. In which case we don't need to worry
162     // about sending any remaining data we might have buffered. This results in
163     // a call to OnClose() which then informs our delegate.
164     stream_->Close();
165     return;
166   }
167 
168   read_data_.Enqueue(std::move(buffer));
169   if (read_callback_)
170     std::move(read_callback_).Run(CopySavedReadDataIntoBuffer());
171 }
172 
OnDataSent()173 void WebSocketSpdyStreamAdapter::OnDataSent() {
174   DCHECK(write_callback_);
175 
176   std::move(write_callback_).Run(write_length_);
177 }
178 
OnTrailers(const quiche::HttpHeaderBlock & trailers)179 void WebSocketSpdyStreamAdapter::OnTrailers(
180     const quiche::HttpHeaderBlock& trailers) {}
181 
OnClose(int status)182 void WebSocketSpdyStreamAdapter::OnClose(int status) {
183   DCHECK_NE(ERR_IO_PENDING, status);
184   DCHECK_LE(status, 0);
185 
186   if (status == OK) {
187     status = ERR_CONNECTION_CLOSED;
188   }
189 
190   stream_error_ = status;
191   stream_ = nullptr;
192 
193   auto self = weak_factory_.GetWeakPtr();
194 
195   if (read_callback_) {
196     DCHECK(read_data_.IsEmpty());
197     // Might destroy |this|.
198     std::move(read_callback_).Run(status);
199     if (!self)
200       return;
201   }
202   if (write_callback_) {
203     // Might destroy |this|.
204     std::move(write_callback_).Run(status);
205     if (!self)
206       return;
207   }
208 
209   // Delay calling delegate_->OnClose() until all buffered data are read.
210   if (read_data_.IsEmpty() && delegate_) {
211     // Might destroy |this|.
212     delegate_->OnClose(status);
213   }
214 }
215 
CanGreaseFrameType() const216 bool WebSocketSpdyStreamAdapter::CanGreaseFrameType() const {
217   return false;
218 }
219 
source_dependency() const220 NetLogSource WebSocketSpdyStreamAdapter::source_dependency() const {
221   return net_log_.source();
222 }
223 
CopySavedReadDataIntoBuffer()224 int WebSocketSpdyStreamAdapter::CopySavedReadDataIntoBuffer() {
225   DCHECK(read_buffer_);
226   DCHECK(read_length_);
227   int rv = read_data_.Dequeue(read_buffer_->data(), read_length_);
228   read_buffer_ = nullptr;
229   read_length_ = 0u;
230 
231   // Stream has been destroyed earlier but delegate_->OnClose() call was
232   // delayed until all buffered data are read.  PostTask so that Read() can
233   // return beforehand.
234   if (!stream_ && delegate_ && read_data_.IsEmpty()) {
235     base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
236         FROM_HERE,
237         base::BindOnce(&WebSocketSpdyStreamAdapter::CallDelegateOnClose,
238                        weak_factory_.GetWeakPtr()));
239   }
240 
241   return rv;
242 }
243 
CallDelegateOnClose()244 void WebSocketSpdyStreamAdapter::CallDelegateOnClose() {
245   if (delegate_)
246     delegate_->OnClose(stream_error_);
247 }
248 
WebSocketQuicStreamAdapter(WebSocketQuicSpdyStream * websocket_quic_spdy_stream,Delegate * delegate)249 WebSocketQuicStreamAdapter::WebSocketQuicStreamAdapter(
250     WebSocketQuicSpdyStream* websocket_quic_spdy_stream,
251     Delegate* delegate)
252     : websocket_quic_spdy_stream_(websocket_quic_spdy_stream),
253       delegate_(delegate) {
254   websocket_quic_spdy_stream_->set_delegate(this);
255 }
256 
~WebSocketQuicStreamAdapter()257 WebSocketQuicStreamAdapter::~WebSocketQuicStreamAdapter() {
258   if (websocket_quic_spdy_stream_) {
259     websocket_quic_spdy_stream_->set_delegate(nullptr);
260   }
261 }
262 
WriteHeaders(quiche::HttpHeaderBlock header_block,bool fin)263 size_t WebSocketQuicStreamAdapter::WriteHeaders(
264     quiche::HttpHeaderBlock header_block,
265     bool fin) {
266   return websocket_quic_spdy_stream_->WriteHeaders(std::move(header_block), fin,
267                                                    nullptr);
268 }
269 
270 // WebSocketBasicStream::Adapter methods.
Read(IOBuffer * buf,int buf_len,CompletionOnceCallback callback)271 int WebSocketQuicStreamAdapter::Read(IOBuffer* buf,
272                                      int buf_len,
273                                      CompletionOnceCallback callback) {
274   if (!websocket_quic_spdy_stream_) {
275     return ERR_UNEXPECTED;
276   }
277 
278   int rv = websocket_quic_spdy_stream_->Read(buf, buf_len);
279   if (rv != ERR_IO_PENDING) {
280     return rv;
281   }
282 
283   read_callback_ = std::move(callback);
284   read_buffer_ = buf;
285   read_length_ = buf_len;
286   return ERR_IO_PENDING;
287 }
288 
Write(IOBuffer * buf,int buf_len,CompletionOnceCallback callback,const NetworkTrafficAnnotationTag & traffic_annotation)289 int WebSocketQuicStreamAdapter::Write(
290     IOBuffer* buf,
291     int buf_len,
292     CompletionOnceCallback callback,
293     const NetworkTrafficAnnotationTag& traffic_annotation) {
294   // TODO(momoka): Write implementation.
295   return OK;
296 }
297 
Disconnect()298 void WebSocketQuicStreamAdapter::Disconnect() {
299   if (websocket_quic_spdy_stream_) {
300     websocket_quic_spdy_stream_->Reset(quic::QUIC_STREAM_CANCELLED);
301   }
302 }
303 
is_initialized() const304 bool WebSocketQuicStreamAdapter::is_initialized() const {
305   return true;
306 }
307 
308 // WebSocketQuicSpdyStream::Delegate methods.
309 
OnInitialHeadersComplete(bool fin,size_t frame_len,const quic::QuicHeaderList & quic_header_list)310 void WebSocketQuicStreamAdapter::OnInitialHeadersComplete(
311     bool fin,
312     size_t frame_len,
313     const quic::QuicHeaderList& quic_header_list) {
314   quiche::HttpHeaderBlock response_headers;
315   if (!quic::SpdyUtils::CopyAndValidateHeaders(quic_header_list, nullptr,
316                                                &response_headers)) {
317     DLOG(ERROR) << "Failed to parse header list: "
318                 << quic_header_list.DebugString();
319     websocket_quic_spdy_stream_->ConsumeHeaderList();
320     websocket_quic_spdy_stream_->Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
321     return;
322   }
323   websocket_quic_spdy_stream_->ConsumeHeaderList();
324   delegate_->OnHeadersReceived(response_headers);
325 }
326 
OnBodyAvailable()327 void WebSocketQuicStreamAdapter::OnBodyAvailable() {
328   if (!websocket_quic_spdy_stream_->FinishedReadingHeaders()) {
329     // Buffer the data in the sequencer until the headers have been read.
330     return;
331   }
332 
333   if (!websocket_quic_spdy_stream_->HasBytesToRead()) {
334     return;
335   }
336 
337   if (!read_callback_) {
338     // Wait for Read() to be called.
339     return;
340   }
341 
342   DCHECK(read_buffer_);
343   DCHECK_GT(read_length_, 0);
344 
345   int rv = websocket_quic_spdy_stream_->Read(read_buffer_, read_length_);
346 
347   if (rv == ERR_IO_PENDING) {
348     return;
349   }
350 
351   read_buffer_ = nullptr;
352   read_length_ = 0;
353   std::move(read_callback_).Run(rv);
354 }
355 
ClearStream()356 void WebSocketQuicStreamAdapter::ClearStream() {
357   if (websocket_quic_spdy_stream_) {
358     websocket_quic_spdy_stream_ = nullptr;
359   }
360 }
361 
362 }  // namespace net
363