• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/websockets/websocket_throttle.h"
6 
7 #include <algorithm>
8 #include <set>
9 #include <string>
10 #include <utility>
11 
12 #include "base/memory/singleton.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/strings/string_number_conversions.h"
15 #include "base/strings/string_util.h"
16 #include "base/strings/stringprintf.h"
17 #include "net/base/io_buffer.h"
18 #include "net/socket_stream/socket_stream.h"
19 #include "net/websockets/websocket_job.h"
20 
21 namespace net {
22 
namespace {

// Upper bound on the number of jobs held in the throttle's global queue.
// PutInQueue() refuses a new job once the queue has reached this size.
const size_t kMaxWebSocketJobsThrottled = 1024;

}  // namespace
28 
WebSocketThrottle()29 WebSocketThrottle::WebSocketThrottle() {
30 }
31 
~WebSocketThrottle()32 WebSocketThrottle::~WebSocketThrottle() {
33   DCHECK(queue_.empty());
34   DCHECK(addr_map_.empty());
35 }
36 
37 // static
GetInstance()38 WebSocketThrottle* WebSocketThrottle::GetInstance() {
39   return Singleton<WebSocketThrottle>::get();
40 }
41 
PutInQueue(WebSocketJob * job)42 bool WebSocketThrottle::PutInQueue(WebSocketJob* job) {
43   if (queue_.size() >= kMaxWebSocketJobsThrottled)
44     return false;
45 
46   queue_.push_back(job);
47   const AddressList& address_list = job->address_list();
48   std::set<IPEndPoint> address_set;
49   for (AddressList::const_iterator addr_iter = address_list.begin();
50        addr_iter != address_list.end();
51        ++addr_iter) {
52     const IPEndPoint& address = *addr_iter;
53     // If |address| is already processed, don't do it again.
54     if (!address_set.insert(address).second)
55       continue;
56 
57     ConnectingAddressMap::iterator iter = addr_map_.find(address);
58     if (iter == addr_map_.end()) {
59       ConnectingAddressMap::iterator new_queue =
60           addr_map_.insert(make_pair(address, ConnectingQueue())).first;
61       new_queue->second.push_back(job);
62     } else {
63       DCHECK(!iter->second.empty());
64       iter->second.push_back(job);
65       job->SetWaiting();
66       DVLOG(1) << "Waiting on " << address.ToString();
67     }
68   }
69 
70   return true;
71 }
72 
RemoveFromQueue(WebSocketJob * job)73 void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) {
74   ConnectingQueue::iterator queue_iter =
75       std::find(queue_.begin(), queue_.end(), job);
76   if (queue_iter == queue_.end())
77     return;
78   queue_.erase(queue_iter);
79 
80   std::set<WebSocketJob*> wakeup_candidates;
81 
82   const AddressList& resolved_address_list = job->address_list();
83   std::set<IPEndPoint> address_set;
84   for (AddressList::const_iterator addr_iter = resolved_address_list.begin();
85        addr_iter != resolved_address_list.end();
86        ++addr_iter) {
87     const IPEndPoint& address = *addr_iter;
88     // If |address| is already processed, don't do it again.
89     if (!address_set.insert(address).second)
90       continue;
91 
92     ConnectingAddressMap::iterator map_iter = addr_map_.find(address);
93     DCHECK(map_iter != addr_map_.end());
94 
95     ConnectingQueue& per_address_queue = map_iter->second;
96     DCHECK(!per_address_queue.empty());
97     // Job may not be front of the queue if the socket is closed while waiting.
98     ConnectingQueue::iterator per_address_queue_iter =
99         std::find(per_address_queue.begin(), per_address_queue.end(), job);
100     bool was_front = false;
101     if (per_address_queue_iter != per_address_queue.end()) {
102       was_front = (per_address_queue_iter == per_address_queue.begin());
103       per_address_queue.erase(per_address_queue_iter);
104     }
105     if (per_address_queue.empty()) {
106       addr_map_.erase(map_iter);
107     } else if (was_front) {
108       // The new front is a wake-up candidate.
109       wakeup_candidates.insert(per_address_queue.front());
110     }
111   }
112 
113   WakeupSocketIfNecessary(wakeup_candidates);
114 }
115 
WakeupSocketIfNecessary(const std::set<WebSocketJob * > & wakeup_candidates)116 void WebSocketThrottle::WakeupSocketIfNecessary(
117     const std::set<WebSocketJob*>& wakeup_candidates) {
118   for (std::set<WebSocketJob*>::const_iterator iter = wakeup_candidates.begin();
119        iter != wakeup_candidates.end();
120        ++iter) {
121     WebSocketJob* job = *iter;
122     if (!job->IsWaiting())
123       continue;
124 
125     bool should_wakeup = true;
126     const AddressList& resolved_address_list = job->address_list();
127     for (AddressList::const_iterator addr_iter = resolved_address_list.begin();
128          addr_iter != resolved_address_list.end();
129          ++addr_iter) {
130       const IPEndPoint& address = *addr_iter;
131       ConnectingAddressMap::iterator map_iter = addr_map_.find(address);
132       DCHECK(map_iter != addr_map_.end());
133       const ConnectingQueue& per_address_queue = map_iter->second;
134       if (job != per_address_queue.front()) {
135         should_wakeup = false;
136         break;
137       }
138     }
139     if (should_wakeup)
140       job->Wakeup();
141   }
142 }
143 
144 }  // namespace net
145