1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_write_blocked_list.h"
6
7 #include "quiche/quic/platform/api/quic_flag_utils.h"
8 #include "quiche/quic/platform/api/quic_flags.h"
9
10 namespace quic {
11
QuicWriteBlockedList()12 QuicWriteBlockedList::QuicWriteBlockedList()
13 : last_priority_popped_(0),
14 respect_incremental_(
15 GetQuicReloadableFlag(quic_priority_respect_incremental)),
16 disable_batch_write_(GetQuicReloadableFlag(quic_disable_batch_write)) {
17 memset(batch_write_stream_id_, 0, sizeof(batch_write_stream_id_));
18 memset(bytes_left_for_batch_write_, 0, sizeof(bytes_left_for_batch_write_));
19 }
20
ShouldYield(QuicStreamId id) const21 bool QuicWriteBlockedList::ShouldYield(QuicStreamId id) const {
22 for (const auto& stream : static_stream_collection_) {
23 if (stream.id == id) {
24 // Static streams should never yield to data streams, or to lower
25 // priority static stream.
26 return false;
27 }
28 if (stream.is_blocked) {
29 return true; // All data streams yield to static streams.
30 }
31 }
32
33 return priority_write_scheduler_.ShouldYield(id);
34 }
35
PopFront()36 QuicStreamId QuicWriteBlockedList::PopFront() {
37 QuicStreamId static_stream_id;
38 if (static_stream_collection_.UnblockFirstBlocked(&static_stream_id)) {
39 return static_stream_id;
40 }
41
42 const auto [id, priority] =
43 priority_write_scheduler_.PopNextReadyStreamAndPriority();
44 const spdy::SpdyPriority urgency = priority.urgency;
45 const bool incremental = priority.incremental;
46
47 last_priority_popped_ = urgency;
48
49 if (disable_batch_write_) {
50 QUIC_RELOADABLE_FLAG_COUNT_N(quic_disable_batch_write, 1, 3);
51
52 // Writes on incremental streams are not batched. Not setting
53 // `batch_write_stream_id_` if the current write is incremental allows the
54 // write on the last non-incremental stream to continue if only incremental
55 // writes happened within this urgency bucket while that stream had no data
56 // to write.
57 if (!respect_incremental_ || !incremental) {
58 batch_write_stream_id_[urgency] = id;
59 }
60
61 return id;
62 }
63
64 if (!priority_write_scheduler_.HasReadyStreams()) {
65 // If no streams are blocked, don't bother latching. This stream will be
66 // the first popped for its urgency anyway.
67 batch_write_stream_id_[urgency] = 0;
68 } else if (batch_write_stream_id_[urgency] != id) {
69 // If newly latching this batch write stream, let it write 16k.
70 batch_write_stream_id_[urgency] = id;
71 bytes_left_for_batch_write_[urgency] = 16000;
72 }
73
74 return id;
75 }
76
RegisterStream(QuicStreamId stream_id,bool is_static_stream,const QuicStreamPriority & priority)77 void QuicWriteBlockedList::RegisterStream(QuicStreamId stream_id,
78 bool is_static_stream,
79 const QuicStreamPriority& priority) {
80 QUICHE_DCHECK(!priority_write_scheduler_.StreamRegistered(stream_id))
81 << "stream " << stream_id << " already registered";
82 if (is_static_stream) {
83 static_stream_collection_.Register(stream_id);
84 return;
85 }
86
87 priority_write_scheduler_.RegisterStream(stream_id, priority.http());
88 }
89
UnregisterStream(QuicStreamId stream_id)90 void QuicWriteBlockedList::UnregisterStream(QuicStreamId stream_id) {
91 if (static_stream_collection_.Unregister(stream_id)) {
92 return;
93 }
94 priority_write_scheduler_.UnregisterStream(stream_id);
95 }
96
UpdateStreamPriority(QuicStreamId stream_id,const QuicStreamPriority & new_priority)97 void QuicWriteBlockedList::UpdateStreamPriority(
98 QuicStreamId stream_id, const QuicStreamPriority& new_priority) {
99 QUICHE_DCHECK(!static_stream_collection_.IsRegistered(stream_id));
100 priority_write_scheduler_.UpdateStreamPriority(stream_id,
101 new_priority.http());
102 }
103
UpdateBytesForStream(QuicStreamId stream_id,size_t bytes)104 void QuicWriteBlockedList::UpdateBytesForStream(QuicStreamId stream_id,
105 size_t bytes) {
106 if (disable_batch_write_) {
107 QUIC_RELOADABLE_FLAG_COUNT_N(quic_disable_batch_write, 2, 3);
108 return;
109 }
110
111 if (batch_write_stream_id_[last_priority_popped_] == stream_id) {
112 // If this was the last data stream popped by PopFront, update the
113 // bytes remaining in its batch write.
114 bytes_left_for_batch_write_[last_priority_popped_] -=
115 std::min(bytes_left_for_batch_write_[last_priority_popped_], bytes);
116 }
117 }
118
AddStream(QuicStreamId stream_id)119 void QuicWriteBlockedList::AddStream(QuicStreamId stream_id) {
120 if (static_stream_collection_.SetBlocked(stream_id)) {
121 return;
122 }
123
124 if (respect_incremental_) {
125 QUIC_RELOADABLE_FLAG_COUNT(quic_priority_respect_incremental);
126 if (!priority_write_scheduler_.GetStreamPriority(stream_id).incremental) {
127 const bool push_front =
128 stream_id == batch_write_stream_id_[last_priority_popped_];
129 priority_write_scheduler_.MarkStreamReady(stream_id, push_front);
130 return;
131 }
132 }
133
134 if (disable_batch_write_) {
135 QUIC_RELOADABLE_FLAG_COUNT_N(quic_disable_batch_write, 3, 3);
136 priority_write_scheduler_.MarkStreamReady(stream_id,
137 /* push_front = */ false);
138 return;
139 }
140
141 const bool push_front =
142 stream_id == batch_write_stream_id_[last_priority_popped_] &&
143 bytes_left_for_batch_write_[last_priority_popped_] > 0;
144
145 priority_write_scheduler_.MarkStreamReady(stream_id, push_front);
146 }
147
IsStreamBlocked(QuicStreamId stream_id) const148 bool QuicWriteBlockedList::IsStreamBlocked(QuicStreamId stream_id) const {
149 for (const auto& stream : static_stream_collection_) {
150 if (stream.id == stream_id) {
151 return stream.is_blocked;
152 }
153 }
154
155 return priority_write_scheduler_.IsStreamReady(stream_id);
156 }
157
Register(QuicStreamId id)158 void QuicWriteBlockedList::StaticStreamCollection::Register(QuicStreamId id) {
159 QUICHE_DCHECK(!IsRegistered(id));
160 streams_.push_back({id, false});
161 }
162
IsRegistered(QuicStreamId id) const163 bool QuicWriteBlockedList::StaticStreamCollection::IsRegistered(
164 QuicStreamId id) const {
165 for (const auto& stream : streams_) {
166 if (stream.id == id) {
167 return true;
168 }
169 }
170 return false;
171 }
172
Unregister(QuicStreamId id)173 bool QuicWriteBlockedList::StaticStreamCollection::Unregister(QuicStreamId id) {
174 for (auto it = streams_.begin(); it != streams_.end(); ++it) {
175 if (it->id == id) {
176 if (it->is_blocked) {
177 --num_blocked_;
178 }
179 streams_.erase(it);
180 return true;
181 }
182 }
183 return false;
184 }
185
SetBlocked(QuicStreamId id)186 bool QuicWriteBlockedList::StaticStreamCollection::SetBlocked(QuicStreamId id) {
187 for (auto& stream : streams_) {
188 if (stream.id == id) {
189 if (!stream.is_blocked) {
190 stream.is_blocked = true;
191 ++num_blocked_;
192 }
193 return true;
194 }
195 }
196 return false;
197 }
198
UnblockFirstBlocked(QuicStreamId * id)199 bool QuicWriteBlockedList::StaticStreamCollection::UnblockFirstBlocked(
200 QuicStreamId* id) {
201 for (auto& stream : streams_) {
202 if (stream.is_blocked) {
203 --num_blocked_;
204 stream.is_blocked = false;
205 *id = stream.id;
206 return true;
207 }
208 }
209 return false;
210 }
211
212 } // namespace quic
213