1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/websockets/websocket_throttle.h" 6 7 #include <algorithm> 8 #include <set> 9 #include <string> 10 #include <utility> 11 12 #include "base/memory/singleton.h" 13 #include "base/message_loop/message_loop.h" 14 #include "base/strings/string_number_conversions.h" 15 #include "base/strings/string_util.h" 16 #include "base/strings/stringprintf.h" 17 #include "net/base/io_buffer.h" 18 #include "net/socket_stream/socket_stream.h" 19 #include "net/websockets/websocket_job.h" 20 21 namespace net { 22 23 namespace { 24 25 const size_t kMaxWebSocketJobsThrottled = 1024; 26 27 } // namespace 28 WebSocketThrottle()29WebSocketThrottle::WebSocketThrottle() { 30 } 31 ~WebSocketThrottle()32WebSocketThrottle::~WebSocketThrottle() { 33 DCHECK(queue_.empty()); 34 DCHECK(addr_map_.empty()); 35 } 36 37 // static GetInstance()38WebSocketThrottle* WebSocketThrottle::GetInstance() { 39 return Singleton<WebSocketThrottle>::get(); 40 } 41 PutInQueue(WebSocketJob * job)42bool WebSocketThrottle::PutInQueue(WebSocketJob* job) { 43 if (queue_.size() >= kMaxWebSocketJobsThrottled) 44 return false; 45 46 queue_.push_back(job); 47 const AddressList& address_list = job->address_list(); 48 std::set<IPEndPoint> address_set; 49 for (AddressList::const_iterator addr_iter = address_list.begin(); 50 addr_iter != address_list.end(); 51 ++addr_iter) { 52 const IPEndPoint& address = *addr_iter; 53 // If |address| is already processed, don't do it again. 54 if (!address_set.insert(address).second) 55 continue; 56 57 ConnectingAddressMap::iterator iter = addr_map_.find(address); 58 if (iter == addr_map_.end()) { 59 ConnectingAddressMap::iterator new_queue = 60 addr_map_.insert(make_pair(address, ConnectingQueue())).first; 61 new_queue->second.push_back(job); 62 } else { 63 DCHECK(!iter->second.empty()); 64 iter->second.push_back(job); 65 job->SetWaiting(); 66 DVLOG(1) << "Waiting on " << address.ToString(); 67 } 68 } 69 70 return true; 71 } 72 RemoveFromQueue(WebSocketJob * job)73void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { 74 ConnectingQueue::iterator queue_iter = 75 std::find(queue_.begin(), queue_.end(), job); 76 if (queue_iter == queue_.end()) 77 return; 78 queue_.erase(queue_iter); 79 80 std::set<WebSocketJob*> wakeup_candidates; 81 82 const AddressList& resolved_address_list = job->address_list(); 83 std::set<IPEndPoint> address_set; 84 for (AddressList::const_iterator addr_iter = resolved_address_list.begin(); 85 addr_iter != resolved_address_list.end(); 86 ++addr_iter) { 87 const IPEndPoint& address = *addr_iter; 88 // If |address| is already processed, don't do it again. 89 if (!address_set.insert(address).second) 90 continue; 91 92 ConnectingAddressMap::iterator map_iter = addr_map_.find(address); 93 DCHECK(map_iter != addr_map_.end()); 94 95 ConnectingQueue& per_address_queue = map_iter->second; 96 DCHECK(!per_address_queue.empty()); 97 // Job may not be front of the queue if the socket is closed while waiting. 98 ConnectingQueue::iterator per_address_queue_iter = 99 std::find(per_address_queue.begin(), per_address_queue.end(), job); 100 bool was_front = false; 101 if (per_address_queue_iter != per_address_queue.end()) { 102 was_front = (per_address_queue_iter == per_address_queue.begin()); 103 per_address_queue.erase(per_address_queue_iter); 104 } 105 if (per_address_queue.empty()) { 106 addr_map_.erase(map_iter); 107 } else if (was_front) { 108 // The new front is a wake-up candidate. 109 wakeup_candidates.insert(per_address_queue.front()); 110 } 111 } 112 113 WakeupSocketIfNecessary(wakeup_candidates); 114 } 115 WakeupSocketIfNecessary(const std::set<WebSocketJob * > & wakeup_candidates)116void WebSocketThrottle::WakeupSocketIfNecessary( 117 const std::set<WebSocketJob*>& wakeup_candidates) { 118 for (std::set<WebSocketJob*>::const_iterator iter = wakeup_candidates.begin(); 119 iter != wakeup_candidates.end(); 120 ++iter) { 121 WebSocketJob* job = *iter; 122 if (!job->IsWaiting()) 123 continue; 124 125 bool should_wakeup = true; 126 const AddressList& resolved_address_list = job->address_list(); 127 for (AddressList::const_iterator addr_iter = resolved_address_list.begin(); 128 addr_iter != resolved_address_list.end(); 129 ++addr_iter) { 130 const IPEndPoint& address = *addr_iter; 131 ConnectingAddressMap::iterator map_iter = addr_map_.find(address); 132 DCHECK(map_iter != addr_map_.end()); 133 const ConnectingQueue& per_address_queue = map_iter->second; 134 if (job != per_address_queue.front()) { 135 should_wakeup = false; 136 break; 137 } 138 } 139 if (should_wakeup) 140 job->Wakeup(); 141 } 142 } 143 144 } // namespace net 145