• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifdef UNSAFE_BUFFERS_BUILD
6 // TODO(crbug.com/40284755): Remove this and spanify to fix the errors.
7 #pragma allow_unsafe_buffers
8 #endif
9 
10 #include "net/spdy/spdy_write_queue.h"
11 
12 #include <cstddef>
13 #include <utility>
14 #include <vector>
15 
16 #include "base/check_op.h"
17 #include "base/containers/circular_deque.h"
18 #include "base/trace_event/memory_usage_estimator.h"
19 #include "net/spdy/spdy_buffer.h"
20 #include "net/spdy/spdy_buffer_producer.h"
21 #include "net/spdy/spdy_stream.h"
22 
23 namespace net {
24 
IsSpdyFrameTypeWriteCapped(spdy::SpdyFrameType frame_type)25 bool IsSpdyFrameTypeWriteCapped(spdy::SpdyFrameType frame_type) {
26   return frame_type == spdy::SpdyFrameType::RST_STREAM ||
27          frame_type == spdy::SpdyFrameType::SETTINGS ||
28          frame_type == spdy::SpdyFrameType::WINDOW_UPDATE ||
29          frame_type == spdy::SpdyFrameType::PING ||
30          frame_type == spdy::SpdyFrameType::GOAWAY;
31 }
32 
33 SpdyWriteQueue::PendingWrite::PendingWrite() = default;
34 
PendingWrite(spdy::SpdyFrameType frame_type,std::unique_ptr<SpdyBufferProducer> frame_producer,const base::WeakPtr<SpdyStream> & stream,const MutableNetworkTrafficAnnotationTag & traffic_annotation)35 SpdyWriteQueue::PendingWrite::PendingWrite(
36     spdy::SpdyFrameType frame_type,
37     std::unique_ptr<SpdyBufferProducer> frame_producer,
38     const base::WeakPtr<SpdyStream>& stream,
39     const MutableNetworkTrafficAnnotationTag& traffic_annotation)
40     : frame_type(frame_type),
41       frame_producer(std::move(frame_producer)),
42       stream(stream),
43       traffic_annotation(traffic_annotation),
44       has_stream(stream.get() != nullptr) {}
45 
46 SpdyWriteQueue::PendingWrite::~PendingWrite() = default;
47 
48 SpdyWriteQueue::PendingWrite::PendingWrite(PendingWrite&& other) = default;
49 SpdyWriteQueue::PendingWrite& SpdyWriteQueue::PendingWrite::operator=(
50     PendingWrite&& other) = default;
51 
52 SpdyWriteQueue::SpdyWriteQueue() = default;
53 
~SpdyWriteQueue()54 SpdyWriteQueue::~SpdyWriteQueue() {
55   DCHECK_GE(num_queued_capped_frames_, 0);
56   Clear();
57 }
58 
IsEmpty() const59 bool SpdyWriteQueue::IsEmpty() const {
60   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; i++) {
61     if (!queue_[i].empty())
62       return false;
63   }
64   return true;
65 }
66 
Enqueue(RequestPriority priority,spdy::SpdyFrameType frame_type,std::unique_ptr<SpdyBufferProducer> frame_producer,const base::WeakPtr<SpdyStream> & stream,const NetworkTrafficAnnotationTag & traffic_annotation)67 void SpdyWriteQueue::Enqueue(
68     RequestPriority priority,
69     spdy::SpdyFrameType frame_type,
70     std::unique_ptr<SpdyBufferProducer> frame_producer,
71     const base::WeakPtr<SpdyStream>& stream,
72     const NetworkTrafficAnnotationTag& traffic_annotation) {
73   CHECK(!removing_writes_);
74   CHECK_GE(priority, MINIMUM_PRIORITY);
75   CHECK_LE(priority, MAXIMUM_PRIORITY);
76   if (stream.get())
77     DCHECK_EQ(stream->priority(), priority);
78   queue_[priority].push_back(
79       {frame_type, std::move(frame_producer), stream,
80        MutableNetworkTrafficAnnotationTag(traffic_annotation)});
81   if (IsSpdyFrameTypeWriteCapped(frame_type)) {
82     DCHECK_GE(num_queued_capped_frames_, 0);
83     num_queued_capped_frames_++;
84   }
85 }
86 
Dequeue(spdy::SpdyFrameType * frame_type,std::unique_ptr<SpdyBufferProducer> * frame_producer,base::WeakPtr<SpdyStream> * stream,MutableNetworkTrafficAnnotationTag * traffic_annotation)87 bool SpdyWriteQueue::Dequeue(
88     spdy::SpdyFrameType* frame_type,
89     std::unique_ptr<SpdyBufferProducer>* frame_producer,
90     base::WeakPtr<SpdyStream>* stream,
91     MutableNetworkTrafficAnnotationTag* traffic_annotation) {
92   CHECK(!removing_writes_);
93   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
94     if (!queue_[i].empty()) {
95       PendingWrite pending_write = std::move(queue_[i].front());
96       queue_[i].pop_front();
97       *frame_type = pending_write.frame_type;
98       *frame_producer = std::move(pending_write.frame_producer);
99       *stream = pending_write.stream;
100       *traffic_annotation = pending_write.traffic_annotation;
101       if (pending_write.has_stream)
102         DCHECK(stream->get());
103       if (IsSpdyFrameTypeWriteCapped(*frame_type)) {
104         num_queued_capped_frames_--;
105         DCHECK_GE(num_queued_capped_frames_, 0);
106       }
107       return true;
108     }
109   }
110   return false;
111 }
112 
RemovePendingWritesForStream(SpdyStream * stream)113 void SpdyWriteQueue::RemovePendingWritesForStream(SpdyStream* stream) {
114   CHECK(!removing_writes_);
115   removing_writes_ = true;
116   RequestPriority priority = stream->priority();
117   CHECK_GE(priority, MINIMUM_PRIORITY);
118   CHECK_LE(priority, MAXIMUM_PRIORITY);
119 
120 #if DCHECK_IS_ON()
121   // |stream| should not have pending writes in a queue not matching
122   // its priority.
123   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
124     if (priority == i)
125       continue;
126     for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it)
127       DCHECK_NE(it->stream.get(), stream);
128   }
129 #endif
130 
131   // Defer deletion until queue iteration is complete, as
132   // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
133   std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
134   base::circular_deque<PendingWrite>& queue = queue_[priority];
135   for (auto it = queue.begin(); it != queue.end();) {
136     if (it->stream.get() == stream) {
137       if (IsSpdyFrameTypeWriteCapped(it->frame_type)) {
138         num_queued_capped_frames_--;
139         DCHECK_GE(num_queued_capped_frames_, 0);
140       }
141       erased_buffer_producers.push_back(std::move(it->frame_producer));
142       it = queue.erase(it);
143     } else {
144       ++it;
145     }
146   }
147   removing_writes_ = false;
148 
149   // Iteration on |queue| is completed.  Now |erased_buffer_producers| goes out
150   // of scope, SpdyBufferProducers are destroyed.
151 }
152 
RemovePendingWritesForStreamsAfter(spdy::SpdyStreamId last_good_stream_id)153 void SpdyWriteQueue::RemovePendingWritesForStreamsAfter(
154     spdy::SpdyStreamId last_good_stream_id) {
155   CHECK(!removing_writes_);
156   removing_writes_ = true;
157 
158   // Defer deletion until queue iteration is complete, as
159   // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
160   std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
161   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
162     base::circular_deque<PendingWrite>& queue = queue_[i];
163     for (auto it = queue.begin(); it != queue.end();) {
164       if (it->stream.get() && (it->stream->stream_id() > last_good_stream_id ||
165                                it->stream->stream_id() == 0)) {
166         if (IsSpdyFrameTypeWriteCapped(it->frame_type)) {
167           num_queued_capped_frames_--;
168           DCHECK_GE(num_queued_capped_frames_, 0);
169         }
170         erased_buffer_producers.push_back(std::move(it->frame_producer));
171         it = queue.erase(it);
172       } else {
173         ++it;
174       }
175     }
176   }
177   removing_writes_ = false;
178 
179   // Iteration on each |queue| is completed.  Now |erased_buffer_producers| goes
180   // out of scope, SpdyBufferProducers are destroyed.
181 }
182 
ChangePriorityOfWritesForStream(SpdyStream * stream,RequestPriority old_priority,RequestPriority new_priority)183 void SpdyWriteQueue::ChangePriorityOfWritesForStream(
184     SpdyStream* stream,
185     RequestPriority old_priority,
186     RequestPriority new_priority) {
187   CHECK(!removing_writes_);
188   DCHECK(stream);
189 
190 #if DCHECK_IS_ON()
191   // |stream| should not have pending writes in a queue not matching
192   // |old_priority|.
193   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
194     if (i == old_priority)
195       continue;
196     for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it)
197       DCHECK_NE(it->stream.get(), stream);
198   }
199 #endif
200 
201   base::circular_deque<PendingWrite>& old_queue = queue_[old_priority];
202   base::circular_deque<PendingWrite>& new_queue = queue_[new_priority];
203   for (auto it = old_queue.begin(); it != old_queue.end();) {
204     if (it->stream.get() == stream) {
205       new_queue.push_back(std::move(*it));
206       it = old_queue.erase(it);
207     } else {
208       ++it;
209     }
210   }
211 }
212 
Clear()213 void SpdyWriteQueue::Clear() {
214   CHECK(!removing_writes_);
215   removing_writes_ = true;
216   std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
217 
218   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
219     for (auto& pending_write : queue_[i]) {
220       erased_buffer_producers.push_back(
221           std::move(pending_write.frame_producer));
222     }
223     queue_[i].clear();
224   }
225   removing_writes_ = false;
226   num_queued_capped_frames_ = 0;
227 }
228 
229 }  // namespace net
230