• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/scoped_ptr.h"
6 #include "base/stats_counters.h"
7 
8 #include "flip_framer.h"  // cross-google3 directory naming.
9 #include "flip_frame_builder.h"
10 #include "flip_bitmasks.h"
11 
12 #if defined(USE_SYSTEM_ZLIB)
13 #include <zlib.h>
14 #else
15 #include "third_party/zlib/zlib.h"
16 #endif
17 
18 namespace flip {
19 
20 // The initial size of the control frame buffer; this is used internally
21 // as we parse through control frames.
22 static const size_t kControlFrameBufferInitialSize = 32 * 1024;
23 // The maximum size of the control frame buffer that we support.
24 // TODO(mbelshe): We should make this stream-based so there are no limits.
25 static const size_t kControlFrameBufferMaxSize = 64 * 1024;
26 
27 // By default is compression on or off.
28 bool FlipFramer::compression_default_ = true;
29 
30 #ifdef DEBUG_FLIP_STATE_CHANGES
31 #define CHANGE_STATE(newstate) \
32 { \
33   do { \
34     LOG(INFO) << "Changing state from: " \
35       << StateToString(state_) \
36       << " to " << StateToString(newstate) << "\n"; \
37     state_ = newstate; \
38   } while (false); \
39 }
40 #else
41 #define CHANGE_STATE(newstate) (state_ = newstate)
42 #endif
43 
FlipFramer()44 FlipFramer::FlipFramer()
45     : state_(FLIP_RESET),
46       error_code_(FLIP_NO_ERROR),
47       remaining_payload_(0),
48       remaining_control_payload_(0),
49       current_frame_buffer_(NULL),
50       current_frame_len_(0),
51       current_frame_capacity_(0),
52       enable_compression_(compression_default_),
53       visitor_(NULL) {
54 }
55 
~FlipFramer()56 FlipFramer::~FlipFramer() {
57   if (compressor_.get()) {
58     deflateEnd(compressor_.get());
59   }
60   if (decompressor_.get()) {
61     inflateEnd(decompressor_.get());
62   }
63   delete [] current_frame_buffer_;
64 }
65 
Reset()66 void FlipFramer::Reset() {
67   state_ = FLIP_RESET;
68   error_code_ = FLIP_NO_ERROR;
69   remaining_payload_ = 0;
70   remaining_control_payload_ = 0;
71   current_frame_len_ = 0;
72   if (current_frame_capacity_ != kControlFrameBufferInitialSize) {
73     delete [] current_frame_buffer_;
74     current_frame_buffer_ = 0;
75     current_frame_capacity_ = 0;
76     ExpandControlFrameBuffer(kControlFrameBufferInitialSize);
77   }
78 }
79 
StateToString(int state)80 const char* FlipFramer::StateToString(int state) {
81   switch (state) {
82     case FLIP_ERROR:
83       return "ERROR";
84     case FLIP_DONE:
85       return "DONE";
86     case FLIP_AUTO_RESET:
87       return "AUTO_RESET";
88     case FLIP_RESET:
89       return "RESET";
90     case FLIP_READING_COMMON_HEADER:
91       return "READING_COMMON_HEADER";
92     case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
93       return "INTERPRET_CONTROL_FRAME_COMMON_HEADER";
94     case FLIP_CONTROL_FRAME_PAYLOAD:
95       return "CONTROL_FRAME_PAYLOAD";
96     case FLIP_IGNORE_REMAINING_PAYLOAD:
97       return "IGNORE_REMAINING_PAYLOAD";
98     case FLIP_FORWARD_STREAM_FRAME:
99       return "FORWARD_STREAM_FRAME";
100   }
101   return "UNKNOWN_STATE";
102 }
103 
BytesSafeToRead() const104 size_t FlipFramer::BytesSafeToRead() const {
105   switch (state_) {
106     case FLIP_ERROR:
107     case FLIP_DONE:
108     case FLIP_AUTO_RESET:
109     case FLIP_RESET:
110       return 0;
111     case FLIP_READING_COMMON_HEADER:
112       DCHECK(current_frame_len_ < FlipFrame::size());
113       return FlipFrame::size() - current_frame_len_;
114     case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
115       return 0;
116     case FLIP_CONTROL_FRAME_PAYLOAD:
117     case FLIP_IGNORE_REMAINING_PAYLOAD:
118     case FLIP_FORWARD_STREAM_FRAME:
119       return remaining_payload_;
120   }
121   // We should never get to here.
122   return 0;
123 }
124 
set_error(FlipError error)125 void FlipFramer::set_error(FlipError error) {
126   DCHECK(visitor_);
127   error_code_ = error;
128   CHANGE_STATE(FLIP_ERROR);
129   visitor_->OnError(this);
130 }
131 
ErrorCodeToString(int error_code)132 const char* FlipFramer::ErrorCodeToString(int error_code) {
133   switch (error_code) {
134     case FLIP_NO_ERROR:
135       return "NO_ERROR";
136     case FLIP_UNKNOWN_CONTROL_TYPE:
137       return "UNKNOWN_CONTROL_TYPE";
138     case FLIP_INVALID_CONTROL_FRAME:
139       return "INVALID_CONTROL_FRAME";
140     case FLIP_CONTROL_PAYLOAD_TOO_LARGE:
141       return "CONTROL_PAYLOAD_TOO_LARGE";
142     case FLIP_ZLIB_INIT_FAILURE:
143       return "ZLIB_INIT_FAILURE";
144     case FLIP_UNSUPPORTED_VERSION:
145       return "UNSUPPORTED_VERSION";
146     case FLIP_DECOMPRESS_FAILURE:
147       return "DECOMPRESS_FAILURE";
148   }
149   return "UNKNOWN_STATE";
150 }
151 
ProcessInput(const char * data,size_t len)152 size_t FlipFramer::ProcessInput(const char* data, size_t len) {
153   DCHECK(visitor_);
154   DCHECK(data);
155 
156   size_t original_len = len;
157   while (len != 0) {
158     switch (state_) {
159       case FLIP_ERROR:
160       case FLIP_DONE:
161         goto bottom;
162 
163       case FLIP_AUTO_RESET:
164       case FLIP_RESET:
165         Reset();
166         CHANGE_STATE(FLIP_READING_COMMON_HEADER);
167         continue;
168 
169       case FLIP_READING_COMMON_HEADER: {
170         int bytes_read = ProcessCommonHeader(data, len);
171         len -= bytes_read;
172         data += bytes_read;
173         continue;
174       }
175 
176       // Arguably, this case is not necessary, as no bytes are consumed here.
177       // I felt it was a nice partitioning, however (which probably indicates
178       // that it should be refactored into its own function!)
179       case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
180         ProcessControlFrameHeader();
181         continue;
182 
183       case FLIP_CONTROL_FRAME_PAYLOAD: {
184         int bytes_read = ProcessControlFramePayload(data, len);
185         len -= bytes_read;
186         data += bytes_read;
187       }
188         // intentional fallthrough
189       case FLIP_IGNORE_REMAINING_PAYLOAD:
190         // control frame has too-large payload
191         // intentional fallthrough
192       case FLIP_FORWARD_STREAM_FRAME: {
193         int bytes_read = ProcessDataFramePayload(data, len);
194         len -= bytes_read;
195         data += bytes_read;
196         continue;
197       }
198       default:
199         break;
200     }
201   }
202  bottom:
203   return original_len - len;
204 }
205 
ProcessCommonHeader(const char * data,size_t len)206 size_t FlipFramer::ProcessCommonHeader(const char* data, size_t len) {
207   // This should only be called when we're in the FLIP_READING_COMMON_HEADER
208   // state.
209   DCHECK(state_ == FLIP_READING_COMMON_HEADER);
210 
211   int original_len = len;
212   FlipFrame current_frame(current_frame_buffer_, false);
213 
214   do {
215     if (current_frame_len_ < FlipFrame::size()) {
216       size_t bytes_desired = FlipFrame::size() - current_frame_len_;
217       size_t bytes_to_append = std::min(bytes_desired, len);
218       char* header_buffer = current_frame_buffer_;
219       memcpy(&header_buffer[current_frame_len_], data, bytes_to_append);
220       current_frame_len_ += bytes_to_append;
221       data += bytes_to_append;
222       len -= bytes_to_append;
223       // Check for an empty data frame.
224       if (current_frame_len_ == FlipFrame::size() &&
225           !current_frame.is_control_frame() &&
226           current_frame.length() == 0) {
227         if (current_frame.flags() & CONTROL_FLAG_FIN) {
228           FlipDataFrame data_frame(current_frame_buffer_, false);
229           visitor_->OnStreamFrameData(data_frame.stream_id(), NULL, 0);
230         }
231         CHANGE_STATE(FLIP_RESET);
232       }
233       break;
234     }
235     remaining_payload_ = current_frame.length();
236 
237     // This is just a sanity check for help debugging early frame errors.
238     if (remaining_payload_ > 1000000u) {
239       LOG(ERROR) <<
240           "Unexpectedly large frame.  Flip session is likely corrupt.";
241     }
242 
243     // if we're here, then we have the common header all received.
244     if (!current_frame.is_control_frame())
245       CHANGE_STATE(FLIP_FORWARD_STREAM_FRAME);
246     else
247       CHANGE_STATE(FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER);
248   } while (false);
249 
250   return original_len - len;
251 }
252 
ProcessControlFrameHeader()253 void FlipFramer::ProcessControlFrameHeader() {
254   DCHECK_EQ(FLIP_NO_ERROR, error_code_);
255   DCHECK_LE(FlipFrame::size(), current_frame_len_);
256   FlipControlFrame current_control_frame(current_frame_buffer_, false);
257   // Do some sanity checking on the control frame sizes.
258   switch (current_control_frame.type()) {
259     case SYN_STREAM:
260       if (current_control_frame.length() <
261           FlipSynStreamControlFrame::size() - FlipControlFrame::size())
262         set_error(FLIP_INVALID_CONTROL_FRAME);
263       break;
264     case SYN_REPLY:
265       if (current_control_frame.length() <
266           FlipSynReplyControlFrame::size() - FlipControlFrame::size())
267         set_error(FLIP_INVALID_CONTROL_FRAME);
268       break;
269     case FIN_STREAM:
270       if (current_control_frame.length() !=
271           FlipFinStreamControlFrame::size() - FlipFrame::size())
272         set_error(FLIP_INVALID_CONTROL_FRAME);
273       break;
274     case NOOP:
275       // NOOP.  Swallow it.
276       CHANGE_STATE(FLIP_AUTO_RESET);
277       return;
278     default:
279       set_error(FLIP_UNKNOWN_CONTROL_TYPE);
280       break;
281   }
282 
283   // We only support version 1 of this protocol.
284   if (current_control_frame.version() != kFlipProtocolVersion)
285     set_error(FLIP_UNSUPPORTED_VERSION);
286 
287   remaining_control_payload_ = current_control_frame.length();
288   if (remaining_control_payload_ > kControlFrameBufferMaxSize)
289     set_error(FLIP_CONTROL_PAYLOAD_TOO_LARGE);
290 
291   if (error_code_)
292     return;
293 
294   ExpandControlFrameBuffer(remaining_control_payload_);
295   CHANGE_STATE(FLIP_CONTROL_FRAME_PAYLOAD);
296 }
297 
ProcessControlFramePayload(const char * data,size_t len)298 size_t FlipFramer::ProcessControlFramePayload(const char* data, size_t len) {
299   size_t original_len = len;
300   do {
301     if (remaining_control_payload_) {
302       size_t amount_to_consume = std::min(remaining_control_payload_, len);
303       memcpy(&current_frame_buffer_[current_frame_len_], data,
304              amount_to_consume);
305       current_frame_len_ += amount_to_consume;
306       data += amount_to_consume;
307       len -= amount_to_consume;
308       remaining_control_payload_ -= amount_to_consume;
309       remaining_payload_ -= amount_to_consume;
310       if (remaining_control_payload_)
311         break;
312     }
313     FlipControlFrame control_frame(current_frame_buffer_, false);
314     visitor_->OnControl(&control_frame);
315 
316     // If this is a FIN, tell the caller.
317     if (control_frame.type() == SYN_REPLY &&
318         control_frame.flags() & CONTROL_FLAG_FIN)
319       visitor_->OnStreamFrameData(control_frame.stream_id(), NULL, 0);
320 
321     CHANGE_STATE(FLIP_IGNORE_REMAINING_PAYLOAD);
322   } while (false);
323   return original_len - len;
324 }
325 
ProcessDataFramePayload(const char * data,size_t len)326 size_t FlipFramer::ProcessDataFramePayload(const char* data, size_t len) {
327   size_t original_len = len;
328 
329   FlipDataFrame current_data_frame(current_frame_buffer_, false);
330   if (remaining_payload_) {
331     size_t amount_to_forward = std::min(remaining_payload_, len);
332     if (amount_to_forward && state_ != FLIP_IGNORE_REMAINING_PAYLOAD) {
333       if (current_data_frame.flags() & DATA_FLAG_COMPRESSED) {
334         // TODO(mbelshe): Assert that the decompressor is init'ed.
335         if (!InitializeDecompressor())
336           return NULL;
337 
338         size_t decompressed_max_size = amount_to_forward * 100;
339         scoped_ptr<char> decompressed(new char[decompressed_max_size]);
340         decompressor_->next_in = reinterpret_cast<Bytef*>(
341             const_cast<char*>(data));
342         decompressor_->avail_in = amount_to_forward;
343         decompressor_->next_out =
344             reinterpret_cast<Bytef*>(decompressed.get());
345         decompressor_->avail_out = decompressed_max_size;
346 
347         int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
348         if (rv != Z_OK) {
349           set_error(FLIP_DECOMPRESS_FAILURE);
350           return 0;
351         }
352         size_t decompressed_size = decompressed_max_size -
353                                    decompressor_->avail_out;
354         // Only inform the visitor if there is data.
355         if (decompressed_size)
356           visitor_->OnStreamFrameData(current_data_frame.stream_id(),
357                                       decompressed.get(),
358                                       decompressed_size);
359         amount_to_forward -= decompressor_->avail_in;
360       } else {
361         // The data frame was not compressed.
362         // Only inform the visitor if there is data.
363         if (amount_to_forward)
364           visitor_->OnStreamFrameData(current_data_frame.stream_id(),
365                                       data, amount_to_forward);
366       }
367     }
368     data += amount_to_forward;
369     len -= amount_to_forward;
370     remaining_payload_ -= amount_to_forward;
371 
372     // If the FIN flag is set, and there is no more data in this data
373     // frame, inform the visitor of EOF via a 0-length data frame.
374     if (!remaining_payload_ &&
375         current_data_frame.flags() & DATA_FLAG_FIN)
376       visitor_->OnStreamFrameData(current_data_frame.stream_id(), NULL,
377                                   0);
378   } else {
379     CHANGE_STATE(FLIP_AUTO_RESET);
380   }
381   return original_len - len;
382 }
383 
ExpandControlFrameBuffer(size_t size)384 void FlipFramer::ExpandControlFrameBuffer(size_t size) {
385   DCHECK(size < kControlFrameBufferMaxSize);
386   if (size < current_frame_capacity_)
387     return;
388 
389   int alloc_size = size + FlipFrame::size();
390   char* new_buffer = new char[alloc_size];
391   memcpy(new_buffer, current_frame_buffer_, current_frame_len_);
392   current_frame_capacity_ = alloc_size;
393   current_frame_buffer_ = new_buffer;
394 }
395 
ParseHeaderBlock(const FlipFrame * frame,FlipHeaderBlock * block)396 bool FlipFramer::ParseHeaderBlock(const FlipFrame* frame,
397                                   FlipHeaderBlock* block) {
398   FlipControlFrame control_frame(frame->data(), false);
399   uint32 type = control_frame.type();
400   if (type != SYN_STREAM && type != SYN_REPLY)
401     return false;
402 
403   // Find the header data within the control frame.
404   scoped_ptr<FlipFrame> decompressed_frame(DecompressFrame(frame));
405   if (!decompressed_frame.get())
406     return false;
407   FlipSynStreamControlFrame syn_frame(decompressed_frame->data(), false);
408   const char *header_data = syn_frame.header_block();
409   int header_length = syn_frame.header_block_len();
410 
411   FlipFrameBuilder builder(header_data, header_length);
412   void* iter = NULL;
413   uint16 num_headers;
414   if (builder.ReadUInt16(&iter, &num_headers)) {
415     for (int index = 0; index < num_headers; ++index) {
416       std::string name;
417       std::string value;
418       if (!builder.ReadString(&iter, &name))
419         break;
420       if (!builder.ReadString(&iter, &value))
421         break;
422       if (block->find(name) == block->end()) {
423         (*block)[name] = value;
424       } else {
425         return false;
426       }
427     }
428     return true;
429   }
430   return false;
431 }
432 
CreateSynStream(FlipStreamId stream_id,int priority,FlipControlFlags flags,bool compressed,FlipHeaderBlock * headers)433 FlipSynStreamControlFrame* FlipFramer::CreateSynStream(
434     FlipStreamId stream_id, int priority, FlipControlFlags flags,
435     bool compressed, FlipHeaderBlock* headers) {
436   FlipFrameBuilder frame;
437 
438   frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
439   frame.WriteUInt16(SYN_STREAM);
440   frame.WriteUInt32(0);  // Placeholder for the length and flags
441   frame.WriteUInt32(stream_id);
442   frame.WriteUInt16(ntohs(priority) << 6);  // Priority.
443 
444   frame.WriteUInt16(headers->size());  // Number of headers.
445   FlipHeaderBlock::iterator it;
446   for (it = headers->begin(); it != headers->end(); ++it) {
447     frame.WriteString(it->first);
448     frame.WriteString(it->second);
449   }
450 
451   // Write the length and flags.
452   size_t length = frame.length() - FlipFrame::size();
453   DCHECK(length < static_cast<size_t>(kLengthMask));
454   FlagsAndLength flags_length;
455   flags_length.length_ = htonl(static_cast<uint32>(length));
456   flags_length.flags_[0] = flags;
457   frame.WriteBytesToOffset(4, &flags_length, sizeof(flags_length));
458 
459   scoped_ptr<FlipFrame> syn_frame(frame.take());
460   if (compressed) {
461     return reinterpret_cast<FlipSynStreamControlFrame*>(
462         CompressFrame(syn_frame.get()));
463   }
464   return reinterpret_cast<FlipSynStreamControlFrame*>(syn_frame.release());
465 }
466 
467 /* static */
CreateFinStream(FlipStreamId stream_id,int status)468 FlipFinStreamControlFrame* FlipFramer::CreateFinStream(FlipStreamId stream_id,
469                                                        int status) {
470   FlipFrameBuilder frame;
471   frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
472   frame.WriteUInt16(FIN_STREAM);
473   frame.WriteUInt32(8);
474   frame.WriteUInt32(stream_id);
475   frame.WriteUInt32(status);
476   return reinterpret_cast<FlipFinStreamControlFrame*>(frame.take());
477 }
478 
CreateSynReply(FlipStreamId stream_id,FlipControlFlags flags,bool compressed,FlipHeaderBlock * headers)479 FlipSynReplyControlFrame* FlipFramer::CreateSynReply(FlipStreamId stream_id,
480     FlipControlFlags flags, bool compressed, FlipHeaderBlock* headers) {
481 
482   FlipFrameBuilder frame;
483 
484   frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
485   frame.WriteUInt16(SYN_REPLY);
486   frame.WriteUInt32(0);  // Placeholder for the length and flags.
487   frame.WriteUInt32(stream_id);
488   frame.WriteUInt16(0);  // Unused
489 
490   frame.WriteUInt16(headers->size());  // Number of headers.
491   FlipHeaderBlock::iterator it;
492   for (it = headers->begin(); it != headers->end(); ++it) {
493     // TODO(mbelshe): Headers need to be sorted.
494     frame.WriteString(it->first);
495     frame.WriteString(it->second);
496   }
497 
498   // Write the length
499   size_t length = frame.length() - FlipFrame::size();
500   DCHECK(length < static_cast<size_t>(kLengthMask));
501   FlagsAndLength flags_length;
502   flags_length.length_ = htonl(static_cast<uint32>(length));
503   flags_length.flags_[0] = flags;
504   frame.WriteBytesToOffset(4, &flags_length, sizeof(flags_length));
505 
506   scoped_ptr<FlipFrame> reply_frame(frame.take());
507   if (compressed) {
508     return reinterpret_cast<FlipSynReplyControlFrame*>(
509         CompressFrame(reply_frame.get()));
510   }
511   return reinterpret_cast<FlipSynReplyControlFrame*>(reply_frame.release());
512 }
513 
CreateDataFrame(FlipStreamId stream_id,const char * data,uint32 len,FlipDataFlags flags)514 FlipDataFrame* FlipFramer::CreateDataFrame(FlipStreamId stream_id,
515                                            const char* data,
516                                            uint32 len, FlipDataFlags flags) {
517   FlipFrameBuilder frame;
518 
519   frame.WriteUInt32(stream_id);
520 
521   DCHECK(len < static_cast<size_t>(kLengthMask));
522   FlagsAndLength flags_length;
523   flags_length.length_ = htonl(len);
524   flags_length.flags_[0] = flags;
525   frame.WriteBytes(&flags_length, sizeof(flags_length));
526 
527   frame.WriteBytes(data, len);
528   scoped_ptr<FlipFrame> data_frame(frame.take());
529   if (flags & DATA_FLAG_COMPRESSED)
530     return reinterpret_cast<FlipDataFrame*>(CompressFrame(data_frame.get()));
531   return reinterpret_cast<FlipDataFrame*>(data_frame.release());
532 }
533 
534 /* static */
CreateNopFrame()535 FlipControlFrame* FlipFramer::CreateNopFrame() {
536   FlipFrameBuilder frame;
537   frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
538   frame.WriteUInt16(NOOP);
539   frame.WriteUInt32(0);
540   return reinterpret_cast<FlipControlFrame*>(frame.take());
541 }
542 
543 static const int kCompressorLevel = Z_DEFAULT_COMPRESSION;
544 // This is just a hacked dictionary to use for shrinking HTTP-like headers.
545 // TODO(mbelshe): Use a scientific methodology for computing the dictionary.
546 static const char dictionary[] =
547   "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-"
548   "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi"
549   "f-rangeif-unmodifiedsincemax-forwardsproxy-authorizationrangerefererteuser"
550   "-agent10010120020120220320420520630030130230330430530630740040140240340440"
551   "5406407408409410411412413414415416417500501502503504505accept-rangesageeta"
552   "glocationproxy-authenticatepublicretry-afterservervarywarningwww-authentic"
553   "ateallowcontent-basecontent-encodingcache-controlconnectiondatetrailertran"
554   "sfer-encodingupgradeviawarningcontent-languagecontent-lengthcontent-locati"
555   "oncontent-md5content-rangecontent-typeetagexpireslast-modifiedset-cookieMo"
556   "ndayTuesdayWednesdayThursdayFridaySaturdaySundayJanFebMarAprMayJunJulAugSe"
557   "pOctNovDecchunkedtext/htmlimage/pngimage/jpgimage/gifapplication/xmlapplic"
558   "ation/xhtmltext/plainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1"
559   ".1statusversionurl";
560 static uLong dictionary_id = 0;
561 
InitializeCompressor()562 bool FlipFramer::InitializeCompressor() {
563   if (compressor_.get())
564     return true;  // Already initialized.
565 
566   compressor_.reset(new z_stream);
567   memset(compressor_.get(), 0, sizeof(z_stream));
568 
569   int success = deflateInit(compressor_.get(), kCompressorLevel);
570   if (success == Z_OK)
571     success = deflateSetDictionary(compressor_.get(),
572                                    reinterpret_cast<const Bytef*>(dictionary),
573                                    sizeof(dictionary));
574   if (success != Z_OK)
575     compressor_.reset(NULL);
576   return success == Z_OK;
577 }
578 
InitializeDecompressor()579 bool FlipFramer::InitializeDecompressor() {
580   if (decompressor_.get())
581     return true;  // Already initialized.
582 
583   decompressor_.reset(new z_stream);
584   memset(decompressor_.get(), 0, sizeof(z_stream));
585 
586   // Compute the id of our dictionary so that we know we're using the
587   // right one when asked for it.
588   if (dictionary_id == 0) {
589     dictionary_id = adler32(0L, Z_NULL, 0);
590     dictionary_id = adler32(dictionary_id,
591                             reinterpret_cast<const Bytef*>(dictionary),
592                             sizeof(dictionary));
593   }
594 
595   int success = inflateInit(decompressor_.get());
596   if (success != Z_OK)
597     decompressor_.reset(NULL);
598   return success == Z_OK;
599 }
600 
GetFrameBoundaries(const FlipFrame * frame,int * payload_length,int * header_length,const char ** payload) const601 bool FlipFramer::GetFrameBoundaries(const FlipFrame* frame,
602                                     int* payload_length,
603                                     int* header_length,
604                                     const char** payload) const {
605   if (frame->is_control_frame()) {
606     const FlipControlFrame* control_frame =
607         reinterpret_cast<const FlipControlFrame*>(frame);
608     switch (control_frame->type()) {
609       case SYN_STREAM:
610       case SYN_REPLY:
611         {
612           const FlipSynStreamControlFrame *syn_frame =
613               reinterpret_cast<const FlipSynStreamControlFrame*>(frame);
614           *payload_length = syn_frame->header_block_len();
615           *header_length = syn_frame->size();
616           *payload = frame->data() + *header_length;
617         }
618         break;
619       default:
620         // TODO(mbelshe): set an error?
621         return false;  // We can't compress this frame!
622     }
623   } else {
624     *header_length = FlipFrame::size();
625     *payload_length = frame->length();
626     *payload = frame->data() + FlipFrame::size();
627   }
628   DCHECK(static_cast<size_t>(*header_length) <=
629       FlipFrame::size() + *payload_length);
630   return true;
631 }
632 
633 
CompressFrame(const FlipFrame * frame)634 FlipFrame* FlipFramer::CompressFrame(const FlipFrame* frame) {
635   int payload_length;
636   int header_length;
637   const char* payload;
638 
639   static StatsCounter pre_compress_bytes("flip.PreCompressSize");
640   static StatsCounter post_compress_bytes("flip.PostCompressSize");
641 
642   if (!enable_compression_)
643     return DuplicateFrame(frame);
644 
645   if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
646     return NULL;
647 
648   if (!InitializeCompressor())
649     return NULL;
650 
651   // TODO(mbelshe): Should we have a zlib header like what http servers do?
652 
653   // Create an output frame.
654   int compressed_max_size = deflateBound(compressor_.get(), payload_length);
655   int new_frame_size = header_length + compressed_max_size;
656   FlipFrame* new_frame = new FlipFrame(new_frame_size);
657   memcpy(new_frame->data(), frame->data(), frame->length() + FlipFrame::size());
658 
659   compressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload));
660   compressor_->avail_in = payload_length;
661   compressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) +
662                           header_length;
663   compressor_->avail_out = compressed_max_size;
664 
665   // Data packets have a 'compressed flag
666   if (!new_frame->is_control_frame()) {
667     FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
668     data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED);
669   }
670 
671   int rv = deflate(compressor_.get(), Z_SYNC_FLUSH);
672   if (rv != Z_OK) {  // How can we know that it compressed everything?
673     // This shouldn't happen, right?
674     delete new_frame;
675     return NULL;
676   }
677 
678   int compressed_size = compressed_max_size - compressor_->avail_out;
679   new_frame->set_length(header_length + compressed_size - FlipFrame::size());
680 
681   pre_compress_bytes.Add(payload_length);
682   post_compress_bytes.Add(new_frame->length());
683 
684   return new_frame;
685 }
686 
DecompressFrame(const FlipFrame * frame)687 FlipFrame* FlipFramer::DecompressFrame(const FlipFrame* frame) {
688   int payload_length;
689   int header_length;
690   const char* payload;
691 
692   static StatsCounter pre_decompress_bytes("flip.PreDeCompressSize");
693   static StatsCounter post_decompress_bytes("flip.PostDeCompressSize");
694 
695   if (!enable_compression_)
696     return DuplicateFrame(frame);
697 
698   if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
699     return NULL;
700 
701   if (!frame->is_control_frame()) {
702     const FlipDataFrame* data_frame =
703         reinterpret_cast<const FlipDataFrame*>(frame);
704     if ((data_frame->flags() & DATA_FLAG_COMPRESSED) == 0)
705       return DuplicateFrame(frame);
706   }
707 
708   if (!InitializeDecompressor())
709     return NULL;
710 
711   // TODO(mbelshe): Should we have a zlib header like what http servers do?
712 
713   // Create an output frame.  Assume it does not need to be longer than
714   // the input data.
715   int decompressed_max_size = kControlFrameBufferInitialSize;
716   int new_frame_size = header_length + decompressed_max_size;
717   FlipFrame* new_frame = new FlipFrame(new_frame_size);
718   memcpy(new_frame->data(), frame->data(), frame->length() + FlipFrame::size());
719 
720   decompressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload));
721   decompressor_->avail_in = payload_length;
722   decompressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) +
723       header_length;
724   decompressor_->avail_out = decompressed_max_size;
725 
726   int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
727   if (rv == Z_NEED_DICT) {
728     // Need to try again with the right dictionary.
729     if (decompressor_->adler == dictionary_id) {
730       rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)dictionary,
731                                 sizeof(dictionary));
732       if (rv == Z_OK)
733         rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
734     }
735   }
736   if (rv != Z_OK) {  // How can we know that it decompressed everything?
737     delete new_frame;
738     return NULL;
739   }
740 
741   // Unset the compressed flag for data frames.
742   if (!new_frame->is_control_frame()) {
743     FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
744     data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED);
745   }
746 
747   int decompressed_size = decompressed_max_size - decompressor_->avail_out;
748   new_frame->set_length(header_length + decompressed_size - FlipFrame::size());
749 
750   pre_decompress_bytes.Add(frame->length());
751   post_decompress_bytes.Add(new_frame->length());
752 
753   return new_frame;
754 }
755 
DuplicateFrame(const FlipFrame * frame)756 FlipFrame* FlipFramer::DuplicateFrame(const FlipFrame* frame) {
757   int size = FlipFrame::size() + frame->length();
758   FlipFrame* new_frame = new FlipFrame(size);
759   memcpy(new_frame->data(), frame->data(), size);
760   return new_frame;
761 }
762 
set_enable_compression(bool value)763 void FlipFramer::set_enable_compression(bool value) {
764   enable_compression_ = value;
765 }
766 
set_enable_compression_default(bool value)767 void FlipFramer::set_enable_compression_default(bool value) {
768   compression_default_ = value;
769 }
770 
771 }  // namespace flip
772 
773