• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef NET_WEBSOCKETS_WEBSOCKET_BASIC_STREAM_H_
6 #define NET_WEBSOCKETS_WEBSOCKET_BASIC_STREAM_H_
7 
8 #include <stddef.h>
9 #include <stdint.h>
10 
11 #include <memory>
12 #include <string>
13 #include <vector>
14 
15 #include "base/containers/heap_array.h"
16 #include "base/containers/queue.h"
17 #include "base/containers/span.h"
18 #include "base/memory/scoped_refptr.h"
19 #include "base/time/time.h"
20 #include "net/base/completion_once_callback.h"
21 #include "net/base/net_export.h"
22 #include "net/log/net_log_with_source.h"
23 #include "net/traffic_annotation/network_traffic_annotation.h"
24 #include "net/websockets/websocket_chunk_assembler.h"
25 #include "net/websockets/websocket_frame.h"
26 #include "net/websockets/websocket_frame_parser.h"
27 #include "net/websockets/websocket_stream.h"
28 
// Forward declaration of base::TimeTicks, which BufferSizeManager::OnRead()
// and OnReadComplete() take as a parameter type.
// NOTE(review): "base/time/time.h" is already included above, so this forward
// declaration looks redundant — confirm before removing.
29 namespace base {
30 class TimeTicks;
31 }  // namespace base
32 
33 namespace net {
34 
// Forward declarations for types that this header only names in pointer,
// reference, or template-argument positions.
35 class ClientSocketHandle;
36 class DrainableIOBuffer;
37 class GrowableIOBuffer;
38 class IOBuffer;
39 class IOBufferWithSize;
40 struct WebSocketFrame;
41 struct WebSocketFrameChunk;
42 struct NetworkTrafficAnnotationTag;
43 
44 // Implementation of WebSocketStream for non-multiplexed ws:// connections (or
45 // the physical side of a multiplexed ws:// connection).
46 //
47 // Please update the traffic annotations in the websocket_basic_stream.cc and
48 // websocket_stream.cc if the class is used for any communication with Google.
49 // In such a case, annotation should be passed from the callers to this class
50 // and a local annotation can not be used anymore.
51 class NET_EXPORT_PRIVATE WebSocketBasicStream final : public WebSocketStream {
52  public:
     // Pointer to a function that takes no arguments and returns a
     // WebSocketMaskingKey. Stored in |generate_websocket_masking_key_| and
     // replaceable through CreateWebSocketBasicStreamForTesting().
53   typedef WebSocketMaskingKey (*WebSocketMaskingKeyGeneratorFunction)();
54 
     // The two read buffer size classes that BufferSizeManager chooses between.
55   enum class BufferSize : uint8_t {
56     kSmall,
57     kLarge,
58   };
59 
60   // A class that calculates whether the associated WebSocketBasicStream
61   // should use a small buffer or large buffer, given the timing information
62   // of Read calls. This class is public for testing.
63   class NET_EXPORT_PRIVATE BufferSizeManager final {
64    public:
65     BufferSizeManager();
       // Not copyable.
66     BufferSizeManager(const BufferSizeManager&) = delete;
67     BufferSizeManager& operator=(const BufferSizeManager&) = delete;
68     ~BufferSizeManager();
69 
70     // Called when the associated WebSocketBasicStream starts reading data
71     // into a buffer.
72     void OnRead(base::TimeTicks now);
73 
74     // Called when the Read operation completes. `size` must be positive.
75     void OnReadComplete(base::TimeTicks now, int size);
76 
77     // Returns the appropriate buffer size the associated WebSocketBasicStream
78     // should use.
buffer_size()79     BufferSize buffer_size() const { return buffer_size_; }
80 
81     // Set the rolling average window for tests.
set_window_for_test(size_t size)82     void set_window_for_test(size_t size) { rolling_average_window_ = size; }
83 
84    private:
85     // This keeps the best read buffer size. Starts out as kSmall.
86     BufferSize buffer_size_ = BufferSize::kSmall;
87 
88     // The number of results to calculate the throughput. This is a variable so
89     // that unittests can set other values.
90     size_t rolling_average_window_ = 100;
91 
92     // This keeps the timestamps to calculate the throughput.
93     base::queue<base::TimeTicks> read_start_timestamps_;
94 
95     // The sum of the last few read sizes.
96     int rolling_byte_total_ = 0;
97 
98     // This keeps the recent read sizes.
99     base::queue<int> recent_read_sizes_;
100   };
101 
102   // Adapter that allows WebSocketBasicStream to use
103   // either a TCP/IP or TLS socket, or an HTTP/2 stream.
      // Every operation is pure virtual; no concrete adapter is defined in this
      // header.
104   class Adapter {
105    public:
106     virtual ~Adapter() = default;
        // NOTE(review): the int results of Read() and Write() presumably follow
        // the net error-code convention (byte count, error, or ERR_IO_PENDING
        // with `callback` run later) — confirm against the implementations.
107     virtual int Read(IOBuffer* buf,
108                      int buf_len,
109                      CompletionOnceCallback callback) = 0;
110     virtual int Write(
111         IOBuffer* buf,
112         int buf_len,
113         CompletionOnceCallback callback,
114         const NetworkTrafficAnnotationTag& traffic_annotation) = 0;
115     virtual void Disconnect() = 0;
116     virtual bool is_initialized() const = 0;
117   };
118 
119   // This class should not normally be constructed directly; see
120   // WebSocketStream::CreateAndConnectStream() and
121   // WebSocketBasicHandshakeStream::Upgrade().
122   WebSocketBasicStream(std::unique_ptr<Adapter> connection,
123                        const scoped_refptr<GrowableIOBuffer>& http_read_buffer,
124                        const std::string& sub_protocol,
125                        const std::string& extensions,
126                        const NetLogWithSource& net_log);
127 
128   // The destructor has to make sure the connection is closed when we finish so
129   // that it does not get returned to the pool.
130   ~WebSocketBasicStream() override;
131 
132   // WebSocketStream implementation.
133   int ReadFrames(std::vector<std::unique_ptr<WebSocketFrame>>* frames,
134                  CompletionOnceCallback callback) override;
135 
136   int WriteFrames(std::vector<std::unique_ptr<WebSocketFrame>>* frames,
137                   CompletionOnceCallback callback) override;
138 
139   void Close() override;
140 
141   std::string GetSubProtocol() const override;
142 
143   std::string GetExtensions() const override;
144 
145   const NetLogWithSource& GetNetLogWithSource() const override;
146 
147   ////////////////////////////////////////////////////////////////////////////
148   // Methods for testing only.
149 
      // Test-only factory. Unlike the public constructor it takes a
      // ClientSocketHandle rather than an Adapter, and additionally accepts the
      // masking key generator function to use.
150   static std::unique_ptr<WebSocketBasicStream>
151   CreateWebSocketBasicStreamForTesting(
152       std::unique_ptr<ClientSocketHandle> connection,
153       const scoped_refptr<GrowableIOBuffer>& http_read_buffer,
154       const std::string& sub_protocol,
155       const std::string& extensions,
156       const NetLogWithSource& net_log,
157       WebSocketMaskingKeyGeneratorFunction key_generator_function);
158 
159  private:
160   // Reads until socket read returns asynchronously or returns error.
161   // If returns ERR_IO_PENDING, then |read_callback_| will be called with result
162   // later.
163   int ReadEverything(std::vector<std::unique_ptr<WebSocketFrame>>* frames);
164 
165   // Called when a read completes. Parses the result, tries to read more.
166   // Might call |read_callback_|.
167   void OnReadComplete(std::vector<std::unique_ptr<WebSocketFrame>>* frames,
168                       int result);
169 
170   // Writes until |buffer| is fully drained (in which case returns OK) or a
171   // socket write returns asynchronously or returns an error.  If returns
172   // ERR_IO_PENDING, then |write_callback_| will be called with result later.
173   int WriteEverything(const scoped_refptr<DrainableIOBuffer>& buffer);
174 
175   // Called when a write completes.  Tries to write more.
176   // Might call |write_callback_|.
177   void OnWriteComplete(const scoped_refptr<DrainableIOBuffer>& buffer,
178                        int result);
179 
180   // Attempts to parse the output of a read as WebSocket frames. On success,
181   // returns OK and places the frame(s) in |frames|.
182   int HandleReadResult(int result,
183                        std::vector<std::unique_ptr<WebSocketFrame>>* frames);
184 
185   // Converts the chunks in |frame_chunks| into frames and writes them to
186   // |frames|. |frame_chunks| is destroyed in the process. Returns
187   // ERR_WS_PROTOCOL_ERROR if an invalid chunk was found. If one or more frames
188   // was added to |frames|, then returns OK, otherwise returns ERR_IO_PENDING.
189   int ConvertChunksToFrames(
190       std::vector<std::unique_ptr<WebSocketFrameChunk>>* frame_chunks,
191       std::vector<std::unique_ptr<WebSocketFrame>>* frames);
192 
193   // Storage for pending reads.
194   scoped_refptr<IOBufferWithSize> read_buffer_;
195 
196   // The best read buffer size for the current throughput.
      // NOTE(review): there is no default member initializer here; presumably
      // the constructor sets it — confirm in the .cc file.
197   size_t target_read_buffer_size_;
198 
199   // The underlying connection, owned through the Adapter interface (a TCP/IP
200   // or TLS socket, or an HTTP/2 stream).
201   std::unique_ptr<Adapter> connection_;
202 
203   // Storage for payload of multiple control frames.
204   std::vector<base::HeapArray<uint8_t>> control_frame_payloads_;
205 
206   // Only used during handshake. Some data may be left in this buffer after the
207   // handshake, in which case it will be picked up during the first call to
208   // ReadFrames(). The type is GrowableIOBuffer for compatibility with
209   // net::HttpStreamParser, which is used to parse the handshake.
210   scoped_refptr<GrowableIOBuffer> http_read_buffer_;
211   // Flag to keep above buffer until next ReadFrames() after decoding.
212   bool is_http_read_buffer_decoded_ = false;
213 
214   // This keeps the current parse state (including any incomplete headers) and
215   // parses frames.
216   WebSocketFrameParser parser_;
217 
218   // The negotiated sub-protocol, or empty for none.
219   const std::string sub_protocol_;
220 
221   // The extensions negotiated with the remote server.
222   const std::string extensions_;
223 
224   NetLogWithSource net_log_;
225 
226   // This is used for adaptive read buffer size.
227   BufferSizeManager buffer_size_manager_;
228 
229   // This keeps the current read buffer size.
      // Its initializer reads |buffer_size_manager_|, so that member must stay
      // declared before this one (members are initialized in declaration order).
230   BufferSize buffer_size_ = buffer_size_manager_.buffer_size();
231 
232   // This can be overridden in tests to make the output deterministic. We don't
233   // use a Callback here because a function pointer is faster and good enough
234   // for our purposes.
235   WebSocketMaskingKeyGeneratorFunction generate_websocket_masking_key_;
236 
237   // User callback saved for asynchronous writes and reads.
238   CompletionOnceCallback write_callback_;
239   CompletionOnceCallback read_callback_;
240 
241   // Used to assemble FrameChunks into Frames.
242   WebSocketChunkAssembler chunk_assembler_;
243 };
244 
245 }  // namespace net
246 
247 #endif  // NET_WEBSOCKETS_WEBSOCKET_BASIC_STREAM_H_
248