1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/scoped_ptr.h"
6 #include "base/stats_counters.h"
7
8 #include "flip_framer.h" // cross-google3 directory naming.
9 #include "flip_frame_builder.h"
10 #include "flip_bitmasks.h"
11
12 #if defined(USE_SYSTEM_ZLIB)
13 #include <zlib.h>
14 #else
15 #include "third_party/zlib/zlib.h"
16 #endif
17
18 namespace flip {
19
20 // The initial size of the control frame buffer; this is used internally
21 // as we parse through control frames.
22 static const size_t kControlFrameBufferInitialSize = 32 * 1024;
23 // The maximum size of the control frame buffer that we support.
24 // TODO(mbelshe): We should make this stream-based so there are no limits.
25 static const size_t kControlFrameBufferMaxSize = 64 * 1024;
26
27 // By default is compression on or off.
28 bool FlipFramer::compression_default_ = true;
29
30 #ifdef DEBUG_FLIP_STATE_CHANGES
31 #define CHANGE_STATE(newstate) \
32 { \
33 do { \
34 LOG(INFO) << "Changing state from: " \
35 << StateToString(state_) \
36 << " to " << StateToString(newstate) << "\n"; \
37 state_ = newstate; \
38 } while (false); \
39 }
40 #else
41 #define CHANGE_STATE(newstate) (state_ = newstate)
42 #endif
43
FlipFramer()44 FlipFramer::FlipFramer()
45 : state_(FLIP_RESET),
46 error_code_(FLIP_NO_ERROR),
47 remaining_payload_(0),
48 remaining_control_payload_(0),
49 current_frame_buffer_(NULL),
50 current_frame_len_(0),
51 current_frame_capacity_(0),
52 enable_compression_(compression_default_),
53 visitor_(NULL) {
54 }
55
~FlipFramer()56 FlipFramer::~FlipFramer() {
57 if (compressor_.get()) {
58 deflateEnd(compressor_.get());
59 }
60 if (decompressor_.get()) {
61 inflateEnd(decompressor_.get());
62 }
63 delete [] current_frame_buffer_;
64 }
65
Reset()66 void FlipFramer::Reset() {
67 state_ = FLIP_RESET;
68 error_code_ = FLIP_NO_ERROR;
69 remaining_payload_ = 0;
70 remaining_control_payload_ = 0;
71 current_frame_len_ = 0;
72 if (current_frame_capacity_ != kControlFrameBufferInitialSize) {
73 delete [] current_frame_buffer_;
74 current_frame_buffer_ = 0;
75 current_frame_capacity_ = 0;
76 ExpandControlFrameBuffer(kControlFrameBufferInitialSize);
77 }
78 }
79
StateToString(int state)80 const char* FlipFramer::StateToString(int state) {
81 switch (state) {
82 case FLIP_ERROR:
83 return "ERROR";
84 case FLIP_DONE:
85 return "DONE";
86 case FLIP_AUTO_RESET:
87 return "AUTO_RESET";
88 case FLIP_RESET:
89 return "RESET";
90 case FLIP_READING_COMMON_HEADER:
91 return "READING_COMMON_HEADER";
92 case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
93 return "INTERPRET_CONTROL_FRAME_COMMON_HEADER";
94 case FLIP_CONTROL_FRAME_PAYLOAD:
95 return "CONTROL_FRAME_PAYLOAD";
96 case FLIP_IGNORE_REMAINING_PAYLOAD:
97 return "IGNORE_REMAINING_PAYLOAD";
98 case FLIP_FORWARD_STREAM_FRAME:
99 return "FORWARD_STREAM_FRAME";
100 }
101 return "UNKNOWN_STATE";
102 }
103
BytesSafeToRead() const104 size_t FlipFramer::BytesSafeToRead() const {
105 switch (state_) {
106 case FLIP_ERROR:
107 case FLIP_DONE:
108 case FLIP_AUTO_RESET:
109 case FLIP_RESET:
110 return 0;
111 case FLIP_READING_COMMON_HEADER:
112 DCHECK(current_frame_len_ < FlipFrame::size());
113 return FlipFrame::size() - current_frame_len_;
114 case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
115 return 0;
116 case FLIP_CONTROL_FRAME_PAYLOAD:
117 case FLIP_IGNORE_REMAINING_PAYLOAD:
118 case FLIP_FORWARD_STREAM_FRAME:
119 return remaining_payload_;
120 }
121 // We should never get to here.
122 return 0;
123 }
124
set_error(FlipError error)125 void FlipFramer::set_error(FlipError error) {
126 DCHECK(visitor_);
127 error_code_ = error;
128 CHANGE_STATE(FLIP_ERROR);
129 visitor_->OnError(this);
130 }
131
ErrorCodeToString(int error_code)132 const char* FlipFramer::ErrorCodeToString(int error_code) {
133 switch (error_code) {
134 case FLIP_NO_ERROR:
135 return "NO_ERROR";
136 case FLIP_UNKNOWN_CONTROL_TYPE:
137 return "UNKNOWN_CONTROL_TYPE";
138 case FLIP_INVALID_CONTROL_FRAME:
139 return "INVALID_CONTROL_FRAME";
140 case FLIP_CONTROL_PAYLOAD_TOO_LARGE:
141 return "CONTROL_PAYLOAD_TOO_LARGE";
142 case FLIP_ZLIB_INIT_FAILURE:
143 return "ZLIB_INIT_FAILURE";
144 case FLIP_UNSUPPORTED_VERSION:
145 return "UNSUPPORTED_VERSION";
146 case FLIP_DECOMPRESS_FAILURE:
147 return "DECOMPRESS_FAILURE";
148 }
149 return "UNKNOWN_STATE";
150 }
151
ProcessInput(const char * data,size_t len)152 size_t FlipFramer::ProcessInput(const char* data, size_t len) {
153 DCHECK(visitor_);
154 DCHECK(data);
155
156 size_t original_len = len;
157 while (len != 0) {
158 switch (state_) {
159 case FLIP_ERROR:
160 case FLIP_DONE:
161 goto bottom;
162
163 case FLIP_AUTO_RESET:
164 case FLIP_RESET:
165 Reset();
166 CHANGE_STATE(FLIP_READING_COMMON_HEADER);
167 continue;
168
169 case FLIP_READING_COMMON_HEADER: {
170 int bytes_read = ProcessCommonHeader(data, len);
171 len -= bytes_read;
172 data += bytes_read;
173 continue;
174 }
175
176 // Arguably, this case is not necessary, as no bytes are consumed here.
177 // I felt it was a nice partitioning, however (which probably indicates
178 // that it should be refactored into its own function!)
179 case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
180 ProcessControlFrameHeader();
181 continue;
182
183 case FLIP_CONTROL_FRAME_PAYLOAD: {
184 int bytes_read = ProcessControlFramePayload(data, len);
185 len -= bytes_read;
186 data += bytes_read;
187 }
188 // intentional fallthrough
189 case FLIP_IGNORE_REMAINING_PAYLOAD:
190 // control frame has too-large payload
191 // intentional fallthrough
192 case FLIP_FORWARD_STREAM_FRAME: {
193 int bytes_read = ProcessDataFramePayload(data, len);
194 len -= bytes_read;
195 data += bytes_read;
196 continue;
197 }
198 default:
199 break;
200 }
201 }
202 bottom:
203 return original_len - len;
204 }
205
ProcessCommonHeader(const char * data,size_t len)206 size_t FlipFramer::ProcessCommonHeader(const char* data, size_t len) {
207 // This should only be called when we're in the FLIP_READING_COMMON_HEADER
208 // state.
209 DCHECK(state_ == FLIP_READING_COMMON_HEADER);
210
211 int original_len = len;
212 FlipFrame current_frame(current_frame_buffer_, false);
213
214 do {
215 if (current_frame_len_ < FlipFrame::size()) {
216 size_t bytes_desired = FlipFrame::size() - current_frame_len_;
217 size_t bytes_to_append = std::min(bytes_desired, len);
218 char* header_buffer = current_frame_buffer_;
219 memcpy(&header_buffer[current_frame_len_], data, bytes_to_append);
220 current_frame_len_ += bytes_to_append;
221 data += bytes_to_append;
222 len -= bytes_to_append;
223 // Check for an empty data frame.
224 if (current_frame_len_ == FlipFrame::size() &&
225 !current_frame.is_control_frame() &&
226 current_frame.length() == 0) {
227 if (current_frame.flags() & CONTROL_FLAG_FIN) {
228 FlipDataFrame data_frame(current_frame_buffer_, false);
229 visitor_->OnStreamFrameData(data_frame.stream_id(), NULL, 0);
230 }
231 CHANGE_STATE(FLIP_RESET);
232 }
233 break;
234 }
235 remaining_payload_ = current_frame.length();
236
237 // This is just a sanity check for help debugging early frame errors.
238 if (remaining_payload_ > 1000000u) {
239 LOG(ERROR) <<
240 "Unexpectedly large frame. Flip session is likely corrupt.";
241 }
242
243 // if we're here, then we have the common header all received.
244 if (!current_frame.is_control_frame())
245 CHANGE_STATE(FLIP_FORWARD_STREAM_FRAME);
246 else
247 CHANGE_STATE(FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER);
248 } while (false);
249
250 return original_len - len;
251 }
252
ProcessControlFrameHeader()253 void FlipFramer::ProcessControlFrameHeader() {
254 DCHECK_EQ(FLIP_NO_ERROR, error_code_);
255 DCHECK_LE(FlipFrame::size(), current_frame_len_);
256 FlipControlFrame current_control_frame(current_frame_buffer_, false);
257 // Do some sanity checking on the control frame sizes.
258 switch (current_control_frame.type()) {
259 case SYN_STREAM:
260 if (current_control_frame.length() <
261 FlipSynStreamControlFrame::size() - FlipControlFrame::size())
262 set_error(FLIP_INVALID_CONTROL_FRAME);
263 break;
264 case SYN_REPLY:
265 if (current_control_frame.length() <
266 FlipSynReplyControlFrame::size() - FlipControlFrame::size())
267 set_error(FLIP_INVALID_CONTROL_FRAME);
268 break;
269 case FIN_STREAM:
270 if (current_control_frame.length() !=
271 FlipFinStreamControlFrame::size() - FlipFrame::size())
272 set_error(FLIP_INVALID_CONTROL_FRAME);
273 break;
274 case NOOP:
275 // NOOP. Swallow it.
276 CHANGE_STATE(FLIP_AUTO_RESET);
277 return;
278 default:
279 set_error(FLIP_UNKNOWN_CONTROL_TYPE);
280 break;
281 }
282
283 // We only support version 1 of this protocol.
284 if (current_control_frame.version() != kFlipProtocolVersion)
285 set_error(FLIP_UNSUPPORTED_VERSION);
286
287 remaining_control_payload_ = current_control_frame.length();
288 if (remaining_control_payload_ > kControlFrameBufferMaxSize)
289 set_error(FLIP_CONTROL_PAYLOAD_TOO_LARGE);
290
291 if (error_code_)
292 return;
293
294 ExpandControlFrameBuffer(remaining_control_payload_);
295 CHANGE_STATE(FLIP_CONTROL_FRAME_PAYLOAD);
296 }
297
ProcessControlFramePayload(const char * data,size_t len)298 size_t FlipFramer::ProcessControlFramePayload(const char* data, size_t len) {
299 size_t original_len = len;
300 do {
301 if (remaining_control_payload_) {
302 size_t amount_to_consume = std::min(remaining_control_payload_, len);
303 memcpy(¤t_frame_buffer_[current_frame_len_], data,
304 amount_to_consume);
305 current_frame_len_ += amount_to_consume;
306 data += amount_to_consume;
307 len -= amount_to_consume;
308 remaining_control_payload_ -= amount_to_consume;
309 remaining_payload_ -= amount_to_consume;
310 if (remaining_control_payload_)
311 break;
312 }
313 FlipControlFrame control_frame(current_frame_buffer_, false);
314 visitor_->OnControl(&control_frame);
315
316 // If this is a FIN, tell the caller.
317 if (control_frame.type() == SYN_REPLY &&
318 control_frame.flags() & CONTROL_FLAG_FIN)
319 visitor_->OnStreamFrameData(control_frame.stream_id(), NULL, 0);
320
321 CHANGE_STATE(FLIP_IGNORE_REMAINING_PAYLOAD);
322 } while (false);
323 return original_len - len;
324 }
325
ProcessDataFramePayload(const char * data,size_t len)326 size_t FlipFramer::ProcessDataFramePayload(const char* data, size_t len) {
327 size_t original_len = len;
328
329 FlipDataFrame current_data_frame(current_frame_buffer_, false);
330 if (remaining_payload_) {
331 size_t amount_to_forward = std::min(remaining_payload_, len);
332 if (amount_to_forward && state_ != FLIP_IGNORE_REMAINING_PAYLOAD) {
333 if (current_data_frame.flags() & DATA_FLAG_COMPRESSED) {
334 // TODO(mbelshe): Assert that the decompressor is init'ed.
335 if (!InitializeDecompressor())
336 return NULL;
337
338 size_t decompressed_max_size = amount_to_forward * 100;
339 scoped_ptr<char> decompressed(new char[decompressed_max_size]);
340 decompressor_->next_in = reinterpret_cast<Bytef*>(
341 const_cast<char*>(data));
342 decompressor_->avail_in = amount_to_forward;
343 decompressor_->next_out =
344 reinterpret_cast<Bytef*>(decompressed.get());
345 decompressor_->avail_out = decompressed_max_size;
346
347 int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
348 if (rv != Z_OK) {
349 set_error(FLIP_DECOMPRESS_FAILURE);
350 return 0;
351 }
352 size_t decompressed_size = decompressed_max_size -
353 decompressor_->avail_out;
354 // Only inform the visitor if there is data.
355 if (decompressed_size)
356 visitor_->OnStreamFrameData(current_data_frame.stream_id(),
357 decompressed.get(),
358 decompressed_size);
359 amount_to_forward -= decompressor_->avail_in;
360 } else {
361 // The data frame was not compressed.
362 // Only inform the visitor if there is data.
363 if (amount_to_forward)
364 visitor_->OnStreamFrameData(current_data_frame.stream_id(),
365 data, amount_to_forward);
366 }
367 }
368 data += amount_to_forward;
369 len -= amount_to_forward;
370 remaining_payload_ -= amount_to_forward;
371
372 // If the FIN flag is set, and there is no more data in this data
373 // frame, inform the visitor of EOF via a 0-length data frame.
374 if (!remaining_payload_ &&
375 current_data_frame.flags() & DATA_FLAG_FIN)
376 visitor_->OnStreamFrameData(current_data_frame.stream_id(), NULL,
377 0);
378 } else {
379 CHANGE_STATE(FLIP_AUTO_RESET);
380 }
381 return original_len - len;
382 }
383
ExpandControlFrameBuffer(size_t size)384 void FlipFramer::ExpandControlFrameBuffer(size_t size) {
385 DCHECK(size < kControlFrameBufferMaxSize);
386 if (size < current_frame_capacity_)
387 return;
388
389 int alloc_size = size + FlipFrame::size();
390 char* new_buffer = new char[alloc_size];
391 memcpy(new_buffer, current_frame_buffer_, current_frame_len_);
392 current_frame_capacity_ = alloc_size;
393 current_frame_buffer_ = new_buffer;
394 }
395
ParseHeaderBlock(const FlipFrame * frame,FlipHeaderBlock * block)396 bool FlipFramer::ParseHeaderBlock(const FlipFrame* frame,
397 FlipHeaderBlock* block) {
398 FlipControlFrame control_frame(frame->data(), false);
399 uint32 type = control_frame.type();
400 if (type != SYN_STREAM && type != SYN_REPLY)
401 return false;
402
403 // Find the header data within the control frame.
404 scoped_ptr<FlipFrame> decompressed_frame(DecompressFrame(frame));
405 if (!decompressed_frame.get())
406 return false;
407 FlipSynStreamControlFrame syn_frame(decompressed_frame->data(), false);
408 const char *header_data = syn_frame.header_block();
409 int header_length = syn_frame.header_block_len();
410
411 FlipFrameBuilder builder(header_data, header_length);
412 void* iter = NULL;
413 uint16 num_headers;
414 if (builder.ReadUInt16(&iter, &num_headers)) {
415 for (int index = 0; index < num_headers; ++index) {
416 std::string name;
417 std::string value;
418 if (!builder.ReadString(&iter, &name))
419 break;
420 if (!builder.ReadString(&iter, &value))
421 break;
422 if (block->find(name) == block->end()) {
423 (*block)[name] = value;
424 } else {
425 return false;
426 }
427 }
428 return true;
429 }
430 return false;
431 }
432
CreateSynStream(FlipStreamId stream_id,int priority,FlipControlFlags flags,bool compressed,FlipHeaderBlock * headers)433 FlipSynStreamControlFrame* FlipFramer::CreateSynStream(
434 FlipStreamId stream_id, int priority, FlipControlFlags flags,
435 bool compressed, FlipHeaderBlock* headers) {
436 FlipFrameBuilder frame;
437
438 frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
439 frame.WriteUInt16(SYN_STREAM);
440 frame.WriteUInt32(0); // Placeholder for the length and flags
441 frame.WriteUInt32(stream_id);
442 frame.WriteUInt16(ntohs(priority) << 6); // Priority.
443
444 frame.WriteUInt16(headers->size()); // Number of headers.
445 FlipHeaderBlock::iterator it;
446 for (it = headers->begin(); it != headers->end(); ++it) {
447 frame.WriteString(it->first);
448 frame.WriteString(it->second);
449 }
450
451 // Write the length and flags.
452 size_t length = frame.length() - FlipFrame::size();
453 DCHECK(length < static_cast<size_t>(kLengthMask));
454 FlagsAndLength flags_length;
455 flags_length.length_ = htonl(static_cast<uint32>(length));
456 flags_length.flags_[0] = flags;
457 frame.WriteBytesToOffset(4, &flags_length, sizeof(flags_length));
458
459 scoped_ptr<FlipFrame> syn_frame(frame.take());
460 if (compressed) {
461 return reinterpret_cast<FlipSynStreamControlFrame*>(
462 CompressFrame(syn_frame.get()));
463 }
464 return reinterpret_cast<FlipSynStreamControlFrame*>(syn_frame.release());
465 }
466
467 /* static */
CreateFinStream(FlipStreamId stream_id,int status)468 FlipFinStreamControlFrame* FlipFramer::CreateFinStream(FlipStreamId stream_id,
469 int status) {
470 FlipFrameBuilder frame;
471 frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
472 frame.WriteUInt16(FIN_STREAM);
473 frame.WriteUInt32(8);
474 frame.WriteUInt32(stream_id);
475 frame.WriteUInt32(status);
476 return reinterpret_cast<FlipFinStreamControlFrame*>(frame.take());
477 }
478
CreateSynReply(FlipStreamId stream_id,FlipControlFlags flags,bool compressed,FlipHeaderBlock * headers)479 FlipSynReplyControlFrame* FlipFramer::CreateSynReply(FlipStreamId stream_id,
480 FlipControlFlags flags, bool compressed, FlipHeaderBlock* headers) {
481
482 FlipFrameBuilder frame;
483
484 frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
485 frame.WriteUInt16(SYN_REPLY);
486 frame.WriteUInt32(0); // Placeholder for the length and flags.
487 frame.WriteUInt32(stream_id);
488 frame.WriteUInt16(0); // Unused
489
490 frame.WriteUInt16(headers->size()); // Number of headers.
491 FlipHeaderBlock::iterator it;
492 for (it = headers->begin(); it != headers->end(); ++it) {
493 // TODO(mbelshe): Headers need to be sorted.
494 frame.WriteString(it->first);
495 frame.WriteString(it->second);
496 }
497
498 // Write the length
499 size_t length = frame.length() - FlipFrame::size();
500 DCHECK(length < static_cast<size_t>(kLengthMask));
501 FlagsAndLength flags_length;
502 flags_length.length_ = htonl(static_cast<uint32>(length));
503 flags_length.flags_[0] = flags;
504 frame.WriteBytesToOffset(4, &flags_length, sizeof(flags_length));
505
506 scoped_ptr<FlipFrame> reply_frame(frame.take());
507 if (compressed) {
508 return reinterpret_cast<FlipSynReplyControlFrame*>(
509 CompressFrame(reply_frame.get()));
510 }
511 return reinterpret_cast<FlipSynReplyControlFrame*>(reply_frame.release());
512 }
513
CreateDataFrame(FlipStreamId stream_id,const char * data,uint32 len,FlipDataFlags flags)514 FlipDataFrame* FlipFramer::CreateDataFrame(FlipStreamId stream_id,
515 const char* data,
516 uint32 len, FlipDataFlags flags) {
517 FlipFrameBuilder frame;
518
519 frame.WriteUInt32(stream_id);
520
521 DCHECK(len < static_cast<size_t>(kLengthMask));
522 FlagsAndLength flags_length;
523 flags_length.length_ = htonl(len);
524 flags_length.flags_[0] = flags;
525 frame.WriteBytes(&flags_length, sizeof(flags_length));
526
527 frame.WriteBytes(data, len);
528 scoped_ptr<FlipFrame> data_frame(frame.take());
529 if (flags & DATA_FLAG_COMPRESSED)
530 return reinterpret_cast<FlipDataFrame*>(CompressFrame(data_frame.get()));
531 return reinterpret_cast<FlipDataFrame*>(data_frame.release());
532 }
533
534 /* static */
CreateNopFrame()535 FlipControlFrame* FlipFramer::CreateNopFrame() {
536 FlipFrameBuilder frame;
537 frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
538 frame.WriteUInt16(NOOP);
539 frame.WriteUInt32(0);
540 return reinterpret_cast<FlipControlFrame*>(frame.take());
541 }
542
543 static const int kCompressorLevel = Z_DEFAULT_COMPRESSION;
544 // This is just a hacked dictionary to use for shrinking HTTP-like headers.
545 // TODO(mbelshe): Use a scientific methodology for computing the dictionary.
546 static const char dictionary[] =
547 "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-"
548 "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi"
549 "f-rangeif-unmodifiedsincemax-forwardsproxy-authorizationrangerefererteuser"
550 "-agent10010120020120220320420520630030130230330430530630740040140240340440"
551 "5406407408409410411412413414415416417500501502503504505accept-rangesageeta"
552 "glocationproxy-authenticatepublicretry-afterservervarywarningwww-authentic"
553 "ateallowcontent-basecontent-encodingcache-controlconnectiondatetrailertran"
554 "sfer-encodingupgradeviawarningcontent-languagecontent-lengthcontent-locati"
555 "oncontent-md5content-rangecontent-typeetagexpireslast-modifiedset-cookieMo"
556 "ndayTuesdayWednesdayThursdayFridaySaturdaySundayJanFebMarAprMayJunJulAugSe"
557 "pOctNovDecchunkedtext/htmlimage/pngimage/jpgimage/gifapplication/xmlapplic"
558 "ation/xhtmltext/plainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1"
559 ".1statusversionurl";
560 static uLong dictionary_id = 0;
561
InitializeCompressor()562 bool FlipFramer::InitializeCompressor() {
563 if (compressor_.get())
564 return true; // Already initialized.
565
566 compressor_.reset(new z_stream);
567 memset(compressor_.get(), 0, sizeof(z_stream));
568
569 int success = deflateInit(compressor_.get(), kCompressorLevel);
570 if (success == Z_OK)
571 success = deflateSetDictionary(compressor_.get(),
572 reinterpret_cast<const Bytef*>(dictionary),
573 sizeof(dictionary));
574 if (success != Z_OK)
575 compressor_.reset(NULL);
576 return success == Z_OK;
577 }
578
InitializeDecompressor()579 bool FlipFramer::InitializeDecompressor() {
580 if (decompressor_.get())
581 return true; // Already initialized.
582
583 decompressor_.reset(new z_stream);
584 memset(decompressor_.get(), 0, sizeof(z_stream));
585
586 // Compute the id of our dictionary so that we know we're using the
587 // right one when asked for it.
588 if (dictionary_id == 0) {
589 dictionary_id = adler32(0L, Z_NULL, 0);
590 dictionary_id = adler32(dictionary_id,
591 reinterpret_cast<const Bytef*>(dictionary),
592 sizeof(dictionary));
593 }
594
595 int success = inflateInit(decompressor_.get());
596 if (success != Z_OK)
597 decompressor_.reset(NULL);
598 return success == Z_OK;
599 }
600
GetFrameBoundaries(const FlipFrame * frame,int * payload_length,int * header_length,const char ** payload) const601 bool FlipFramer::GetFrameBoundaries(const FlipFrame* frame,
602 int* payload_length,
603 int* header_length,
604 const char** payload) const {
605 if (frame->is_control_frame()) {
606 const FlipControlFrame* control_frame =
607 reinterpret_cast<const FlipControlFrame*>(frame);
608 switch (control_frame->type()) {
609 case SYN_STREAM:
610 case SYN_REPLY:
611 {
612 const FlipSynStreamControlFrame *syn_frame =
613 reinterpret_cast<const FlipSynStreamControlFrame*>(frame);
614 *payload_length = syn_frame->header_block_len();
615 *header_length = syn_frame->size();
616 *payload = frame->data() + *header_length;
617 }
618 break;
619 default:
620 // TODO(mbelshe): set an error?
621 return false; // We can't compress this frame!
622 }
623 } else {
624 *header_length = FlipFrame::size();
625 *payload_length = frame->length();
626 *payload = frame->data() + FlipFrame::size();
627 }
628 DCHECK(static_cast<size_t>(*header_length) <=
629 FlipFrame::size() + *payload_length);
630 return true;
631 }
632
633
CompressFrame(const FlipFrame * frame)634 FlipFrame* FlipFramer::CompressFrame(const FlipFrame* frame) {
635 int payload_length;
636 int header_length;
637 const char* payload;
638
639 static StatsCounter pre_compress_bytes("flip.PreCompressSize");
640 static StatsCounter post_compress_bytes("flip.PostCompressSize");
641
642 if (!enable_compression_)
643 return DuplicateFrame(frame);
644
645 if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
646 return NULL;
647
648 if (!InitializeCompressor())
649 return NULL;
650
651 // TODO(mbelshe): Should we have a zlib header like what http servers do?
652
653 // Create an output frame.
654 int compressed_max_size = deflateBound(compressor_.get(), payload_length);
655 int new_frame_size = header_length + compressed_max_size;
656 FlipFrame* new_frame = new FlipFrame(new_frame_size);
657 memcpy(new_frame->data(), frame->data(), frame->length() + FlipFrame::size());
658
659 compressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload));
660 compressor_->avail_in = payload_length;
661 compressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) +
662 header_length;
663 compressor_->avail_out = compressed_max_size;
664
665 // Data packets have a 'compressed flag
666 if (!new_frame->is_control_frame()) {
667 FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
668 data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED);
669 }
670
671 int rv = deflate(compressor_.get(), Z_SYNC_FLUSH);
672 if (rv != Z_OK) { // How can we know that it compressed everything?
673 // This shouldn't happen, right?
674 delete new_frame;
675 return NULL;
676 }
677
678 int compressed_size = compressed_max_size - compressor_->avail_out;
679 new_frame->set_length(header_length + compressed_size - FlipFrame::size());
680
681 pre_compress_bytes.Add(payload_length);
682 post_compress_bytes.Add(new_frame->length());
683
684 return new_frame;
685 }
686
DecompressFrame(const FlipFrame * frame)687 FlipFrame* FlipFramer::DecompressFrame(const FlipFrame* frame) {
688 int payload_length;
689 int header_length;
690 const char* payload;
691
692 static StatsCounter pre_decompress_bytes("flip.PreDeCompressSize");
693 static StatsCounter post_decompress_bytes("flip.PostDeCompressSize");
694
695 if (!enable_compression_)
696 return DuplicateFrame(frame);
697
698 if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
699 return NULL;
700
701 if (!frame->is_control_frame()) {
702 const FlipDataFrame* data_frame =
703 reinterpret_cast<const FlipDataFrame*>(frame);
704 if ((data_frame->flags() & DATA_FLAG_COMPRESSED) == 0)
705 return DuplicateFrame(frame);
706 }
707
708 if (!InitializeDecompressor())
709 return NULL;
710
711 // TODO(mbelshe): Should we have a zlib header like what http servers do?
712
713 // Create an output frame. Assume it does not need to be longer than
714 // the input data.
715 int decompressed_max_size = kControlFrameBufferInitialSize;
716 int new_frame_size = header_length + decompressed_max_size;
717 FlipFrame* new_frame = new FlipFrame(new_frame_size);
718 memcpy(new_frame->data(), frame->data(), frame->length() + FlipFrame::size());
719
720 decompressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload));
721 decompressor_->avail_in = payload_length;
722 decompressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) +
723 header_length;
724 decompressor_->avail_out = decompressed_max_size;
725
726 int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
727 if (rv == Z_NEED_DICT) {
728 // Need to try again with the right dictionary.
729 if (decompressor_->adler == dictionary_id) {
730 rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)dictionary,
731 sizeof(dictionary));
732 if (rv == Z_OK)
733 rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
734 }
735 }
736 if (rv != Z_OK) { // How can we know that it decompressed everything?
737 delete new_frame;
738 return NULL;
739 }
740
741 // Unset the compressed flag for data frames.
742 if (!new_frame->is_control_frame()) {
743 FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
744 data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED);
745 }
746
747 int decompressed_size = decompressed_max_size - decompressor_->avail_out;
748 new_frame->set_length(header_length + decompressed_size - FlipFrame::size());
749
750 pre_decompress_bytes.Add(frame->length());
751 post_decompress_bytes.Add(new_frame->length());
752
753 return new_frame;
754 }
755
DuplicateFrame(const FlipFrame * frame)756 FlipFrame* FlipFramer::DuplicateFrame(const FlipFrame* frame) {
757 int size = FlipFrame::size() + frame->length();
758 FlipFrame* new_frame = new FlipFrame(size);
759 memcpy(new_frame->data(), frame->data(), size);
760 return new_frame;
761 }
762
set_enable_compression(bool value)763 void FlipFramer::set_enable_compression(bool value) {
764 enable_compression_ = value;
765 }
766
set_enable_compression_default(bool value)767 void FlipFramer::set_enable_compression_default(bool value) {
768 compression_default_ = value;
769 }
770
771 } // namespace flip
772
773