• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/spdy/spdy_write_queue.h"
6 
7 #include <cstddef>
8 #include <utility>
9 #include <vector>
10 
11 #include "base/check_op.h"
12 #include "base/containers/circular_deque.h"
13 #include "base/trace_event/memory_usage_estimator.h"
14 #include "net/spdy/spdy_buffer.h"
15 #include "net/spdy/spdy_buffer_producer.h"
16 #include "net/spdy/spdy_stream.h"
17 
18 namespace net {
19 
IsSpdyFrameTypeWriteCapped(spdy::SpdyFrameType frame_type)20 bool IsSpdyFrameTypeWriteCapped(spdy::SpdyFrameType frame_type) {
21   return frame_type == spdy::SpdyFrameType::RST_STREAM ||
22          frame_type == spdy::SpdyFrameType::SETTINGS ||
23          frame_type == spdy::SpdyFrameType::WINDOW_UPDATE ||
24          frame_type == spdy::SpdyFrameType::PING ||
25          frame_type == spdy::SpdyFrameType::GOAWAY;
26 }
27 
28 SpdyWriteQueue::PendingWrite::PendingWrite() = default;
29 
PendingWrite(spdy::SpdyFrameType frame_type,std::unique_ptr<SpdyBufferProducer> frame_producer,const base::WeakPtr<SpdyStream> & stream,const MutableNetworkTrafficAnnotationTag & traffic_annotation)30 SpdyWriteQueue::PendingWrite::PendingWrite(
31     spdy::SpdyFrameType frame_type,
32     std::unique_ptr<SpdyBufferProducer> frame_producer,
33     const base::WeakPtr<SpdyStream>& stream,
34     const MutableNetworkTrafficAnnotationTag& traffic_annotation)
35     : frame_type(frame_type),
36       frame_producer(std::move(frame_producer)),
37       stream(stream),
38       traffic_annotation(traffic_annotation),
39       has_stream(stream.get() != nullptr) {}
40 
41 SpdyWriteQueue::PendingWrite::~PendingWrite() = default;
42 
43 SpdyWriteQueue::PendingWrite::PendingWrite(PendingWrite&& other) = default;
44 SpdyWriteQueue::PendingWrite& SpdyWriteQueue::PendingWrite::operator=(
45     PendingWrite&& other) = default;
46 
47 SpdyWriteQueue::SpdyWriteQueue() = default;
48 
~SpdyWriteQueue()49 SpdyWriteQueue::~SpdyWriteQueue() {
50   DCHECK_GE(num_queued_capped_frames_, 0);
51   Clear();
52 }
53 
IsEmpty() const54 bool SpdyWriteQueue::IsEmpty() const {
55   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; i++) {
56     if (!queue_[i].empty())
57       return false;
58   }
59   return true;
60 }
61 
Enqueue(RequestPriority priority,spdy::SpdyFrameType frame_type,std::unique_ptr<SpdyBufferProducer> frame_producer,const base::WeakPtr<SpdyStream> & stream,const NetworkTrafficAnnotationTag & traffic_annotation)62 void SpdyWriteQueue::Enqueue(
63     RequestPriority priority,
64     spdy::SpdyFrameType frame_type,
65     std::unique_ptr<SpdyBufferProducer> frame_producer,
66     const base::WeakPtr<SpdyStream>& stream,
67     const NetworkTrafficAnnotationTag& traffic_annotation) {
68   CHECK(!removing_writes_);
69   CHECK_GE(priority, MINIMUM_PRIORITY);
70   CHECK_LE(priority, MAXIMUM_PRIORITY);
71   if (stream.get())
72     DCHECK_EQ(stream->priority(), priority);
73   queue_[priority].push_back(
74       {frame_type, std::move(frame_producer), stream,
75        MutableNetworkTrafficAnnotationTag(traffic_annotation)});
76   if (IsSpdyFrameTypeWriteCapped(frame_type)) {
77     DCHECK_GE(num_queued_capped_frames_, 0);
78     num_queued_capped_frames_++;
79   }
80 }
81 
Dequeue(spdy::SpdyFrameType * frame_type,std::unique_ptr<SpdyBufferProducer> * frame_producer,base::WeakPtr<SpdyStream> * stream,MutableNetworkTrafficAnnotationTag * traffic_annotation)82 bool SpdyWriteQueue::Dequeue(
83     spdy::SpdyFrameType* frame_type,
84     std::unique_ptr<SpdyBufferProducer>* frame_producer,
85     base::WeakPtr<SpdyStream>* stream,
86     MutableNetworkTrafficAnnotationTag* traffic_annotation) {
87   CHECK(!removing_writes_);
88   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
89     if (!queue_[i].empty()) {
90       PendingWrite pending_write = std::move(queue_[i].front());
91       queue_[i].pop_front();
92       *frame_type = pending_write.frame_type;
93       *frame_producer = std::move(pending_write.frame_producer);
94       *stream = pending_write.stream;
95       *traffic_annotation = pending_write.traffic_annotation;
96       if (pending_write.has_stream)
97         DCHECK(stream->get());
98       if (IsSpdyFrameTypeWriteCapped(*frame_type)) {
99         num_queued_capped_frames_--;
100         DCHECK_GE(num_queued_capped_frames_, 0);
101       }
102       return true;
103     }
104   }
105   return false;
106 }
107 
RemovePendingWritesForStream(SpdyStream * stream)108 void SpdyWriteQueue::RemovePendingWritesForStream(SpdyStream* stream) {
109   CHECK(!removing_writes_);
110   removing_writes_ = true;
111   RequestPriority priority = stream->priority();
112   CHECK_GE(priority, MINIMUM_PRIORITY);
113   CHECK_LE(priority, MAXIMUM_PRIORITY);
114 
115 #if DCHECK_IS_ON()
116   // |stream| should not have pending writes in a queue not matching
117   // its priority.
118   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
119     if (priority == i)
120       continue;
121     for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it)
122       DCHECK_NE(it->stream.get(), stream);
123   }
124 #endif
125 
126   // Defer deletion until queue iteration is complete, as
127   // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
128   std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
129   base::circular_deque<PendingWrite>& queue = queue_[priority];
130   for (auto it = queue.begin(); it != queue.end();) {
131     if (it->stream.get() == stream) {
132       if (IsSpdyFrameTypeWriteCapped(it->frame_type)) {
133         num_queued_capped_frames_--;
134         DCHECK_GE(num_queued_capped_frames_, 0);
135       }
136       erased_buffer_producers.push_back(std::move(it->frame_producer));
137       it = queue.erase(it);
138     } else {
139       ++it;
140     }
141   }
142   removing_writes_ = false;
143 
144   // Iteration on |queue| is completed.  Now |erased_buffer_producers| goes out
145   // of scope, SpdyBufferProducers are destroyed.
146 }
147 
RemovePendingWritesForStreamsAfter(spdy::SpdyStreamId last_good_stream_id)148 void SpdyWriteQueue::RemovePendingWritesForStreamsAfter(
149     spdy::SpdyStreamId last_good_stream_id) {
150   CHECK(!removing_writes_);
151   removing_writes_ = true;
152 
153   // Defer deletion until queue iteration is complete, as
154   // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
155   std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
156   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
157     base::circular_deque<PendingWrite>& queue = queue_[i];
158     for (auto it = queue.begin(); it != queue.end();) {
159       if (it->stream.get() && (it->stream->stream_id() > last_good_stream_id ||
160                                it->stream->stream_id() == 0)) {
161         if (IsSpdyFrameTypeWriteCapped(it->frame_type)) {
162           num_queued_capped_frames_--;
163           DCHECK_GE(num_queued_capped_frames_, 0);
164         }
165         erased_buffer_producers.push_back(std::move(it->frame_producer));
166         it = queue.erase(it);
167       } else {
168         ++it;
169       }
170     }
171   }
172   removing_writes_ = false;
173 
174   // Iteration on each |queue| is completed.  Now |erased_buffer_producers| goes
175   // out of scope, SpdyBufferProducers are destroyed.
176 }
177 
ChangePriorityOfWritesForStream(SpdyStream * stream,RequestPriority old_priority,RequestPriority new_priority)178 void SpdyWriteQueue::ChangePriorityOfWritesForStream(
179     SpdyStream* stream,
180     RequestPriority old_priority,
181     RequestPriority new_priority) {
182   CHECK(!removing_writes_);
183   DCHECK(stream);
184 
185 #if DCHECK_IS_ON()
186   // |stream| should not have pending writes in a queue not matching
187   // |old_priority|.
188   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
189     if (i == old_priority)
190       continue;
191     for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it)
192       DCHECK_NE(it->stream.get(), stream);
193   }
194 #endif
195 
196   base::circular_deque<PendingWrite>& old_queue = queue_[old_priority];
197   base::circular_deque<PendingWrite>& new_queue = queue_[new_priority];
198   for (auto it = old_queue.begin(); it != old_queue.end();) {
199     if (it->stream.get() == stream) {
200       new_queue.push_back(std::move(*it));
201       it = old_queue.erase(it);
202     } else {
203       ++it;
204     }
205   }
206 }
207 
Clear()208 void SpdyWriteQueue::Clear() {
209   CHECK(!removing_writes_);
210   removing_writes_ = true;
211   std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
212 
213   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
214     for (auto& pending_write : queue_[i]) {
215       erased_buffer_producers.push_back(
216           std::move(pending_write.frame_producer));
217     }
218     queue_[i].clear();
219   }
220   removing_writes_ = false;
221   num_queued_capped_frames_ = 0;
222 }
223 
224 }  // namespace net
225