1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/quic_stream_sequencer.h"
6
7 #include <algorithm>
8 #include <limits>
9
10 #include "base/logging.h"
11 #include "base/metrics/sparse_histogram.h"
12 #include "net/quic/reliable_quic_stream.h"
13
14 using std::make_pair;
15 using std::min;
16 using std::numeric_limits;
17
18 namespace net {
19
QuicStreamSequencer(ReliableQuicStream * quic_stream)20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
21 : stream_(quic_stream),
22 num_bytes_consumed_(0),
23 close_offset_(numeric_limits<QuicStreamOffset>::max()),
24 blocked_(false),
25 num_bytes_buffered_(0),
26 num_frames_received_(0),
27 num_duplicate_frames_received_(0) {
28 }
29
~QuicStreamSequencer()30 QuicStreamSequencer::~QuicStreamSequencer() {
31 }
32
OnStreamFrame(const QuicStreamFrame & frame)33 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
34 ++num_frames_received_;
35 if (IsDuplicate(frame)) {
36 ++num_duplicate_frames_received_;
37 // Silently ignore duplicates.
38 return;
39 }
40
41 if (FrameOverlapsBufferedData(frame)) {
42 stream_->CloseConnectionWithDetails(
43 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
44 return;
45 }
46
47 QuicStreamOffset byte_offset = frame.offset;
48 size_t data_len = frame.data.TotalBufferSize();
49 if (data_len == 0 && !frame.fin) {
50 // Stream frames must have data or a fin flag.
51 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
52 "Empty stream frame without FIN set.");
53 return;
54 }
55
56 if (frame.fin) {
57 CloseStreamAtOffset(frame.offset + data_len);
58 if (data_len == 0) {
59 return;
60 }
61 }
62
63 IOVector data;
64 data.AppendIovec(frame.data.iovec(), frame.data.Size());
65
66 // If the frame has arrived in-order then we can process it immediately, only
67 // buffering if the stream is unable to process it.
68 if (!blocked_ && byte_offset == num_bytes_consumed_) {
69 DVLOG(1) << "Processing byte offset " << byte_offset;
70 size_t bytes_consumed = 0;
71 for (size_t i = 0; i < data.Size(); ++i) {
72 bytes_consumed += stream_->ProcessRawData(
73 static_cast<char*>(data.iovec()[i].iov_base),
74 data.iovec()[i].iov_len);
75 }
76 num_bytes_consumed_ += bytes_consumed;
77 stream_->AddBytesConsumed(bytes_consumed);
78
79 if (MaybeCloseStream()) {
80 return;
81 }
82 if (bytes_consumed > data_len) {
83 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
84 return;
85 } else if (bytes_consumed == data_len) {
86 FlushBufferedFrames();
87 return; // it's safe to ack this frame.
88 } else {
89 // Set ourselves up to buffer what's left.
90 data_len -= bytes_consumed;
91 data.Consume(bytes_consumed);
92 byte_offset += bytes_consumed;
93 }
94 }
95
96 // Buffer any remaining data to be consumed by the stream when ready.
97 for (size_t i = 0; i < data.Size(); ++i) {
98 DVLOG(1) << "Buffering stream data at offset " << byte_offset;
99 const iovec& iov = data.iovec()[i];
100 buffered_frames_.insert(make_pair(
101 byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len)));
102 byte_offset += iov.iov_len;
103 num_bytes_buffered_ += iov.iov_len;
104 }
105 return;
106 }
107
CloseStreamAtOffset(QuicStreamOffset offset)108 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
109 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
110
111 // If we have a scheduled termination or close, any new offset should match
112 // it.
113 if (close_offset_ != kMaxOffset && offset != close_offset_) {
114 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
115 return;
116 }
117
118 close_offset_ = offset;
119
120 MaybeCloseStream();
121 }
122
MaybeCloseStream()123 bool QuicStreamSequencer::MaybeCloseStream() {
124 if (!blocked_ && IsClosed()) {
125 DVLOG(1) << "Passing up termination, as we've processed "
126 << num_bytes_consumed_ << " of " << close_offset_
127 << " bytes.";
128 // Technically it's an error if num_bytes_consumed isn't exactly
129 // equal, but error handling seems silly at this point.
130 stream_->OnFinRead();
131 buffered_frames_.clear();
132 num_bytes_buffered_ = 0;
133 return true;
134 }
135 return false;
136 }
137
GetReadableRegions(iovec * iov,size_t iov_len)138 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
139 DCHECK(!blocked_);
140 FrameMap::iterator it = buffered_frames_.begin();
141 size_t index = 0;
142 QuicStreamOffset offset = num_bytes_consumed_;
143 while (it != buffered_frames_.end() && index < iov_len) {
144 if (it->first != offset) return index;
145
146 iov[index].iov_base = static_cast<void*>(
147 const_cast<char*>(it->second.data()));
148 iov[index].iov_len = it->second.size();
149 offset += it->second.size();
150
151 ++index;
152 ++it;
153 }
154 return index;
155 }
156
Readv(const struct iovec * iov,size_t iov_len)157 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
158 DCHECK(!blocked_);
159 FrameMap::iterator it = buffered_frames_.begin();
160 size_t iov_index = 0;
161 size_t iov_offset = 0;
162 size_t frame_offset = 0;
163 size_t initial_bytes_consumed = num_bytes_consumed_;
164
165 while (iov_index < iov_len &&
166 it != buffered_frames_.end() &&
167 it->first == num_bytes_consumed_) {
168 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
169 it->second.size() - frame_offset);
170
171 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
172 memcpy(iov_ptr,
173 it->second.data() + frame_offset, bytes_to_read);
174 frame_offset += bytes_to_read;
175 iov_offset += bytes_to_read;
176
177 if (iov[iov_index].iov_len == iov_offset) {
178 // We've filled this buffer.
179 iov_offset = 0;
180 ++iov_index;
181 }
182 if (it->second.size() == frame_offset) {
183 // We've copied this whole frame
184 RecordBytesConsumed(it->second.size());
185 buffered_frames_.erase(it);
186 it = buffered_frames_.begin();
187 frame_offset = 0;
188 }
189 }
190 // We've finished copying. If we have a partial frame, update it.
191 if (frame_offset != 0) {
192 buffered_frames_.insert(
193 make_pair(it->first + frame_offset, it->second.substr(frame_offset)));
194 buffered_frames_.erase(buffered_frames_.begin());
195 RecordBytesConsumed(frame_offset);
196 }
197 return num_bytes_consumed_ - initial_bytes_consumed;
198 }
199
HasBytesToRead() const200 bool QuicStreamSequencer::HasBytesToRead() const {
201 FrameMap::const_iterator it = buffered_frames_.begin();
202
203 return it != buffered_frames_.end() && it->first == num_bytes_consumed_;
204 }
205
IsClosed() const206 bool QuicStreamSequencer::IsClosed() const {
207 return num_bytes_consumed_ >= close_offset_;
208 }
209
FrameOverlapsBufferedData(const QuicStreamFrame & frame) const210 bool QuicStreamSequencer::FrameOverlapsBufferedData(
211 const QuicStreamFrame& frame) const {
212 if (buffered_frames_.empty()) {
213 return false;
214 }
215
216 FrameMap::const_iterator next_frame =
217 buffered_frames_.lower_bound(frame.offset);
218 // Duplicate frames should have been dropped in IsDuplicate.
219 DCHECK(next_frame == buffered_frames_.end() ||
220 next_frame->first != frame.offset);
221
222 // If there is a buffered frame with a higher starting offset, then we check
223 // to see if the new frame runs into the higher frame.
224 if (next_frame != buffered_frames_.end() &&
225 (frame.offset + frame.data.TotalBufferSize()) > next_frame->first) {
226 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + "
227 << frame.data.TotalBufferSize() << " > " << next_frame->first;
228 return true;
229 }
230
231 // If there is a buffered frame with a lower starting offset, then we check
232 // to see if the buffered frame runs into the new frame.
233 if (next_frame != buffered_frames_.begin()) {
234 FrameMap::const_iterator preceeding_frame = --next_frame;
235 QuicStreamOffset offset = preceeding_frame->first;
236 uint64 data_length = preceeding_frame->second.length();
237 if ((offset + data_length) > frame.offset) {
238 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset << " + "
239 << data_length << " > " << frame.offset;
240 return true;
241 }
242 }
243 return false;
244 }
245
IsDuplicate(const QuicStreamFrame & frame) const246 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
247 // A frame is duplicate if the frame offset is smaller than our bytes consumed
248 // or we have stored the frame in our map.
249 // TODO(pwestin): Is it possible that a new frame contain more data even if
250 // the offset is the same?
251 return frame.offset < num_bytes_consumed_ ||
252 buffered_frames_.find(frame.offset) != buffered_frames_.end();
253 }
254
SetBlockedUntilFlush()255 void QuicStreamSequencer::SetBlockedUntilFlush() {
256 blocked_ = true;
257 }
258
FlushBufferedFrames()259 void QuicStreamSequencer::FlushBufferedFrames() {
260 blocked_ = false;
261 FrameMap::iterator it = buffered_frames_.find(num_bytes_consumed_);
262 while (it != buffered_frames_.end()) {
263 DVLOG(1) << "Flushing buffered packet at offset " << it->first;
264 string* data = &it->second;
265 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
266 data->size());
267 RecordBytesConsumed(bytes_consumed);
268 if (MaybeCloseStream()) {
269 return;
270 }
271 if (bytes_consumed > data->size()) {
272 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error
273 return;
274 } else if (bytes_consumed == data->size()) {
275 buffered_frames_.erase(it);
276 it = buffered_frames_.find(num_bytes_consumed_);
277 } else {
278 string new_data = it->second.substr(bytes_consumed);
279 buffered_frames_.erase(it);
280 buffered_frames_.insert(make_pair(num_bytes_consumed_, new_data));
281 return;
282 }
283 }
284 MaybeCloseStream();
285 }
286
RecordBytesConsumed(size_t bytes_consumed)287 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
288 num_bytes_consumed_ += bytes_consumed;
289 num_bytes_buffered_ -= bytes_consumed;
290
291 stream_->AddBytesConsumed(bytes_consumed);
292 }
293
294 } // namespace net
295