1 // Copyright 2013 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_write_queue.h"
6
7 #include <cstddef>
8 #include <utility>
9 #include <vector>
10
11 #include "base/check_op.h"
12 #include "base/containers/circular_deque.h"
13 #include "base/trace_event/memory_usage_estimator.h"
14 #include "net/spdy/spdy_buffer.h"
15 #include "net/spdy/spdy_buffer_producer.h"
16 #include "net/spdy/spdy_stream.h"
17
18 namespace net {
19
IsSpdyFrameTypeWriteCapped(spdy::SpdyFrameType frame_type)20 bool IsSpdyFrameTypeWriteCapped(spdy::SpdyFrameType frame_type) {
21 return frame_type == spdy::SpdyFrameType::RST_STREAM ||
22 frame_type == spdy::SpdyFrameType::SETTINGS ||
23 frame_type == spdy::SpdyFrameType::WINDOW_UPDATE ||
24 frame_type == spdy::SpdyFrameType::PING ||
25 frame_type == spdy::SpdyFrameType::GOAWAY;
26 }
27
28 SpdyWriteQueue::PendingWrite::PendingWrite() = default;
29
PendingWrite(spdy::SpdyFrameType frame_type,std::unique_ptr<SpdyBufferProducer> frame_producer,const base::WeakPtr<SpdyStream> & stream,const MutableNetworkTrafficAnnotationTag & traffic_annotation)30 SpdyWriteQueue::PendingWrite::PendingWrite(
31 spdy::SpdyFrameType frame_type,
32 std::unique_ptr<SpdyBufferProducer> frame_producer,
33 const base::WeakPtr<SpdyStream>& stream,
34 const MutableNetworkTrafficAnnotationTag& traffic_annotation)
35 : frame_type(frame_type),
36 frame_producer(std::move(frame_producer)),
37 stream(stream),
38 traffic_annotation(traffic_annotation),
39 has_stream(stream.get() != nullptr) {}
40
41 SpdyWriteQueue::PendingWrite::~PendingWrite() = default;
42
43 SpdyWriteQueue::PendingWrite::PendingWrite(PendingWrite&& other) = default;
44 SpdyWriteQueue::PendingWrite& SpdyWriteQueue::PendingWrite::operator=(
45 PendingWrite&& other) = default;
46
47 SpdyWriteQueue::SpdyWriteQueue() = default;
48
~SpdyWriteQueue()49 SpdyWriteQueue::~SpdyWriteQueue() {
50 DCHECK_GE(num_queued_capped_frames_, 0);
51 Clear();
52 }
53
IsEmpty() const54 bool SpdyWriteQueue::IsEmpty() const {
55 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; i++) {
56 if (!queue_[i].empty())
57 return false;
58 }
59 return true;
60 }
61
Enqueue(RequestPriority priority,spdy::SpdyFrameType frame_type,std::unique_ptr<SpdyBufferProducer> frame_producer,const base::WeakPtr<SpdyStream> & stream,const NetworkTrafficAnnotationTag & traffic_annotation)62 void SpdyWriteQueue::Enqueue(
63 RequestPriority priority,
64 spdy::SpdyFrameType frame_type,
65 std::unique_ptr<SpdyBufferProducer> frame_producer,
66 const base::WeakPtr<SpdyStream>& stream,
67 const NetworkTrafficAnnotationTag& traffic_annotation) {
68 CHECK(!removing_writes_);
69 CHECK_GE(priority, MINIMUM_PRIORITY);
70 CHECK_LE(priority, MAXIMUM_PRIORITY);
71 if (stream.get())
72 DCHECK_EQ(stream->priority(), priority);
73 queue_[priority].push_back(
74 {frame_type, std::move(frame_producer), stream,
75 MutableNetworkTrafficAnnotationTag(traffic_annotation)});
76 if (IsSpdyFrameTypeWriteCapped(frame_type)) {
77 DCHECK_GE(num_queued_capped_frames_, 0);
78 num_queued_capped_frames_++;
79 }
80 }
81
Dequeue(spdy::SpdyFrameType * frame_type,std::unique_ptr<SpdyBufferProducer> * frame_producer,base::WeakPtr<SpdyStream> * stream,MutableNetworkTrafficAnnotationTag * traffic_annotation)82 bool SpdyWriteQueue::Dequeue(
83 spdy::SpdyFrameType* frame_type,
84 std::unique_ptr<SpdyBufferProducer>* frame_producer,
85 base::WeakPtr<SpdyStream>* stream,
86 MutableNetworkTrafficAnnotationTag* traffic_annotation) {
87 CHECK(!removing_writes_);
88 for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
89 if (!queue_[i].empty()) {
90 PendingWrite pending_write = std::move(queue_[i].front());
91 queue_[i].pop_front();
92 *frame_type = pending_write.frame_type;
93 *frame_producer = std::move(pending_write.frame_producer);
94 *stream = pending_write.stream;
95 *traffic_annotation = pending_write.traffic_annotation;
96 if (pending_write.has_stream)
97 DCHECK(stream->get());
98 if (IsSpdyFrameTypeWriteCapped(*frame_type)) {
99 num_queued_capped_frames_--;
100 DCHECK_GE(num_queued_capped_frames_, 0);
101 }
102 return true;
103 }
104 }
105 return false;
106 }
107
RemovePendingWritesForStream(SpdyStream * stream)108 void SpdyWriteQueue::RemovePendingWritesForStream(SpdyStream* stream) {
109 CHECK(!removing_writes_);
110 removing_writes_ = true;
111 RequestPriority priority = stream->priority();
112 CHECK_GE(priority, MINIMUM_PRIORITY);
113 CHECK_LE(priority, MAXIMUM_PRIORITY);
114
115 #if DCHECK_IS_ON()
116 // |stream| should not have pending writes in a queue not matching
117 // its priority.
118 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
119 if (priority == i)
120 continue;
121 for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it)
122 DCHECK_NE(it->stream.get(), stream);
123 }
124 #endif
125
126 // Defer deletion until queue iteration is complete, as
127 // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
128 std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
129 base::circular_deque<PendingWrite>& queue = queue_[priority];
130 for (auto it = queue.begin(); it != queue.end();) {
131 if (it->stream.get() == stream) {
132 if (IsSpdyFrameTypeWriteCapped(it->frame_type)) {
133 num_queued_capped_frames_--;
134 DCHECK_GE(num_queued_capped_frames_, 0);
135 }
136 erased_buffer_producers.push_back(std::move(it->frame_producer));
137 it = queue.erase(it);
138 } else {
139 ++it;
140 }
141 }
142 removing_writes_ = false;
143
144 // Iteration on |queue| is completed. Now |erased_buffer_producers| goes out
145 // of scope, SpdyBufferProducers are destroyed.
146 }
147
RemovePendingWritesForStreamsAfter(spdy::SpdyStreamId last_good_stream_id)148 void SpdyWriteQueue::RemovePendingWritesForStreamsAfter(
149 spdy::SpdyStreamId last_good_stream_id) {
150 CHECK(!removing_writes_);
151 removing_writes_ = true;
152
153 // Defer deletion until queue iteration is complete, as
154 // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
155 std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
156 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
157 base::circular_deque<PendingWrite>& queue = queue_[i];
158 for (auto it = queue.begin(); it != queue.end();) {
159 if (it->stream.get() && (it->stream->stream_id() > last_good_stream_id ||
160 it->stream->stream_id() == 0)) {
161 if (IsSpdyFrameTypeWriteCapped(it->frame_type)) {
162 num_queued_capped_frames_--;
163 DCHECK_GE(num_queued_capped_frames_, 0);
164 }
165 erased_buffer_producers.push_back(std::move(it->frame_producer));
166 it = queue.erase(it);
167 } else {
168 ++it;
169 }
170 }
171 }
172 removing_writes_ = false;
173
174 // Iteration on each |queue| is completed. Now |erased_buffer_producers| goes
175 // out of scope, SpdyBufferProducers are destroyed.
176 }
177
ChangePriorityOfWritesForStream(SpdyStream * stream,RequestPriority old_priority,RequestPriority new_priority)178 void SpdyWriteQueue::ChangePriorityOfWritesForStream(
179 SpdyStream* stream,
180 RequestPriority old_priority,
181 RequestPriority new_priority) {
182 CHECK(!removing_writes_);
183 DCHECK(stream);
184
185 #if DCHECK_IS_ON()
186 // |stream| should not have pending writes in a queue not matching
187 // |old_priority|.
188 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
189 if (i == old_priority)
190 continue;
191 for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it)
192 DCHECK_NE(it->stream.get(), stream);
193 }
194 #endif
195
196 base::circular_deque<PendingWrite>& old_queue = queue_[old_priority];
197 base::circular_deque<PendingWrite>& new_queue = queue_[new_priority];
198 for (auto it = old_queue.begin(); it != old_queue.end();) {
199 if (it->stream.get() == stream) {
200 new_queue.push_back(std::move(*it));
201 it = old_queue.erase(it);
202 } else {
203 ++it;
204 }
205 }
206 }
207
Clear()208 void SpdyWriteQueue::Clear() {
209 CHECK(!removing_writes_);
210 removing_writes_ = true;
211 std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
212
213 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
214 for (auto& pending_write : queue_[i]) {
215 erased_buffer_producers.push_back(
216 std::move(pending_write.frame_producer));
217 }
218 queue_[i].clear();
219 }
220 removing_writes_ = false;
221 num_queued_capped_frames_ = 0;
222 }
223
224 } // namespace net
225