• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/quic/reliable_quic_stream.h"
6 
7 #include "net/quic/quic_session.h"
8 #include "net/quic/quic_spdy_decompressor.h"
9 #include "net/spdy/write_blocked_list.h"
10 
11 using base::StringPiece;
12 using std::min;
13 
14 namespace net {
15 
// Log-line prefix naming the side of the connection this stream lives on.
// The expansion reads |is_server_|, so the macro is only usable where that
// member is in scope (i.e. inside ReliableQuicStream member functions).
#define ENDPOINT \
  (is_server_ ? "Server: " : " Client: ")
17 
18 namespace {
19 
MakeIovec(StringPiece data)20 struct iovec MakeIovec(StringPiece data) {
21   struct iovec iov = {const_cast<char*>(data.data()),
22                       static_cast<size_t>(data.size())};
23   return iov;
24 }
25 
26 }  // namespace
27 
ReliableQuicStream(QuicStreamId id,QuicSession * session)28 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
29                                        QuicSession* session)
30     : sequencer_(this),
31       id_(id),
32       session_(session),
33       stream_bytes_read_(0),
34       stream_bytes_written_(0),
35       stream_error_(QUIC_STREAM_NO_ERROR),
36       connection_error_(QUIC_NO_ERROR),
37       read_side_closed_(false),
38       write_side_closed_(false),
39       fin_buffered_(false),
40       fin_sent_(false),
41       is_server_(session_->is_server()) {
42 }
43 
~ReliableQuicStream()44 ReliableQuicStream::~ReliableQuicStream() {
45 }
46 
WillAcceptStreamFrame(const QuicStreamFrame & frame) const47 bool ReliableQuicStream::WillAcceptStreamFrame(
48     const QuicStreamFrame& frame) const {
49   if (read_side_closed_) {
50     return true;
51   }
52   if (frame.stream_id != id_) {
53     LOG(ERROR) << "Error!";
54     return false;
55   }
56   return sequencer_.WillAcceptStreamFrame(frame);
57 }
58 
OnStreamFrame(const QuicStreamFrame & frame)59 bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
60   DCHECK_EQ(frame.stream_id, id_);
61   if (read_side_closed_) {
62     DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
63     // We don't want to be reading: blackhole the data.
64     return true;
65   }
66   // Note: This count include duplicate data received.
67   stream_bytes_read_ += frame.data.TotalBufferSize();
68 
69   bool accepted = sequencer_.OnStreamFrame(frame);
70 
71   return accepted;
72 }
73 
OnStreamReset(QuicRstStreamErrorCode error)74 void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
75   stream_error_ = error;
76   CloseWriteSide();
77   CloseReadSide();
78 }
79 
OnConnectionClosed(QuicErrorCode error,bool from_peer)80 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
81                                             bool from_peer) {
82   if (read_side_closed_ && write_side_closed_) {
83     return;
84   }
85   if (error != QUIC_NO_ERROR) {
86     stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
87     connection_error_ = error;
88   }
89 
90   CloseWriteSide();
91   CloseReadSide();
92 }
93 
OnFinRead()94 void ReliableQuicStream::OnFinRead() {
95   DCHECK(sequencer_.IsClosed());
96   CloseReadSide();
97 }
98 
Reset(QuicRstStreamErrorCode error)99 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
100   DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
101   stream_error_ = error;
102   // Sending a RstStream results in calling CloseStream.
103   session()->SendRstStream(id(), error);
104 }
105 
CloseConnection(QuicErrorCode error)106 void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
107   session()->connection()->SendConnectionClose(error);
108 }
109 
CloseConnectionWithDetails(QuicErrorCode error,const string & details)110 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
111                                                     const string& details) {
112   session()->connection()->SendConnectionCloseWithDetails(error, details);
113 }
114 
version()115 QuicVersion ReliableQuicStream::version() {
116   return session()->connection()->version();
117 }
118 
WriteOrBufferData(StringPiece data,bool fin)119 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) {
120   DCHECK(data.size() > 0 || fin);
121   DCHECK(!fin_buffered_);
122 
123   QuicConsumedData consumed_data(0, false);
124   fin_buffered_ = fin;
125 
126   if (queued_data_.empty()) {
127     struct iovec iov(MakeIovec(data));
128     consumed_data = WritevData(&iov, 1, fin, NULL);
129     DCHECK_LE(consumed_data.bytes_consumed, data.length());
130   }
131 
132   // If there's unconsumed data or an unconsumed fin, queue it.
133   if (consumed_data.bytes_consumed < data.length() ||
134       (fin && !consumed_data.fin_consumed)) {
135     queued_data_.push_back(
136         string(data.data() + consumed_data.bytes_consumed,
137                data.length() - consumed_data.bytes_consumed));
138   }
139 }
140 
OnCanWrite()141 void ReliableQuicStream::OnCanWrite() {
142   bool fin = false;
143   while (!queued_data_.empty()) {
144     const string& data = queued_data_.front();
145     if (queued_data_.size() == 1 && fin_buffered_) {
146       fin = true;
147     }
148     struct iovec iov(MakeIovec(data));
149     QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL);
150     if (consumed_data.bytes_consumed == data.size() &&
151         fin == consumed_data.fin_consumed) {
152       queued_data_.pop_front();
153     } else {
154       queued_data_.front().erase(0, consumed_data.bytes_consumed);
155       break;
156     }
157   }
158 }
159 
WritevData(const struct iovec * iov,int iov_count,bool fin,QuicAckNotifier::DelegateInterface * ack_notifier_delegate)160 QuicConsumedData ReliableQuicStream::WritevData(
161     const struct iovec* iov,
162     int iov_count,
163     bool fin,
164     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
165   if (write_side_closed_) {
166     DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
167     return QuicConsumedData(0, false);
168   }
169 
170   size_t write_length = 0u;
171   for (int i = 0; i < iov_count; ++i) {
172     write_length += iov[i].iov_len;
173   }
174   QuicConsumedData consumed_data = session()->WritevData(
175       id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate);
176   stream_bytes_written_ += consumed_data.bytes_consumed;
177   if (consumed_data.bytes_consumed == write_length) {
178     if (fin && consumed_data.fin_consumed) {
179       fin_sent_ = true;
180       CloseWriteSide();
181     } else if (fin && !consumed_data.fin_consumed) {
182       session_->MarkWriteBlocked(id(), EffectivePriority());
183     }
184   } else {
185     session_->MarkWriteBlocked(id(), EffectivePriority());
186   }
187   return consumed_data;
188 }
189 
CloseReadSide()190 void ReliableQuicStream::CloseReadSide() {
191   if (read_side_closed_) {
192     return;
193   }
194   DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
195 
196   read_side_closed_ = true;
197   if (write_side_closed_) {
198     DVLOG(1) << ENDPOINT << "Closing stream: " << id();
199     session_->CloseStream(id());
200   }
201 }
202 
CloseWriteSide()203 void ReliableQuicStream::CloseWriteSide() {
204   if (write_side_closed_) {
205     return;
206   }
207   DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
208 
209   write_side_closed_ = true;
210   if (read_side_closed_) {
211     DVLOG(1) << ENDPOINT << "Closing stream: " << id();
212     session_->CloseStream(id());
213   }
214 }
215 
HasBufferedData()216 bool ReliableQuicStream::HasBufferedData() {
217   return !queued_data_.empty();
218 }
219 
OnClose()220 void ReliableQuicStream::OnClose() {
221   CloseReadSide();
222   CloseWriteSide();
223 }
224 
225 }  // namespace net
226