• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // Streams classes.
6 //
7 // These memory-resident streams are used for serializing data into a sequential
8 // region of memory.
9 //
10 // Streams are divided into SourceStreams for reading and SinkStreams for
11 // writing.  Streams are aggregated into Sets which allows several streams to be
12 // used at once.  Example: we can write A1, B1, A2, B2 but achieve the memory
13 // layout A1 A2 B1 B2 by writing 'A's to one stream and 'B's to another.
14 //
15 // The aggregated streams are important to Courgette's compression efficiency,
16 // we use it to cluster similar kinds of data which helps to generate longer
17 // common subsequences and repeated sequences.
18 
19 #include "courgette/streams.h"
20 
21 #include <memory.h>
22 
23 #include "base/basictypes.h"
24 #include "base/logging.h"
25 
26 namespace courgette {
27 
28 // Update this version number if the serialization format of a StreamSet
29 // changes.
30 static const unsigned int kStreamsSerializationFormatVersion = 20090218;
31 
32 //
33 // This is a cut down Varint implementation, implementing only what we use for
34 // streams.
35 //
36 class Varint {
37  public:
38   // Maximum lengths of varint encoding of uint32
39   static const int kMax32 = 5;
40 
41   // Parses a Varint32 encoded value from |source| and stores it in |output|,
42   // and returns a pointer to the following byte.  Returns NULL if a valid
43   // varint value was not found before |limit|.
44   static const uint8* Parse32WithLimit(const uint8* source, const uint8* limit,
45                                        uint32* output);
46 
47   // Writes the Varint32 encoded representation of |value| to buffer
48   // |destination|.  |destination| must have sufficient length to hold kMax32
49   // bytes.  Returns a pointer to the byte just past the last encoded byte.
50   static uint8* Encode32(uint8* destination, uint32 value);
51 };
52 
53 // Parses a Varint32 encoded unsigned number from |source|.  The Varint32
54 // encoding is a little-endian sequence of bytes containing base-128 digits,
55 // with the high order bit set to indicate if there are more digits.
56 //
57 // For each byte, we mask out the digit and 'or' it into the right place in the
58 // result.
59 //
60 // The digit loop is unrolled for performance.  It usually exits after the first
61 // one or two digits.
Parse32WithLimit(const uint8 * source,const uint8 * limit,uint32 * output)62 const uint8* Varint::Parse32WithLimit(const uint8* source,
63                                       const uint8* limit,
64                                       uint32* output) {
65   uint32 digit, result;
66   if (source >= limit)
67     return NULL;
68   digit = *(source++);
69   result = digit & 127;
70   if (digit < 128) {
71     *output = result;
72     return source;
73   }
74 
75   if (source >= limit)
76     return NULL;
77   digit = *(source++);
78   result |= (digit & 127) <<  7;
79   if (digit < 128) {
80     *output = result;
81     return source;
82   }
83 
84   if (source >= limit)
85     return NULL;
86   digit = *(source++);
87   result |= (digit & 127) << 14;
88   if (digit < 128) {
89     *output = result;
90     return source;
91   }
92 
93   if (source >= limit)
94     return NULL;
95   digit = *(source++);
96   result |= (digit & 127) << 21;
97   if (digit < 128) {
98     *output = result;
99     return source;
100   }
101 
102   if (source >= limit)
103     return NULL;
104   digit = *(source++);
105   result |= (digit & 127) << 28;
106   if (digit < 128) {
107     *output = result;
108     return source;
109   }
110 
111   return NULL;  // Value is too long to be a Varint32.
112 }
113 
114 // Write the base-128 digits in little-endian order.  All except the last digit
115 // have the high bit set to indicate more digits.
Encode32(uint8 * destination,uint32 value)116 inline uint8* Varint::Encode32(uint8* destination, uint32 value) {
117   while (value >= 128) {
118     *(destination++) = value | 128;
119     value = value >> 7;
120   }
121   *(destination++) = value;
122   return destination;
123 }
124 
Init(const SinkStream & sink)125 void SourceStream::Init(const SinkStream& sink) {
126   Init(sink.Buffer(), sink.Length());
127 }
128 
Read(void * destination,size_t count)129 bool SourceStream::Read(void* destination, size_t count) {
130   if (current_ + count > end_)
131     return false;
132   memcpy(destination, current_, count);
133   current_ += count;
134   return true;
135 }
136 
ReadVarint32(uint32 * output_value)137 bool SourceStream::ReadVarint32(uint32* output_value) {
138   const uint8* after = Varint::Parse32WithLimit(current_, end_, output_value);
139   if (!after)
140     return false;
141   current_ = after;
142   return true;
143 }
144 
ReadVarint32Signed(int32 * output_value)145 bool SourceStream::ReadVarint32Signed(int32* output_value) {
146   // Signed numbers are encoded as unsigned numbers so that numbers nearer zero
147   // have shorter varint encoding.
148   //  0000xxxx encoded as 000xxxx0.
149   //  1111xxxx encoded as 000yyyy1 where yyyy is complement of xxxx.
150   uint32 unsigned_value;
151   if (!ReadVarint32(&unsigned_value))
152     return false;
153   if (unsigned_value & 1)
154     *output_value = ~static_cast<int32>(unsigned_value >> 1);
155   else
156     *output_value = (unsigned_value >> 1);
157   return true;
158 }
159 
ShareSubstream(size_t offset,size_t length,SourceStream * substream)160 bool SourceStream::ShareSubstream(size_t offset, size_t length,
161                                   SourceStream* substream) {
162   if (offset > Remaining())
163     return false;
164   if (length > Remaining() - offset)
165     return false;
166   substream->Init(current_ + offset, length);
167   return true;
168 }
169 
ReadSubstream(size_t length,SourceStream * substream)170 bool SourceStream::ReadSubstream(size_t length, SourceStream* substream) {
171   if (!ShareSubstream(0, length, substream))
172     return false;
173   current_ += length;
174   return true;
175 }
176 
Skip(size_t byte_count)177 bool SourceStream::Skip(size_t byte_count) {
178   if (current_ + byte_count > end_)
179     return false;
180   current_ += byte_count;
181   return true;
182 }
183 
Write(const void * data,size_t byte_count)184 CheckBool SinkStream::Write(const void* data, size_t byte_count) {
185   return buffer_.append(static_cast<const char*>(data), byte_count);
186 }
187 
WriteVarint32(uint32 value)188 CheckBool SinkStream::WriteVarint32(uint32 value) {
189   uint8 buffer[Varint::kMax32];
190   uint8* end = Varint::Encode32(buffer, value);
191   return Write(buffer, end - buffer);
192 }
193 
WriteVarint32Signed(int32 value)194 CheckBool SinkStream::WriteVarint32Signed(int32 value) {
195   // Encode signed numbers so that numbers nearer zero have shorter
196   // varint encoding.
197   //  0000xxxx encoded as 000xxxx0.
198   //  1111xxxx encoded as 000yyyy1 where yyyy is complement of xxxx.
199   bool ret;
200   if (value < 0)
201     ret = WriteVarint32(~value * 2 + 1);
202   else
203     ret = WriteVarint32(value * 2);
204   return ret;
205 }
206 
WriteSizeVarint32(size_t value)207 CheckBool SinkStream::WriteSizeVarint32(size_t value) {
208   uint32 narrowed_value = static_cast<uint32>(value);
209   // On 32-bit, the compiler should figure out this test always fails.
210   LOG_ASSERT(value == narrowed_value);
211   return WriteVarint32(narrowed_value);
212 }
213 
Append(SinkStream * other)214 CheckBool SinkStream::Append(SinkStream* other) {
215   bool ret = Write(other->buffer_.data(), other->buffer_.size());
216   if (ret)
217     other->Retire();
218   return ret;
219 }
220 
Retire()221 void SinkStream::Retire() {
222   buffer_.clear();
223 }
224 
225 ////////////////////////////////////////////////////////////////////////////////
226 
SourceStreamSet()227 SourceStreamSet::SourceStreamSet()
228     : count_(kMaxStreams) {
229 }
230 
~SourceStreamSet()231 SourceStreamSet::~SourceStreamSet() {
232 }
233 
234 
235 // Initializes from |source|.
236 // The stream set for N streams is serialized as a header
237 //   <version><N><length1><length2>...<lengthN>
238 // followed by the stream contents
239 //   <bytes1><bytes2>...<bytesN>
240 //
Init(const void * source,size_t byte_count)241 bool SourceStreamSet::Init(const void* source, size_t byte_count) {
242   const uint8* start = static_cast<const uint8*>(source);
243   const uint8* end = start + byte_count;
244 
245   unsigned int version;
246   const uint8* finger = Varint::Parse32WithLimit(start, end, &version);
247   if (finger == NULL)
248     return false;
249   if (version != kStreamsSerializationFormatVersion)
250     return false;
251 
252   unsigned int count;
253   finger = Varint::Parse32WithLimit(finger, end, &count);
254   if (finger == NULL)
255     return false;
256   if (count > kMaxStreams)
257     return false;
258 
259   count_ = count;
260 
261   unsigned int lengths[kMaxStreams];
262   size_t accumulated_length = 0;
263 
264   for (size_t i = 0; i < count_; ++i) {
265     finger = Varint::Parse32WithLimit(finger, end, &lengths[i]);
266     if (finger == NULL)
267       return false;
268     accumulated_length += lengths[i];
269   }
270 
271   // Remaining bytes should add up to sum of lengths.
272   if (static_cast<size_t>(end - finger) != accumulated_length)
273     return false;
274 
275   accumulated_length = finger - start;
276   for (size_t i = 0; i < count_; ++i) {
277     stream(i)->Init(start + accumulated_length, lengths[i]);
278     accumulated_length += lengths[i];
279   }
280 
281   return true;
282 }
283 
Init(SourceStream * source)284 bool SourceStreamSet::Init(SourceStream* source) {
285   // TODO(sra): consume the rest of |source|.
286   return Init(source->Buffer(), source->Remaining());
287 }
288 
ReadSet(SourceStreamSet * set)289 bool SourceStreamSet::ReadSet(SourceStreamSet* set) {
290   uint32 stream_count = 0;
291   SourceStream* control_stream = this->stream(0);
292   if (!control_stream->ReadVarint32(&stream_count))
293     return false;
294 
295   uint32 lengths[kMaxStreams] = {};  // i.e. all zero.
296 
297   for (size_t i = 0; i < stream_count; ++i) {
298     if (!control_stream->ReadVarint32(&lengths[i]))
299       return false;
300   }
301 
302   for (size_t i = 0; i < stream_count; ++i) {
303     if (!this->stream(i)->ReadSubstream(lengths[i], set->stream(i)))
304       return false;
305   }
306   return true;
307 }
308 
Empty() const309 bool SourceStreamSet::Empty() const {
310   for (size_t i = 0; i < count_; ++i) {
311     if (streams_[i].Remaining() != 0)
312       return false;
313   }
314   return true;
315 }
316 
317 ////////////////////////////////////////////////////////////////////////////////
318 
SinkStreamSet()319 SinkStreamSet::SinkStreamSet()
320   : count_(kMaxStreams) {
321 }
322 
~SinkStreamSet()323 SinkStreamSet::~SinkStreamSet() {
324 }
325 
Init(size_t stream_index_limit)326 void SinkStreamSet::Init(size_t stream_index_limit) {
327   count_ = stream_index_limit;
328 }
329 
330 // The header for a stream set for N streams is serialized as
331 //   <version><N><length1><length2>...<lengthN>
CopyHeaderTo(SinkStream * header)332 CheckBool SinkStreamSet::CopyHeaderTo(SinkStream* header) {
333   bool ret = header->WriteVarint32(kStreamsSerializationFormatVersion);
334   if (ret) {
335     ret = header->WriteSizeVarint32(count_);
336     for (size_t i = 0; ret && i < count_; ++i) {
337       ret = header->WriteSizeVarint32(stream(i)->Length());
338     }
339   }
340   return ret;
341 }
342 
343 // Writes |this| to |combined_stream|.  See SourceStreamSet::Init for the layout
344 // of the stream metadata and contents.
CopyTo(SinkStream * combined_stream)345 CheckBool SinkStreamSet::CopyTo(SinkStream *combined_stream) {
346   SinkStream header;
347   bool ret = CopyHeaderTo(&header);
348   if (!ret)
349     return ret;
350 
351   // Reserve the correct amount of storage.
352   size_t length = header.Length();
353   for (size_t i = 0; i < count_; ++i) {
354     length += stream(i)->Length();
355   }
356   ret = combined_stream->Reserve(length);
357   if (ret) {
358     ret = combined_stream->Append(&header);
359     for (size_t i = 0; ret && i < count_; ++i) {
360       ret = combined_stream->Append(stream(i));
361     }
362   }
363   return ret;
364 }
365 
WriteSet(SinkStreamSet * set)366 CheckBool SinkStreamSet::WriteSet(SinkStreamSet* set) {
367   uint32 lengths[kMaxStreams];
368   // 'stream_count' includes all non-empty streams and all empty stream numbered
369   // lower than a non-empty stream.
370   size_t stream_count = 0;
371   for (size_t i = 0; i < kMaxStreams; ++i) {
372     SinkStream* stream = set->stream(i);
373     lengths[i] = static_cast<uint32>(stream->Length());
374     if (lengths[i] > 0)
375       stream_count = i + 1;
376   }
377 
378   SinkStream* control_stream = this->stream(0);
379   bool ret = control_stream->WriteSizeVarint32(stream_count);
380   for (size_t i = 0; ret && i < stream_count; ++i) {
381     ret = control_stream->WriteSizeVarint32(lengths[i]);
382   }
383 
384   for (size_t i = 0; ret && i < stream_count; ++i) {
385     ret = this->stream(i)->Append(set->stream(i));
386   }
387   return ret;
388 }
389 
390 }  // namespace
391