1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/quic_data_stream.h"
6
7 #include "base/logging.h"
8 #include "net/quic/quic_session.h"
9 #include "net/quic/quic_spdy_decompressor.h"
10 #include "net/spdy/write_blocked_list.h"
11
12 using base::StringPiece;
13 using std::min;
14
15 namespace net {
16
// Log-message prefix naming the side of the connection this stream is on.
// Expands to a call to session(), so it can only be used where session() is
// in scope. Both labels share the same "Name: " shape (no leading space) so
// that log lines from either side line up.
#define ENDPOINT (session()->is_server() ? "Server: " : "Client: ")
18
19 namespace {
20
21 // This is somewhat arbitrary. It's possible, but unlikely, we will either fail
22 // to set a priority client-side, or cancel a stream before stripping the
23 // priority from the wire server-side. In either case, start out with a
24 // priority in the middle.
25 QuicPriority kDefaultPriority = 3;
26
27 // Appends bytes from data into partial_data_buffer. Once partial_data_buffer
28 // reaches 4 bytes, copies the data into 'result' and clears
29 // partial_data_buffer.
30 // Returns the number of bytes consumed.
StripUint32(const char * data,uint32 data_len,string * partial_data_buffer,uint32 * result)31 uint32 StripUint32(const char* data, uint32 data_len,
32 string* partial_data_buffer,
33 uint32* result) {
34 DCHECK_GT(4u, partial_data_buffer->length());
35 size_t missing_size = 4 - partial_data_buffer->length();
36 if (data_len < missing_size) {
37 StringPiece(data, data_len).AppendToString(partial_data_buffer);
38 return data_len;
39 }
40 StringPiece(data, missing_size).AppendToString(partial_data_buffer);
41 DCHECK_EQ(4u, partial_data_buffer->length());
42 memcpy(result, partial_data_buffer->data(), 4);
43 partial_data_buffer->clear();
44 return missing_size;
45 }
46
47 } // namespace
48
QuicDataStream(QuicStreamId id,QuicSession * session)49 QuicDataStream::QuicDataStream(QuicStreamId id,
50 QuicSession* session)
51 : ReliableQuicStream(id, session),
52 visitor_(NULL),
53 headers_decompressed_(false),
54 priority_(kDefaultPriority),
55 headers_id_(0),
56 decompression_failed_(false),
57 priority_parsed_(false) {
58 DCHECK_NE(kCryptoStreamId, id);
59 }
60
~QuicDataStream()61 QuicDataStream::~QuicDataStream() {
62 }
63
Readv(const struct iovec * iov,size_t iov_len)64 size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) {
65 if (FinishedReadingHeaders()) {
66 // If the headers have been read, simply delegate to the sequencer's
67 // Readv method.
68 return sequencer()->Readv(iov, iov_len);
69 }
70 // Otherwise, copy decompressed header data into |iov|.
71 size_t bytes_consumed = 0;
72 size_t iov_index = 0;
73 while (iov_index < iov_len &&
74 decompressed_headers_.length() > bytes_consumed) {
75 size_t bytes_to_read = min(iov[iov_index].iov_len,
76 decompressed_headers_.length() - bytes_consumed);
77 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
78 memcpy(iov_ptr,
79 decompressed_headers_.data() + bytes_consumed, bytes_to_read);
80 bytes_consumed += bytes_to_read;
81 ++iov_index;
82 }
83 decompressed_headers_.erase(0, bytes_consumed);
84 return bytes_consumed;
85 }
86
GetReadableRegions(iovec * iov,size_t iov_len)87 int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) {
88 if (FinishedReadingHeaders()) {
89 return sequencer()->GetReadableRegions(iov, iov_len);
90 }
91 if (iov_len == 0) {
92 return 0;
93 }
94 iov[0].iov_base = static_cast<void*>(
95 const_cast<char*>(decompressed_headers_.data()));
96 iov[0].iov_len = decompressed_headers_.length();
97 return 1;
98 }
99
IsDoneReading() const100 bool QuicDataStream::IsDoneReading() const {
101 if (!headers_decompressed_ || !decompressed_headers_.empty()) {
102 return false;
103 }
104 return sequencer()->IsClosed();
105 }
106
HasBytesToRead() const107 bool QuicDataStream::HasBytesToRead() const {
108 return !decompressed_headers_.empty() || sequencer()->HasBytesToRead();
109 }
110
set_priority(QuicPriority priority)111 void QuicDataStream::set_priority(QuicPriority priority) {
112 DCHECK_EQ(0u, stream_bytes_written());
113 priority_ = priority;
114 }
115
EffectivePriority() const116 QuicPriority QuicDataStream::EffectivePriority() const {
117 return priority();
118 }
119
ProcessRawData(const char * data,uint32 data_len)120 uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) {
121 DCHECK_NE(0u, data_len);
122
123 uint32 total_bytes_consumed = 0;
124 if (headers_id_ == 0u) {
125 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
126 data += total_bytes_consumed;
127 data_len -= total_bytes_consumed;
128 if (data_len == 0 || total_bytes_consumed == 0) {
129 return total_bytes_consumed;
130 }
131 }
132 DCHECK_NE(0u, headers_id_);
133
134 // Once the headers are finished, we simply pass the data through.
135 if (headers_decompressed_) {
136 // Some buffered header data remains.
137 if (!decompressed_headers_.empty()) {
138 ProcessHeaderData();
139 }
140 if (decompressed_headers_.empty()) {
141 DVLOG(1) << "Delegating procesing to ProcessData";
142 total_bytes_consumed += ProcessData(data, data_len);
143 }
144 return total_bytes_consumed;
145 }
146
147 QuicHeaderId current_header_id =
148 session()->decompressor()->current_header_id();
149 // Ensure that this header id looks sane.
150 if (headers_id_ < current_header_id ||
151 headers_id_ > kMaxHeaderIdDelta + current_header_id) {
152 DVLOG(1) << ENDPOINT
153 << "Invalid headers for stream: " << id()
154 << " header_id: " << headers_id_
155 << " current_header_id: " << current_header_id;
156 session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
157 return total_bytes_consumed;
158 }
159
160 // If we are head-of-line blocked on decompression, then back up.
161 if (current_header_id != headers_id_) {
162 session()->MarkDecompressionBlocked(headers_id_, id());
163 DVLOG(1) << ENDPOINT
164 << "Unable to decompress header data for stream: " << id()
165 << " header_id: " << headers_id_;
166 return total_bytes_consumed;
167 }
168
169 // Decompressed data will be delivered to decompressed_headers_.
170 size_t bytes_consumed = session()->decompressor()->DecompressData(
171 StringPiece(data, data_len), this);
172 DCHECK_NE(0u, bytes_consumed);
173 if (bytes_consumed > data_len) {
174 DCHECK(false) << "DecompressData returned illegal value";
175 OnDecompressionError();
176 return total_bytes_consumed;
177 }
178 total_bytes_consumed += bytes_consumed;
179 data += bytes_consumed;
180 data_len -= bytes_consumed;
181
182 if (decompression_failed_) {
183 // The session will have been closed in OnDecompressionError.
184 return total_bytes_consumed;
185 }
186
187 // Headers are complete if the decompressor has moved on to the
188 // next stream.
189 headers_decompressed_ =
190 session()->decompressor()->current_header_id() != headers_id_;
191 if (!headers_decompressed_) {
192 DCHECK_EQ(0u, data_len);
193 }
194
195 ProcessHeaderData();
196
197 if (!headers_decompressed_ || !decompressed_headers_.empty()) {
198 return total_bytes_consumed;
199 }
200
201 // We have processed all of the decompressed data but we might
202 // have some more raw data to process.
203 if (data_len > 0) {
204 total_bytes_consumed += ProcessData(data, data_len);
205 }
206
207 // The sequencer will push any additional buffered frames if this data
208 // has been completely consumed.
209 return total_bytes_consumed;
210 }
211
GetPeerAddress()212 const IPEndPoint& QuicDataStream::GetPeerAddress() {
213 return session()->peer_address();
214 }
215
compressor()216 QuicSpdyCompressor* QuicDataStream::compressor() {
217 return session()->compressor();
218 }
219
GetSSLInfo(SSLInfo * ssl_info)220 bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) {
221 return session()->GetSSLInfo(ssl_info);
222 }
223
ProcessHeaderData()224 uint32 QuicDataStream::ProcessHeaderData() {
225 if (decompressed_headers_.empty()) {
226 return 0;
227 }
228
229 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
230 decompressed_headers_.length());
231 if (bytes_processed == decompressed_headers_.length()) {
232 decompressed_headers_.clear();
233 } else {
234 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
235 }
236 return bytes_processed;
237 }
238
OnDecompressorAvailable()239 void QuicDataStream::OnDecompressorAvailable() {
240 DCHECK_EQ(headers_id_,
241 session()->decompressor()->current_header_id());
242 DCHECK(!headers_decompressed_);
243 DCHECK(!decompression_failed_);
244 DCHECK_EQ(0u, decompressed_headers_.length());
245
246 while (!headers_decompressed_) {
247 struct iovec iovec;
248 if (sequencer()->GetReadableRegions(&iovec, 1) == 0) {
249 return;
250 }
251
252 size_t bytes_consumed = session()->decompressor()->DecompressData(
253 StringPiece(static_cast<char*>(iovec.iov_base),
254 iovec.iov_len),
255 this);
256 DCHECK_LE(bytes_consumed, iovec.iov_len);
257 if (decompression_failed_) {
258 return;
259 }
260 sequencer()->MarkConsumed(bytes_consumed);
261
262 headers_decompressed_ =
263 session()->decompressor()->current_header_id() != headers_id_;
264 }
265
266 // Either the headers are complete, or the all data as been consumed.
267 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
268 if (IsDoneReading()) {
269 OnFinRead();
270 } else if (FinishedReadingHeaders()) {
271 sequencer()->FlushBufferedFrames();
272 }
273 }
274
OnDecompressedData(StringPiece data)275 bool QuicDataStream::OnDecompressedData(StringPiece data) {
276 data.AppendToString(&decompressed_headers_);
277 return true;
278 }
279
OnDecompressionError()280 void QuicDataStream::OnDecompressionError() {
281 DCHECK(!decompression_failed_);
282 decompression_failed_ = true;
283 session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
284 }
285
OnClose()286 void QuicDataStream::OnClose() {
287 ReliableQuicStream::OnClose();
288
289 if (visitor_) {
290 Visitor* visitor = visitor_;
291 // Calling Visitor::OnClose() may result the destruction of the visitor,
292 // so we need to ensure we don't call it again.
293 visitor_ = NULL;
294 visitor->OnClose(this);
295 }
296 }
297
StripPriorityAndHeaderId(const char * data,uint32 data_len)298 uint32 QuicDataStream::StripPriorityAndHeaderId(
299 const char* data, uint32 data_len) {
300 uint32 total_bytes_parsed = 0;
301
302 if (!priority_parsed_ && session()->connection()->is_server()) {
303 QuicPriority temporary_priority = priority_;
304 total_bytes_parsed = StripUint32(
305 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
306 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
307 priority_parsed_ = true;
308
309 // Spdy priorities are inverted, so the highest numerical value is the
310 // lowest legal priority.
311 if (temporary_priority > QuicUtils::LowestPriority()) {
312 session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
313 return 0;
314 }
315 priority_ = temporary_priority;
316 }
317 data += total_bytes_parsed;
318 data_len -= total_bytes_parsed;
319 }
320 if (data_len > 0 && headers_id_ == 0u) {
321 // The headers ID has not yet been read. Strip it from the beginning of
322 // the data stream.
323 total_bytes_parsed += StripUint32(
324 data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
325 }
326 return total_bytes_parsed;
327 }
328
FinishedReadingHeaders()329 bool QuicDataStream::FinishedReadingHeaders() {
330 return headers_decompressed_ && decompressed_headers_.empty();
331 }
332
333 } // namespace net
334