• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #include "net/base/prioritized_dispatcher.h"
6 
7 #include "base/logging.h"
8 
9 namespace net {
10 
Limits(Priority num_priorities,size_t total_jobs)11 PrioritizedDispatcher::Limits::Limits(Priority num_priorities,
12                                       size_t total_jobs)
13     : total_jobs(total_jobs), reserved_slots(num_priorities) {}
14 
~Limits()15 PrioritizedDispatcher::Limits::~Limits() {}
16 
PrioritizedDispatcher(const Limits & limits)17 PrioritizedDispatcher::PrioritizedDispatcher(const Limits& limits)
18     : queue_(limits.reserved_slots.size()),
19       max_running_jobs_(limits.reserved_slots.size()),
20       num_running_jobs_(0) {
21   SetLimits(limits);
22 }
23 
~PrioritizedDispatcher()24 PrioritizedDispatcher::~PrioritizedDispatcher() {}
25 
Add(Job * job,Priority priority)26 PrioritizedDispatcher::Handle PrioritizedDispatcher::Add(
27     Job* job, Priority priority) {
28   DCHECK(job);
29   DCHECK_LT(priority, num_priorities());
30   if (num_running_jobs_ < max_running_jobs_[priority]) {
31     ++num_running_jobs_;
32     job->Start();
33     return Handle();
34   }
35   return queue_.Insert(job, priority);
36 }
37 
AddAtHead(Job * job,Priority priority)38 PrioritizedDispatcher::Handle PrioritizedDispatcher::AddAtHead(
39     Job* job, Priority priority) {
40   DCHECK(job);
41   DCHECK_LT(priority, num_priorities());
42   if (num_running_jobs_ < max_running_jobs_[priority]) {
43     ++num_running_jobs_;
44     job->Start();
45     return Handle();
46   }
47   return queue_.InsertAtFront(job, priority);
48 }
49 
Cancel(const Handle & handle)50 void PrioritizedDispatcher::Cancel(const Handle& handle) {
51   queue_.Erase(handle);
52 }
53 
EvictOldestLowest()54 PrioritizedDispatcher::Job* PrioritizedDispatcher::EvictOldestLowest() {
55   Handle handle = queue_.FirstMin();
56   if (handle.is_null())
57     return NULL;
58   Job* job = handle.value();
59   Cancel(handle);
60   return job;
61 }
62 
ChangePriority(const Handle & handle,Priority priority)63 PrioritizedDispatcher::Handle PrioritizedDispatcher::ChangePriority(
64     const Handle& handle, Priority priority) {
65   DCHECK(!handle.is_null());
66   DCHECK_LT(priority, num_priorities());
67   DCHECK_GE(num_running_jobs_, max_running_jobs_[handle.priority()]) <<
68       "Job should not be in queue when limits permit it to start.";
69 
70   if (handle.priority() == priority)
71     return handle;
72 
73   if (MaybeDispatchJob(handle, priority))
74     return Handle();
75   Job* job = handle.value();
76   queue_.Erase(handle);
77   return queue_.Insert(job, priority);
78 }
79 
OnJobFinished()80 void PrioritizedDispatcher::OnJobFinished() {
81   DCHECK_GT(num_running_jobs_, 0u);
82   --num_running_jobs_;
83   MaybeDispatchNextJob();
84 }
85 
GetLimits() const86 PrioritizedDispatcher::Limits PrioritizedDispatcher::GetLimits() const {
87   size_t num_priorities = max_running_jobs_.size();
88   Limits limits(num_priorities, max_running_jobs_.back());
89 
90   // Calculate the number of jobs reserved for each priority and higher.  Leave
91   // the number of jobs reserved for the lowest priority or higher as 0.
92   for (size_t i = 1; i < num_priorities; ++i) {
93     limits.reserved_slots[i] = max_running_jobs_[i] - max_running_jobs_[i - 1];
94   }
95 
96   return limits;
97 }
98 
SetLimits(const Limits & limits)99 void PrioritizedDispatcher::SetLimits(const Limits& limits) {
100   DCHECK_EQ(queue_.num_priorities(), limits.reserved_slots.size());
101   size_t total = 0;
102   for (size_t i = 0; i < limits.reserved_slots.size(); ++i) {
103     total += limits.reserved_slots[i];
104     max_running_jobs_[i] = total;
105   }
106   // Unreserved slots are available for all priorities.
107   DCHECK_LE(total, limits.total_jobs) << "sum(reserved_slots) <= total_jobs";
108   size_t spare = limits.total_jobs - total;
109   for (size_t i = limits.reserved_slots.size(); i > 0; --i) {
110     max_running_jobs_[i - 1] += spare;
111   }
112 
113   // Start pending jobs, if limits permit.
114   while (true) {
115     if (!MaybeDispatchNextJob())
116       break;
117   }
118 }
119 
SetLimitsToZero()120 void PrioritizedDispatcher::SetLimitsToZero() {
121   SetLimits(Limits(queue_.num_priorities(), 0));
122 }
123 
MaybeDispatchJob(const Handle & handle,Priority job_priority)124 bool PrioritizedDispatcher::MaybeDispatchJob(const Handle& handle,
125                                              Priority job_priority) {
126   DCHECK_LT(job_priority, num_priorities());
127   if (num_running_jobs_ >= max_running_jobs_[job_priority])
128     return false;
129   Job* job = handle.value();
130   queue_.Erase(handle);
131   ++num_running_jobs_;
132   job->Start();
133   return true;
134 }
135 
MaybeDispatchNextJob()136 bool PrioritizedDispatcher::MaybeDispatchNextJob() {
137   Handle handle = queue_.FirstMax();
138   if (handle.is_null()) {
139     DCHECK_EQ(0u, queue_.size());
140     return false;
141   }
142   return MaybeDispatchJob(handle, handle.priority());
143 }
144 
145 }  // namespace net
146