• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/quic/quic_data_stream.h"
6 
7 #include "base/logging.h"
8 #include "net/quic/quic_session.h"
9 #include "net/quic/quic_spdy_decompressor.h"
10 #include "net/spdy/write_blocked_list.h"
11 
12 using base::StringPiece;
13 using std::min;
14 
15 namespace net {
16 
// Log-line prefix naming the side of the connection this code runs on, as
// reported by session()->is_server().  Only usable where session() is in
// scope.
#define ENDPOINT \
  (session()->is_server() ? "Server: " : " Client: ")
18 
19 namespace {
20 
21 // This is somewhat arbitrary.  It's possible, but unlikely, we will either fail
22 // to set a priority client-side, or cancel a stream before stripping the
23 // priority from the wire server-side.  In either case, start out with a
24 // priority in the middle.
25 QuicPriority kDefaultPriority = 3;
26 
27 // Appends bytes from data into partial_data_buffer.  Once partial_data_buffer
28 // reaches 4 bytes, copies the data into 'result' and clears
29 // partial_data_buffer.
30 // Returns the number of bytes consumed.
StripUint32(const char * data,uint32 data_len,string * partial_data_buffer,uint32 * result)31 uint32 StripUint32(const char* data, uint32 data_len,
32                    string* partial_data_buffer,
33                    uint32* result) {
34   DCHECK_GT(4u, partial_data_buffer->length());
35   size_t missing_size = 4 - partial_data_buffer->length();
36   if (data_len < missing_size) {
37     StringPiece(data, data_len).AppendToString(partial_data_buffer);
38     return data_len;
39   }
40   StringPiece(data, missing_size).AppendToString(partial_data_buffer);
41   DCHECK_EQ(4u, partial_data_buffer->length());
42   memcpy(result, partial_data_buffer->data(), 4);
43   partial_data_buffer->clear();
44   return missing_size;
45 }
46 
47 }  // namespace
48 
QuicDataStream(QuicStreamId id,QuicSession * session)49 QuicDataStream::QuicDataStream(QuicStreamId id,
50                                QuicSession* session)
51     : ReliableQuicStream(id, session),
52       visitor_(NULL),
53       headers_decompressed_(false),
54       priority_(kDefaultPriority),
55       headers_id_(0),
56       decompression_failed_(false),
57       priority_parsed_(false) {
58   DCHECK_NE(kCryptoStreamId, id);
59 }
60 
~QuicDataStream()61 QuicDataStream::~QuicDataStream() {
62 }
63 
Readv(const struct iovec * iov,size_t iov_len)64 size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) {
65   if (FinishedReadingHeaders()) {
66     // If the headers have been read, simply delegate to the sequencer's
67     // Readv method.
68     return sequencer()->Readv(iov, iov_len);
69   }
70   // Otherwise, copy decompressed header data into |iov|.
71   size_t bytes_consumed = 0;
72   size_t iov_index = 0;
73   while (iov_index < iov_len &&
74          decompressed_headers_.length() > bytes_consumed) {
75     size_t bytes_to_read = min(iov[iov_index].iov_len,
76                                decompressed_headers_.length() - bytes_consumed);
77     char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
78     memcpy(iov_ptr,
79            decompressed_headers_.data() + bytes_consumed, bytes_to_read);
80     bytes_consumed += bytes_to_read;
81     ++iov_index;
82   }
83   decompressed_headers_.erase(0, bytes_consumed);
84   return bytes_consumed;
85 }
86 
GetReadableRegions(iovec * iov,size_t iov_len)87 int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) {
88   if (FinishedReadingHeaders()) {
89     return sequencer()->GetReadableRegions(iov, iov_len);
90   }
91   if (iov_len == 0) {
92     return 0;
93   }
94   iov[0].iov_base = static_cast<void*>(
95       const_cast<char*>(decompressed_headers_.data()));
96   iov[0].iov_len = decompressed_headers_.length();
97   return 1;
98 }
99 
IsDoneReading() const100 bool QuicDataStream::IsDoneReading() const {
101   if (!headers_decompressed_ || !decompressed_headers_.empty()) {
102     return false;
103   }
104   return sequencer()->IsClosed();
105 }
106 
HasBytesToRead() const107 bool QuicDataStream::HasBytesToRead() const {
108   return !decompressed_headers_.empty() || sequencer()->HasBytesToRead();
109 }
110 
set_priority(QuicPriority priority)111 void QuicDataStream::set_priority(QuicPriority priority) {
112   DCHECK_EQ(0u, stream_bytes_written());
113   priority_ = priority;
114 }
115 
EffectivePriority() const116 QuicPriority QuicDataStream::EffectivePriority() const {
117   return priority();
118 }
119 
ProcessRawData(const char * data,uint32 data_len)120 uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) {
121   DCHECK_NE(0u, data_len);
122 
123   uint32 total_bytes_consumed = 0;
124   if (headers_id_ == 0u) {
125     total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
126     data += total_bytes_consumed;
127     data_len -= total_bytes_consumed;
128     if (data_len == 0 || total_bytes_consumed == 0) {
129       return total_bytes_consumed;
130     }
131   }
132   DCHECK_NE(0u, headers_id_);
133 
134   // Once the headers are finished, we simply pass the data through.
135   if (headers_decompressed_) {
136     // Some buffered header data remains.
137     if (!decompressed_headers_.empty()) {
138       ProcessHeaderData();
139     }
140     if (decompressed_headers_.empty()) {
141       DVLOG(1) << "Delegating procesing to ProcessData";
142       total_bytes_consumed += ProcessData(data, data_len);
143     }
144     return total_bytes_consumed;
145   }
146 
147   QuicHeaderId current_header_id =
148       session()->decompressor()->current_header_id();
149   // Ensure that this header id looks sane.
150   if (headers_id_ < current_header_id ||
151       headers_id_ > kMaxHeaderIdDelta + current_header_id) {
152     DVLOG(1) << ENDPOINT
153              << "Invalid headers for stream: " << id()
154              << " header_id: " << headers_id_
155              << " current_header_id: " << current_header_id;
156     session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
157     return total_bytes_consumed;
158   }
159 
160   // If we are head-of-line blocked on decompression, then back up.
161   if (current_header_id != headers_id_) {
162     session()->MarkDecompressionBlocked(headers_id_, id());
163     DVLOG(1) << ENDPOINT
164              << "Unable to decompress header data for stream: " << id()
165              << " header_id: " << headers_id_;
166     return total_bytes_consumed;
167   }
168 
169   // Decompressed data will be delivered to decompressed_headers_.
170   size_t bytes_consumed = session()->decompressor()->DecompressData(
171       StringPiece(data, data_len), this);
172   DCHECK_NE(0u, bytes_consumed);
173   if (bytes_consumed > data_len) {
174     DCHECK(false) << "DecompressData returned illegal value";
175     OnDecompressionError();
176     return total_bytes_consumed;
177   }
178   total_bytes_consumed += bytes_consumed;
179   data += bytes_consumed;
180   data_len -= bytes_consumed;
181 
182   if (decompression_failed_) {
183     // The session will have been closed in OnDecompressionError.
184     return total_bytes_consumed;
185   }
186 
187   // Headers are complete if the decompressor has moved on to the
188   // next stream.
189   headers_decompressed_ =
190       session()->decompressor()->current_header_id() != headers_id_;
191   if (!headers_decompressed_) {
192     DCHECK_EQ(0u, data_len);
193   }
194 
195   ProcessHeaderData();
196 
197   if (!headers_decompressed_ || !decompressed_headers_.empty()) {
198     return total_bytes_consumed;
199   }
200 
201   // We have processed all of the decompressed data but we might
202   // have some more raw data to process.
203   if (data_len > 0) {
204     total_bytes_consumed += ProcessData(data, data_len);
205   }
206 
207   // The sequencer will push any additional buffered frames if this data
208   // has been completely consumed.
209   return total_bytes_consumed;
210 }
211 
GetPeerAddress()212 const IPEndPoint& QuicDataStream::GetPeerAddress() {
213   return session()->peer_address();
214 }
215 
compressor()216 QuicSpdyCompressor* QuicDataStream::compressor() {
217   return session()->compressor();
218 }
219 
GetSSLInfo(SSLInfo * ssl_info)220 bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) {
221   return session()->GetSSLInfo(ssl_info);
222 }
223 
ProcessHeaderData()224 uint32 QuicDataStream::ProcessHeaderData() {
225   if (decompressed_headers_.empty()) {
226     return 0;
227   }
228 
229   size_t bytes_processed = ProcessData(decompressed_headers_.data(),
230                                        decompressed_headers_.length());
231   if (bytes_processed == decompressed_headers_.length()) {
232     decompressed_headers_.clear();
233   } else {
234     decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
235   }
236   return bytes_processed;
237 }
238 
OnDecompressorAvailable()239 void QuicDataStream::OnDecompressorAvailable() {
240   DCHECK_EQ(headers_id_,
241             session()->decompressor()->current_header_id());
242   DCHECK(!headers_decompressed_);
243   DCHECK(!decompression_failed_);
244   DCHECK_EQ(0u, decompressed_headers_.length());
245 
246   while (!headers_decompressed_) {
247     struct iovec iovec;
248     if (sequencer()->GetReadableRegions(&iovec, 1) == 0) {
249       return;
250     }
251 
252     size_t bytes_consumed = session()->decompressor()->DecompressData(
253         StringPiece(static_cast<char*>(iovec.iov_base),
254                     iovec.iov_len),
255         this);
256     DCHECK_LE(bytes_consumed, iovec.iov_len);
257     if (decompression_failed_) {
258       return;
259     }
260     sequencer()->MarkConsumed(bytes_consumed);
261 
262     headers_decompressed_ =
263         session()->decompressor()->current_header_id() != headers_id_;
264   }
265 
266   // Either the headers are complete, or the all data as been consumed.
267   ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
268   if (IsDoneReading()) {
269     OnFinRead();
270   } else if (FinishedReadingHeaders()) {
271     sequencer()->FlushBufferedFrames();
272   }
273 }
274 
OnDecompressedData(StringPiece data)275 bool QuicDataStream::OnDecompressedData(StringPiece data) {
276   data.AppendToString(&decompressed_headers_);
277   return true;
278 }
279 
OnDecompressionError()280 void QuicDataStream::OnDecompressionError() {
281   DCHECK(!decompression_failed_);
282   decompression_failed_ = true;
283   session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
284 }
285 
OnClose()286 void QuicDataStream::OnClose() {
287   ReliableQuicStream::OnClose();
288 
289   if (visitor_) {
290     Visitor* visitor = visitor_;
291     // Calling Visitor::OnClose() may result the destruction of the visitor,
292     // so we need to ensure we don't call it again.
293     visitor_ = NULL;
294     visitor->OnClose(this);
295   }
296 }
297 
StripPriorityAndHeaderId(const char * data,uint32 data_len)298 uint32 QuicDataStream::StripPriorityAndHeaderId(
299     const char* data, uint32 data_len) {
300   uint32 total_bytes_parsed = 0;
301 
302   if (!priority_parsed_ && session()->connection()->is_server()) {
303     QuicPriority temporary_priority = priority_;
304     total_bytes_parsed = StripUint32(
305         data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
306     if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
307       priority_parsed_ = true;
308 
309       // Spdy priorities are inverted, so the highest numerical value is the
310       // lowest legal priority.
311       if (temporary_priority > QuicUtils::LowestPriority()) {
312         session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
313         return 0;
314       }
315       priority_ = temporary_priority;
316     }
317     data += total_bytes_parsed;
318     data_len -= total_bytes_parsed;
319   }
320   if (data_len > 0 && headers_id_ == 0u) {
321     // The headers ID has not yet been read.  Strip it from the beginning of
322     // the data stream.
323     total_bytes_parsed += StripUint32(
324         data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
325   }
326   return total_bytes_parsed;
327 }
328 
FinishedReadingHeaders()329 bool QuicDataStream::FinishedReadingHeaders() {
330   return headers_decompressed_ && decompressed_headers_.empty();
331 }
332 
333 }  // namespace net
334