• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/quic/quic_stream_sequencer.h"
6 
7 #include <algorithm>
8 #include <limits>
9 
10 #include "base/logging.h"
11 #include "base/metrics/sparse_histogram.h"
12 #include "net/quic/reliable_quic_stream.h"
13 
14 using std::make_pair;
15 using std::min;
16 using std::numeric_limits;
17 
18 namespace net {
19 
QuicStreamSequencer(ReliableQuicStream * quic_stream)20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
21     : stream_(quic_stream),
22       num_bytes_consumed_(0),
23       close_offset_(numeric_limits<QuicStreamOffset>::max()),
24       blocked_(false),
25       num_bytes_buffered_(0),
26       num_frames_received_(0),
27       num_duplicate_frames_received_(0) {
28 }
29 
~QuicStreamSequencer()30 QuicStreamSequencer::~QuicStreamSequencer() {
31 }
32 
OnStreamFrame(const QuicStreamFrame & frame)33 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
34   ++num_frames_received_;
35   if (IsDuplicate(frame)) {
36     ++num_duplicate_frames_received_;
37     // Silently ignore duplicates.
38     return;
39   }
40 
41   if (FrameOverlapsBufferedData(frame)) {
42     stream_->CloseConnectionWithDetails(
43         QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
44     return;
45   }
46 
47   QuicStreamOffset byte_offset = frame.offset;
48   size_t data_len = frame.data.TotalBufferSize();
49   if (data_len == 0 && !frame.fin) {
50     // Stream frames must have data or a fin flag.
51     stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
52                                         "Empty stream frame without FIN set.");
53     return;
54   }
55 
56   if (frame.fin) {
57     CloseStreamAtOffset(frame.offset + data_len);
58     if (data_len == 0) {
59       return;
60     }
61   }
62 
63   IOVector data;
64   data.AppendIovec(frame.data.iovec(), frame.data.Size());
65 
66   // If the frame has arrived in-order then we can process it immediately, only
67   // buffering if the stream is unable to process it.
68   if (!blocked_ && byte_offset == num_bytes_consumed_) {
69     DVLOG(1) << "Processing byte offset " << byte_offset;
70     size_t bytes_consumed = 0;
71     for (size_t i = 0; i < data.Size(); ++i) {
72       bytes_consumed += stream_->ProcessRawData(
73           static_cast<char*>(data.iovec()[i].iov_base),
74           data.iovec()[i].iov_len);
75     }
76     num_bytes_consumed_ += bytes_consumed;
77     stream_->AddBytesConsumed(bytes_consumed);
78 
79     if (MaybeCloseStream()) {
80       return;
81     }
82     if (bytes_consumed > data_len) {
83       stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
84       return;
85     } else if (bytes_consumed == data_len) {
86       FlushBufferedFrames();
87       return;  // it's safe to ack this frame.
88     } else {
89       // Set ourselves up to buffer what's left.
90       data_len -= bytes_consumed;
91       data.Consume(bytes_consumed);
92       byte_offset += bytes_consumed;
93     }
94   }
95 
96   // Buffer any remaining data to be consumed by the stream when ready.
97   for (size_t i = 0; i < data.Size(); ++i) {
98     DVLOG(1) << "Buffering stream data at offset " << byte_offset;
99     const iovec& iov = data.iovec()[i];
100     buffered_frames_.insert(make_pair(
101         byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len)));
102     byte_offset += iov.iov_len;
103     num_bytes_buffered_ += iov.iov_len;
104   }
105   return;
106 }
107 
CloseStreamAtOffset(QuicStreamOffset offset)108 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
109   const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
110 
111   // If we have a scheduled termination or close, any new offset should match
112   // it.
113   if (close_offset_ != kMaxOffset && offset != close_offset_) {
114     stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
115     return;
116   }
117 
118   close_offset_ = offset;
119 
120   MaybeCloseStream();
121 }
122 
MaybeCloseStream()123 bool QuicStreamSequencer::MaybeCloseStream() {
124   if (!blocked_ && IsClosed()) {
125     DVLOG(1) << "Passing up termination, as we've processed "
126              << num_bytes_consumed_ << " of " << close_offset_
127              << " bytes.";
128     // Technically it's an error if num_bytes_consumed isn't exactly
129     // equal, but error handling seems silly at this point.
130     stream_->OnFinRead();
131     buffered_frames_.clear();
132     num_bytes_buffered_ = 0;
133     return true;
134   }
135   return false;
136 }
137 
GetReadableRegions(iovec * iov,size_t iov_len)138 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
139   DCHECK(!blocked_);
140   FrameMap::iterator it = buffered_frames_.begin();
141   size_t index = 0;
142   QuicStreamOffset offset = num_bytes_consumed_;
143   while (it != buffered_frames_.end() && index < iov_len) {
144     if (it->first != offset) return index;
145 
146     iov[index].iov_base = static_cast<void*>(
147         const_cast<char*>(it->second.data()));
148     iov[index].iov_len = it->second.size();
149     offset += it->second.size();
150 
151     ++index;
152     ++it;
153   }
154   return index;
155 }
156 
Readv(const struct iovec * iov,size_t iov_len)157 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
158   DCHECK(!blocked_);
159   FrameMap::iterator it = buffered_frames_.begin();
160   size_t iov_index = 0;
161   size_t iov_offset = 0;
162   size_t frame_offset = 0;
163   size_t initial_bytes_consumed = num_bytes_consumed_;
164 
165   while (iov_index < iov_len &&
166          it != buffered_frames_.end() &&
167          it->first == num_bytes_consumed_) {
168     int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
169                             it->second.size() - frame_offset);
170 
171     char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
172     memcpy(iov_ptr,
173            it->second.data() + frame_offset, bytes_to_read);
174     frame_offset += bytes_to_read;
175     iov_offset += bytes_to_read;
176 
177     if (iov[iov_index].iov_len == iov_offset) {
178       // We've filled this buffer.
179       iov_offset = 0;
180       ++iov_index;
181     }
182     if (it->second.size() == frame_offset) {
183       // We've copied this whole frame
184       RecordBytesConsumed(it->second.size());
185       buffered_frames_.erase(it);
186       it = buffered_frames_.begin();
187       frame_offset = 0;
188     }
189   }
190   // We've finished copying.  If we have a partial frame, update it.
191   if (frame_offset != 0) {
192     buffered_frames_.insert(
193         make_pair(it->first + frame_offset, it->second.substr(frame_offset)));
194     buffered_frames_.erase(buffered_frames_.begin());
195     RecordBytesConsumed(frame_offset);
196   }
197   return num_bytes_consumed_ - initial_bytes_consumed;
198 }
199 
HasBytesToRead() const200 bool QuicStreamSequencer::HasBytesToRead() const {
201   FrameMap::const_iterator it = buffered_frames_.begin();
202 
203   return it != buffered_frames_.end() && it->first == num_bytes_consumed_;
204 }
205 
IsClosed() const206 bool QuicStreamSequencer::IsClosed() const {
207   return num_bytes_consumed_ >= close_offset_;
208 }
209 
FrameOverlapsBufferedData(const QuicStreamFrame & frame) const210 bool QuicStreamSequencer::FrameOverlapsBufferedData(
211     const QuicStreamFrame& frame) const {
212   if (buffered_frames_.empty()) {
213     return false;
214   }
215 
216   FrameMap::const_iterator next_frame =
217       buffered_frames_.lower_bound(frame.offset);
218   // Duplicate frames should have been dropped in IsDuplicate.
219   DCHECK(next_frame == buffered_frames_.end() ||
220          next_frame->first != frame.offset);
221 
222   // If there is a buffered frame with a higher starting offset, then we check
223   // to see if the new frame runs into the higher frame.
224   if (next_frame != buffered_frames_.end() &&
225       (frame.offset + frame.data.TotalBufferSize()) > next_frame->first) {
226     DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + "
227              << frame.data.TotalBufferSize() << " > " << next_frame->first;
228     return true;
229   }
230 
231   // If there is a buffered frame with a lower starting offset, then we check
232   // to see if the buffered frame runs into the new frame.
233   if (next_frame != buffered_frames_.begin()) {
234     FrameMap::const_iterator preceeding_frame = --next_frame;
235     QuicStreamOffset offset = preceeding_frame->first;
236     uint64 data_length = preceeding_frame->second.length();
237     if ((offset + data_length) > frame.offset) {
238       DVLOG(1) << "Preceeding frame overlaps new frame: " << offset << " + "
239                << data_length << " > " << frame.offset;
240       return true;
241     }
242   }
243   return false;
244 }
245 
IsDuplicate(const QuicStreamFrame & frame) const246 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
247   // A frame is duplicate if the frame offset is smaller than our bytes consumed
248   // or we have stored the frame in our map.
249   // TODO(pwestin): Is it possible that a new frame contain more data even if
250   // the offset is the same?
251   return frame.offset < num_bytes_consumed_ ||
252       buffered_frames_.find(frame.offset) != buffered_frames_.end();
253 }
254 
SetBlockedUntilFlush()255 void QuicStreamSequencer::SetBlockedUntilFlush() {
256   blocked_ = true;
257 }
258 
FlushBufferedFrames()259 void QuicStreamSequencer::FlushBufferedFrames() {
260   blocked_ = false;
261   FrameMap::iterator it = buffered_frames_.find(num_bytes_consumed_);
262   while (it != buffered_frames_.end()) {
263     DVLOG(1) << "Flushing buffered packet at offset " << it->first;
264     string* data = &it->second;
265     size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
266                                                     data->size());
267     RecordBytesConsumed(bytes_consumed);
268     if (MaybeCloseStream()) {
269       return;
270     }
271     if (bytes_consumed > data->size()) {
272       stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);  // Programming error
273       return;
274     } else if (bytes_consumed == data->size()) {
275       buffered_frames_.erase(it);
276       it = buffered_frames_.find(num_bytes_consumed_);
277     } else {
278       string new_data = it->second.substr(bytes_consumed);
279       buffered_frames_.erase(it);
280       buffered_frames_.insert(make_pair(num_bytes_consumed_, new_data));
281       return;
282     }
283   }
284   MaybeCloseStream();
285 }
286 
RecordBytesConsumed(size_t bytes_consumed)287 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
288   num_bytes_consumed_ += bytes_consumed;
289   num_bytes_buffered_ -= bytes_consumed;
290 
291   stream_->AddBytesConsumed(bytes_consumed);
292 }
293 
294 }  // namespace net
295