• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/spdy/spdy_write_queue.h"
6 
7 #include <cstddef>
8 #include <vector>
9 
10 #include "base/logging.h"
11 #include "base/stl_util.h"
12 #include "net/spdy/spdy_buffer.h"
13 #include "net/spdy/spdy_buffer_producer.h"
14 #include "net/spdy/spdy_stream.h"
15 
16 namespace net {
17 
PendingWrite()18 SpdyWriteQueue::PendingWrite::PendingWrite() : frame_producer(NULL) {}
19 
PendingWrite(SpdyFrameType frame_type,SpdyBufferProducer * frame_producer,const base::WeakPtr<SpdyStream> & stream)20 SpdyWriteQueue::PendingWrite::PendingWrite(
21     SpdyFrameType frame_type,
22     SpdyBufferProducer* frame_producer,
23     const base::WeakPtr<SpdyStream>& stream)
24     : frame_type(frame_type),
25       frame_producer(frame_producer),
26       stream(stream),
27       has_stream(stream.get() != NULL) {}
28 
~PendingWrite()29 SpdyWriteQueue::PendingWrite::~PendingWrite() {}
30 
SpdyWriteQueue()31 SpdyWriteQueue::SpdyWriteQueue() : removing_writes_(false) {}
32 
~SpdyWriteQueue()33 SpdyWriteQueue::~SpdyWriteQueue() {
34   Clear();
35 }
36 
IsEmpty() const37 bool SpdyWriteQueue::IsEmpty() const {
38   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; i++) {
39     if (!queue_[i].empty())
40       return false;
41   }
42   return true;
43 }
44 
Enqueue(RequestPriority priority,SpdyFrameType frame_type,scoped_ptr<SpdyBufferProducer> frame_producer,const base::WeakPtr<SpdyStream> & stream)45 void SpdyWriteQueue::Enqueue(RequestPriority priority,
46                              SpdyFrameType frame_type,
47                              scoped_ptr<SpdyBufferProducer> frame_producer,
48                              const base::WeakPtr<SpdyStream>& stream) {
49   CHECK(!removing_writes_);
50   CHECK_GE(priority, MINIMUM_PRIORITY);
51   CHECK_LE(priority, MAXIMUM_PRIORITY);
52   if (stream.get())
53     DCHECK_EQ(stream->priority(), priority);
54   queue_[priority].push_back(
55       PendingWrite(frame_type, frame_producer.release(), stream));
56 }
57 
Dequeue(SpdyFrameType * frame_type,scoped_ptr<SpdyBufferProducer> * frame_producer,base::WeakPtr<SpdyStream> * stream)58 bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type,
59                              scoped_ptr<SpdyBufferProducer>* frame_producer,
60                              base::WeakPtr<SpdyStream>* stream) {
61   CHECK(!removing_writes_);
62   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
63     if (!queue_[i].empty()) {
64       PendingWrite pending_write = queue_[i].front();
65       queue_[i].pop_front();
66       *frame_type = pending_write.frame_type;
67       frame_producer->reset(pending_write.frame_producer);
68       *stream = pending_write.stream;
69       if (pending_write.has_stream)
70         DCHECK(stream->get());
71       return true;
72     }
73   }
74   return false;
75 }
76 
RemovePendingWritesForStream(const base::WeakPtr<SpdyStream> & stream)77 void SpdyWriteQueue::RemovePendingWritesForStream(
78     const base::WeakPtr<SpdyStream>& stream) {
79   CHECK(!removing_writes_);
80   removing_writes_ = true;
81   RequestPriority priority = stream->priority();
82   CHECK_GE(priority, MINIMUM_PRIORITY);
83   CHECK_LE(priority, MAXIMUM_PRIORITY);
84 
85   DCHECK(stream.get());
86 #if DCHECK_IS_ON
87   // |stream| should not have pending writes in a queue not matching
88   // its priority.
89   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
90     if (priority == i)
91       continue;
92     for (std::deque<PendingWrite>::const_iterator it = queue_[i].begin();
93          it != queue_[i].end(); ++it) {
94       DCHECK_NE(it->stream.get(), stream.get());
95     }
96   }
97 #endif
98 
99   // Defer deletion until queue iteration is complete, as
100   // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
101   std::vector<SpdyBufferProducer*> erased_buffer_producers;
102 
103   // Do the actual deletion and removal, preserving FIFO-ness.
104   std::deque<PendingWrite>* queue = &queue_[priority];
105   std::deque<PendingWrite>::iterator out_it = queue->begin();
106   for (std::deque<PendingWrite>::const_iterator it = queue->begin();
107        it != queue->end(); ++it) {
108     if (it->stream.get() == stream.get()) {
109       erased_buffer_producers.push_back(it->frame_producer);
110     } else {
111       *out_it = *it;
112       ++out_it;
113     }
114   }
115   queue->erase(out_it, queue->end());
116   removing_writes_ = false;
117   STLDeleteElements(&erased_buffer_producers);  // Invokes callbacks.
118 }
119 
RemovePendingWritesForStreamsAfter(SpdyStreamId last_good_stream_id)120 void SpdyWriteQueue::RemovePendingWritesForStreamsAfter(
121     SpdyStreamId last_good_stream_id) {
122   CHECK(!removing_writes_);
123   removing_writes_ = true;
124   std::vector<SpdyBufferProducer*> erased_buffer_producers;
125 
126   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
127     // Do the actual deletion and removal, preserving FIFO-ness.
128     std::deque<PendingWrite>* queue = &queue_[i];
129     std::deque<PendingWrite>::iterator out_it = queue->begin();
130     for (std::deque<PendingWrite>::const_iterator it = queue->begin();
131          it != queue->end(); ++it) {
132       if (it->stream.get() && (it->stream->stream_id() > last_good_stream_id ||
133                                it->stream->stream_id() == 0)) {
134         erased_buffer_producers.push_back(it->frame_producer);
135       } else {
136         *out_it = *it;
137         ++out_it;
138       }
139     }
140     queue->erase(out_it, queue->end());
141   }
142   removing_writes_ = false;
143   STLDeleteElements(&erased_buffer_producers);  // Invokes callbacks.
144 }
145 
Clear()146 void SpdyWriteQueue::Clear() {
147   CHECK(!removing_writes_);
148   removing_writes_ = true;
149   std::vector<SpdyBufferProducer*> erased_buffer_producers;
150 
151   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
152     for (std::deque<PendingWrite>::iterator it = queue_[i].begin();
153          it != queue_[i].end(); ++it) {
154       erased_buffer_producers.push_back(it->frame_producer);
155     }
156     queue_[i].clear();
157   }
158   removing_writes_ = false;
159   STLDeleteElements(&erased_buffer_producers);  // Invokes callbacks.
160 }
161 
162 }  // namespace net
163