• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "util/worker_pool.h"
6 
7 #include "base/command_line.h"
8 #include "base/strings/string_number_conversions.h"
9 #include "gn/switches.h"
10 #include "util/build_config.h"
11 #include "util/sys_info.h"
12 
13 #if defined(OS_WIN)
14 #include <windows.h>
15 #endif
16 
17 namespace {
18 
19 #if defined(OS_WIN)
20 class ProcessorGroupSetter {
21  public:
22   void SetProcessorGroup(std::thread* thread);
23 
24  private:
25   int group_ = 0;
26   GROUP_AFFINITY group_affinity_;
27   int num_available_cores_in_group_ = ::GetActiveProcessorCount(group_) / 2;
28   const int num_groups_ = ::GetActiveProcessorGroupCount();
29 };
30 
SetProcessorGroup(std::thread * thread)31 void ProcessorGroupSetter::SetProcessorGroup(std::thread* thread) {
32   if (num_groups_ <= 1)
33     return;
34 
35   const HANDLE thread_handle = HANDLE(thread->native_handle());
36   ::GetThreadGroupAffinity(thread_handle, &group_affinity_);
37   group_affinity_.Group = group_;
38   const bool success =
39       ::SetThreadGroupAffinity(thread_handle, &group_affinity_, nullptr);
40   DCHECK(success);
41 
42   // Move to next group once one thread has been assigned per core in |group_|.
43   num_available_cores_in_group_--;
44   if (num_available_cores_in_group_ <= 0) {
45     group_++;
46     if (group_ >= num_groups_) {
47       group_ = 0;
48     }
49     num_available_cores_in_group_ = ::GetActiveProcessorCount(group_) / 2;
50   }
51 }
52 #endif
53 
GetThreadCount()54 int GetThreadCount() {
55   std::string thread_count =
56       base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
57           switches::kThreads);
58 
59   // See if an override was specified on the command line.
60   int result;
61   if (!thread_count.empty() && base::StringToInt(thread_count, &result) &&
62       result >= 1) {
63     return result;
64   }
65 
66   // Base the default number of worker threads on number of cores in the
67   // system. When building large projects, the speed can be limited by how fast
68   // the main thread can dispatch work and connect the dependency graph. If
69   // there are too many worker threads, the main thread can be starved and it
70   // will run slower overall.
71   //
72   // One less worker thread than the number of physical CPUs seems to be a
73   // good value, both theoretically and experimentally. But always use at
74   // least some workers to prevent us from being too sensitive to I/O latency
75   // on low-end systems.
76   //
77   // The minimum thread count is based on measuring the optimal threads for the
78   // Chrome build on a several-year-old 4-core MacBook.
79   // Almost all CPUs now are hyperthreaded.
80   int num_cores = NumberOfProcessors() / 2;
81   return std::max(num_cores - 1, 8);
82 }
83 
84 }  // namespace
85 
WorkerPool()86 WorkerPool::WorkerPool() : WorkerPool(GetThreadCount()) {}
87 
WorkerPool(size_t thread_count)88 WorkerPool::WorkerPool(size_t thread_count) : should_stop_processing_(false) {
89 #if defined(OS_WIN)
90   ProcessorGroupSetter processor_group_setter;
91 #endif
92 
93   threads_.reserve(thread_count);
94   for (size_t i = 0; i < thread_count; ++i) {
95     threads_.emplace_back([this]() { Worker(); });
96 
97 #if defined(OS_WIN)
98     // Set thread processor group. This is needed for systems with more than 64
99     // logical processors, wherein available processors are divided into groups,
100     // and applications that need to use more than one group's processors must
101     // manually assign their threads to groups.
102     processor_group_setter.SetProcessorGroup(&threads_.back());
103 #endif
104   }
105 }
106 
~WorkerPool()107 WorkerPool::~WorkerPool() {
108   {
109     std::unique_lock<std::mutex> queue_lock(queue_mutex_);
110     should_stop_processing_ = true;
111   }
112 
113   pool_notifier_.notify_all();
114 
115   for (auto& task_thread : threads_) {
116     task_thread.join();
117   }
118 }
119 
PostTask(std::function<void ()> work)120 void WorkerPool::PostTask(std::function<void()> work) {
121   {
122     std::unique_lock<std::mutex> queue_lock(queue_mutex_);
123     CHECK(!should_stop_processing_);
124     task_queue_.emplace(std::move(work));
125   }
126 
127   pool_notifier_.notify_one();
128 }
129 
Worker()130 void WorkerPool::Worker() {
131   for (;;) {
132     std::function<void()> task;
133 
134     {
135       std::unique_lock<std::mutex> queue_lock(queue_mutex_);
136 
137       pool_notifier_.wait(queue_lock, [this]() {
138         return (!task_queue_.empty()) || should_stop_processing_;
139       });
140 
141       if (should_stop_processing_ && task_queue_.empty())
142         return;
143 
144       task = std::move(task_queue_.front());
145       task_queue_.pop();
146     }
147 
148     task();
149   }
150 }
151