1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_deflate_stream.h"
6
7 #include <algorithm>
8 #include <string>
9
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/memory/scoped_vector.h"
15 #include "net/base/completion_callback.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/websockets/websocket_deflate_predictor.h"
19 #include "net/websockets/websocket_deflater.h"
20 #include "net/websockets/websocket_errors.h"
21 #include "net/websockets/websocket_frame.h"
22 #include "net/websockets/websocket_inflater.h"
23 #include "net/websockets/websocket_stream.h"
24
25 class GURL;
26
27 namespace net {
28
namespace {

// Chunk size in bytes. It is passed to the inflater's constructor and is the
// threshold/size used in this file when cutting (de)compressed output into
// frames.
const size_t kChunkSize = 4 * 1024;

// Value handed to both deflater_.Initialize() and inflater_.Initialize().
// NOTE(review): presumably the zlib window size exponent -- confirm against
// WebSocketDeflater / WebSocketInflater.
const int kWindowBits = 15;

}  // namespace
35
WebSocketDeflateStream(scoped_ptr<WebSocketStream> stream,WebSocketDeflater::ContextTakeOverMode mode,scoped_ptr<WebSocketDeflatePredictor> predictor)36 WebSocketDeflateStream::WebSocketDeflateStream(
37 scoped_ptr<WebSocketStream> stream,
38 WebSocketDeflater::ContextTakeOverMode mode,
39 scoped_ptr<WebSocketDeflatePredictor> predictor)
40 : stream_(stream.Pass()),
41 deflater_(mode),
42 inflater_(kChunkSize, kChunkSize),
43 reading_state_(NOT_READING),
44 writing_state_(NOT_WRITING),
45 current_reading_opcode_(WebSocketFrameHeader::kOpCodeText),
46 current_writing_opcode_(WebSocketFrameHeader::kOpCodeText),
47 predictor_(predictor.Pass()) {
48 DCHECK(stream_);
49 deflater_.Initialize(kWindowBits);
50 inflater_.Initialize(kWindowBits);
51 }
52
~WebSocketDeflateStream()53 WebSocketDeflateStream::~WebSocketDeflateStream() {}
54
ReadFrames(ScopedVector<WebSocketFrame> * frames,const CompletionCallback & callback)55 int WebSocketDeflateStream::ReadFrames(ScopedVector<WebSocketFrame>* frames,
56 const CompletionCallback& callback) {
57 CompletionCallback callback_to_pass =
58 base::Bind(&WebSocketDeflateStream::OnReadComplete,
59 base::Unretained(this),
60 base::Unretained(frames),
61 callback);
62 int result = stream_->ReadFrames(frames, callback_to_pass);
63 if (result < 0)
64 return result;
65 DCHECK_EQ(OK, result);
66 return InflateAndReadIfNecessary(frames, callback_to_pass);
67 }
68
WriteFrames(ScopedVector<WebSocketFrame> * frames,const CompletionCallback & callback)69 int WebSocketDeflateStream::WriteFrames(ScopedVector<WebSocketFrame>* frames,
70 const CompletionCallback& callback) {
71 int result = Deflate(frames);
72 if (result != OK)
73 return result;
74 if (frames->empty())
75 return OK;
76 return stream_->WriteFrames(frames, callback);
77 }
78
Close()79 void WebSocketDeflateStream::Close() { stream_->Close(); }
80
GetSubProtocol() const81 std::string WebSocketDeflateStream::GetSubProtocol() const {
82 return stream_->GetSubProtocol();
83 }
84
GetExtensions() const85 std::string WebSocketDeflateStream::GetExtensions() const {
86 return stream_->GetExtensions();
87 }
88
OnReadComplete(ScopedVector<WebSocketFrame> * frames,const CompletionCallback & callback,int result)89 void WebSocketDeflateStream::OnReadComplete(
90 ScopedVector<WebSocketFrame>* frames,
91 const CompletionCallback& callback,
92 int result) {
93 if (result != OK) {
94 frames->clear();
95 callback.Run(result);
96 return;
97 }
98
99 int r = InflateAndReadIfNecessary(frames, callback);
100 if (r != ERR_IO_PENDING)
101 callback.Run(r);
102 }
103
Deflate(ScopedVector<WebSocketFrame> * frames)104 int WebSocketDeflateStream::Deflate(ScopedVector<WebSocketFrame>* frames) {
105 ScopedVector<WebSocketFrame> frames_to_write;
106 // Store frames of the currently processed message if writing_state_ equals to
107 // WRITING_POSSIBLY_COMPRESSED_MESSAGE.
108 ScopedVector<WebSocketFrame> frames_of_message;
109 for (size_t i = 0; i < frames->size(); ++i) {
110 DCHECK(!(*frames)[i]->header.reserved1);
111 if (!WebSocketFrameHeader::IsKnownDataOpCode((*frames)[i]->header.opcode)) {
112 frames_to_write.push_back((*frames)[i]);
113 (*frames)[i] = NULL;
114 continue;
115 }
116 if (writing_state_ == NOT_WRITING)
117 OnMessageStart(*frames, i);
118
119 scoped_ptr<WebSocketFrame> frame((*frames)[i]);
120 (*frames)[i] = NULL;
121 predictor_->RecordInputDataFrame(frame.get());
122
123 if (writing_state_ == WRITING_UNCOMPRESSED_MESSAGE) {
124 if (frame->header.final)
125 writing_state_ = NOT_WRITING;
126 predictor_->RecordWrittenDataFrame(frame.get());
127 frames_to_write.push_back(frame.release());
128 current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
129 } else {
130 if (frame->data && !deflater_.AddBytes(frame->data->data(),
131 frame->header.payload_length)) {
132 DVLOG(1) << "WebSocket protocol error. "
133 << "deflater_.AddBytes() returns an error.";
134 return ERR_WS_PROTOCOL_ERROR;
135 }
136 if (frame->header.final && !deflater_.Finish()) {
137 DVLOG(1) << "WebSocket protocol error. "
138 << "deflater_.Finish() returns an error.";
139 return ERR_WS_PROTOCOL_ERROR;
140 }
141
142 if (writing_state_ == WRITING_COMPRESSED_MESSAGE) {
143 if (deflater_.CurrentOutputSize() >= kChunkSize ||
144 frame->header.final) {
145 int result = AppendCompressedFrame(frame->header, &frames_to_write);
146 if (result != OK)
147 return result;
148 }
149 if (frame->header.final)
150 writing_state_ = NOT_WRITING;
151 } else {
152 DCHECK_EQ(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_);
153 bool final = frame->header.final;
154 frames_of_message.push_back(frame.release());
155 if (final) {
156 int result = AppendPossiblyCompressedMessage(&frames_of_message,
157 &frames_to_write);
158 if (result != OK)
159 return result;
160 frames_of_message.clear();
161 writing_state_ = NOT_WRITING;
162 }
163 }
164 }
165 }
166 DCHECK_NE(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_);
167 frames->swap(frames_to_write);
168 return OK;
169 }
170
OnMessageStart(const ScopedVector<WebSocketFrame> & frames,size_t index)171 void WebSocketDeflateStream::OnMessageStart(
172 const ScopedVector<WebSocketFrame>& frames, size_t index) {
173 WebSocketFrame* frame = frames[index];
174 current_writing_opcode_ = frame->header.opcode;
175 DCHECK(current_writing_opcode_ == WebSocketFrameHeader::kOpCodeText ||
176 current_writing_opcode_ == WebSocketFrameHeader::kOpCodeBinary);
177 WebSocketDeflatePredictor::Result prediction =
178 predictor_->Predict(frames, index);
179
180 switch (prediction) {
181 case WebSocketDeflatePredictor::DEFLATE:
182 writing_state_ = WRITING_COMPRESSED_MESSAGE;
183 return;
184 case WebSocketDeflatePredictor::DO_NOT_DEFLATE:
185 writing_state_ = WRITING_UNCOMPRESSED_MESSAGE;
186 return;
187 case WebSocketDeflatePredictor::TRY_DEFLATE:
188 writing_state_ = WRITING_POSSIBLY_COMPRESSED_MESSAGE;
189 return;
190 }
191 NOTREACHED();
192 }
193
AppendCompressedFrame(const WebSocketFrameHeader & header,ScopedVector<WebSocketFrame> * frames_to_write)194 int WebSocketDeflateStream::AppendCompressedFrame(
195 const WebSocketFrameHeader& header,
196 ScopedVector<WebSocketFrame>* frames_to_write) {
197 const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_;
198 scoped_refptr<IOBufferWithSize> compressed_payload =
199 deflater_.GetOutput(deflater_.CurrentOutputSize());
200 if (!compressed_payload) {
201 DVLOG(1) << "WebSocket protocol error. "
202 << "deflater_.GetOutput() returns an error.";
203 return ERR_WS_PROTOCOL_ERROR;
204 }
205 scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode));
206 compressed->header.CopyFrom(header);
207 compressed->header.opcode = opcode;
208 compressed->header.final = header.final;
209 compressed->header.reserved1 =
210 (opcode != WebSocketFrameHeader::kOpCodeContinuation);
211 compressed->data = compressed_payload;
212 compressed->header.payload_length = compressed_payload->size();
213
214 current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
215 predictor_->RecordWrittenDataFrame(compressed.get());
216 frames_to_write->push_back(compressed.release());
217 return OK;
218 }
219
AppendPossiblyCompressedMessage(ScopedVector<WebSocketFrame> * frames,ScopedVector<WebSocketFrame> * frames_to_write)220 int WebSocketDeflateStream::AppendPossiblyCompressedMessage(
221 ScopedVector<WebSocketFrame>* frames,
222 ScopedVector<WebSocketFrame>* frames_to_write) {
223 DCHECK(!frames->empty());
224
225 const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_;
226 scoped_refptr<IOBufferWithSize> compressed_payload =
227 deflater_.GetOutput(deflater_.CurrentOutputSize());
228 if (!compressed_payload) {
229 DVLOG(1) << "WebSocket protocol error. "
230 << "deflater_.GetOutput() returns an error.";
231 return ERR_WS_PROTOCOL_ERROR;
232 }
233
234 uint64 original_payload_length = 0;
235 for (size_t i = 0; i < frames->size(); ++i) {
236 WebSocketFrame* frame = (*frames)[i];
237 // Asserts checking that frames represent one whole data message.
238 DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode));
239 DCHECK_EQ(i == 0,
240 WebSocketFrameHeader::kOpCodeContinuation !=
241 frame->header.opcode);
242 DCHECK_EQ(i == frames->size() - 1, frame->header.final);
243 original_payload_length += frame->header.payload_length;
244 }
245 if (original_payload_length <=
246 static_cast<uint64>(compressed_payload->size())) {
247 // Compression is not effective. Use the original frames.
248 for (size_t i = 0; i < frames->size(); ++i) {
249 WebSocketFrame* frame = (*frames)[i];
250 frames_to_write->push_back(frame);
251 predictor_->RecordWrittenDataFrame(frame);
252 (*frames)[i] = NULL;
253 }
254 frames->weak_clear();
255 return OK;
256 }
257 scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode));
258 compressed->header.CopyFrom((*frames)[0]->header);
259 compressed->header.opcode = opcode;
260 compressed->header.final = true;
261 compressed->header.reserved1 = true;
262 compressed->data = compressed_payload;
263 compressed->header.payload_length = compressed_payload->size();
264
265 predictor_->RecordWrittenDataFrame(compressed.get());
266 frames_to_write->push_back(compressed.release());
267 return OK;
268 }
269
Inflate(ScopedVector<WebSocketFrame> * frames)270 int WebSocketDeflateStream::Inflate(ScopedVector<WebSocketFrame>* frames) {
271 ScopedVector<WebSocketFrame> frames_to_output;
272 ScopedVector<WebSocketFrame> frames_passed;
273 frames->swap(frames_passed);
274 for (size_t i = 0; i < frames_passed.size(); ++i) {
275 scoped_ptr<WebSocketFrame> frame(frames_passed[i]);
276 frames_passed[i] = NULL;
277 if (!WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode)) {
278 frames_to_output.push_back(frame.release());
279 continue;
280 }
281
282 if (reading_state_ == NOT_READING) {
283 if (frame->header.reserved1)
284 reading_state_ = READING_COMPRESSED_MESSAGE;
285 else
286 reading_state_ = READING_UNCOMPRESSED_MESSAGE;
287 current_reading_opcode_ = frame->header.opcode;
288 } else {
289 if (frame->header.reserved1) {
290 DVLOG(1) << "WebSocket protocol error. "
291 << "Receiving a non-first frame with RSV1 flag set.";
292 return ERR_WS_PROTOCOL_ERROR;
293 }
294 }
295
296 if (reading_state_ == READING_UNCOMPRESSED_MESSAGE) {
297 if (frame->header.final)
298 reading_state_ = NOT_READING;
299 current_reading_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
300 frames_to_output.push_back(frame.release());
301 } else {
302 DCHECK_EQ(reading_state_, READING_COMPRESSED_MESSAGE);
303 if (frame->data && !inflater_.AddBytes(frame->data->data(),
304 frame->header.payload_length)) {
305 DVLOG(1) << "WebSocket protocol error. "
306 << "inflater_.AddBytes() returns an error.";
307 return ERR_WS_PROTOCOL_ERROR;
308 }
309 if (frame->header.final) {
310 if (!inflater_.Finish()) {
311 DVLOG(1) << "WebSocket protocol error. "
312 << "inflater_.Finish() returns an error.";
313 return ERR_WS_PROTOCOL_ERROR;
314 }
315 }
316 // TODO(yhirano): Many frames can be generated by the inflater and
317 // memory consumption can grow.
318 // We could avoid it, but avoiding it makes this class much more
319 // complicated.
320 while (inflater_.CurrentOutputSize() >= kChunkSize ||
321 frame->header.final) {
322 size_t size = std::min(kChunkSize, inflater_.CurrentOutputSize());
323 scoped_ptr<WebSocketFrame> inflated(
324 new WebSocketFrame(WebSocketFrameHeader::kOpCodeText));
325 scoped_refptr<IOBufferWithSize> data = inflater_.GetOutput(size);
326 bool is_final = !inflater_.CurrentOutputSize();
327 // |is_final| can't be true if |frame->header.final| is false.
328 DCHECK(!(is_final && !frame->header.final));
329 if (!data) {
330 DVLOG(1) << "WebSocket protocol error. "
331 << "inflater_.GetOutput() returns an error.";
332 return ERR_WS_PROTOCOL_ERROR;
333 }
334 inflated->header.CopyFrom(frame->header);
335 inflated->header.opcode = current_reading_opcode_;
336 inflated->header.final = is_final;
337 inflated->header.reserved1 = false;
338 inflated->data = data;
339 inflated->header.payload_length = data->size();
340
341 frames_to_output.push_back(inflated.release());
342 current_reading_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
343 if (is_final)
344 break;
345 }
346 if (frame->header.final)
347 reading_state_ = NOT_READING;
348 }
349 }
350 frames->swap(frames_to_output);
351 return frames->empty() ? ERR_IO_PENDING : OK;
352 }
353
InflateAndReadIfNecessary(ScopedVector<WebSocketFrame> * frames,const CompletionCallback & callback)354 int WebSocketDeflateStream::InflateAndReadIfNecessary(
355 ScopedVector<WebSocketFrame>* frames,
356 const CompletionCallback& callback) {
357 int result = Inflate(frames);
358 while (result == ERR_IO_PENDING) {
359 DCHECK(frames->empty());
360 result = stream_->ReadFrames(frames, callback);
361 if (result < 0)
362 break;
363 DCHECK_EQ(OK, result);
364 DCHECK(!frames->empty());
365 result = Inflate(frames);
366 }
367 if (result < 0)
368 frames->clear();
369 return result;
370 }
371
372 } // namespace net
373