1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "util/worker_pool.h"
6
7 #include "base/command_line.h"
8 #include "base/strings/string_number_conversions.h"
9 #include "gn/switches.h"
10 #include "util/build_config.h"
11 #include "util/sys_info.h"
12
13 #if defined(OS_WIN)
14 #include <windows.h>
15 #endif
16
17 namespace {
18
19 #if defined(OS_WIN)
20 class ProcessorGroupSetter {
21 public:
22 void SetProcessorGroup(std::thread* thread);
23
24 private:
25 int group_ = 0;
26 GROUP_AFFINITY group_affinity_;
27 int num_available_cores_in_group_ = ::GetActiveProcessorCount(group_) / 2;
28 const int num_groups_ = ::GetActiveProcessorGroupCount();
29 };
30
SetProcessorGroup(std::thread * thread)31 void ProcessorGroupSetter::SetProcessorGroup(std::thread* thread) {
32 if (num_groups_ <= 1)
33 return;
34
35 const HANDLE thread_handle = HANDLE(thread->native_handle());
36 ::GetThreadGroupAffinity(thread_handle, &group_affinity_);
37 group_affinity_.Group = group_;
38 const bool success =
39 ::SetThreadGroupAffinity(thread_handle, &group_affinity_, nullptr);
40 DCHECK(success);
41
42 // Move to next group once one thread has been assigned per core in |group_|.
43 num_available_cores_in_group_--;
44 if (num_available_cores_in_group_ <= 0) {
45 group_++;
46 if (group_ >= num_groups_) {
47 group_ = 0;
48 }
49 num_available_cores_in_group_ = ::GetActiveProcessorCount(group_) / 2;
50 }
51 }
52 #endif
53
GetThreadCount()54 int GetThreadCount() {
55 std::string thread_count =
56 base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
57 switches::kThreads);
58
59 // See if an override was specified on the command line.
60 int result;
61 if (!thread_count.empty() && base::StringToInt(thread_count, &result) &&
62 result >= 1) {
63 return result;
64 }
65
66 // Base the default number of worker threads on number of cores in the
67 // system. When building large projects, the speed can be limited by how fast
68 // the main thread can dispatch work and connect the dependency graph. If
69 // there are too many worker threads, the main thread can be starved and it
70 // will run slower overall.
71 //
72 // One less worker thread than the number of physical CPUs seems to be a
73 // good value, both theoretically and experimentally. But always use at
74 // least some workers to prevent us from being too sensitive to I/O latency
75 // on low-end systems.
76 //
77 // The minimum thread count is based on measuring the optimal threads for the
78 // Chrome build on a several-year-old 4-core MacBook.
79 // Almost all CPUs now are hyperthreaded.
80 int num_cores = NumberOfProcessors() / 2;
81 return std::max(num_cores - 1, 8);
82 }
83
84 } // namespace
85
WorkerPool()86 WorkerPool::WorkerPool() : WorkerPool(GetThreadCount()) {}
87
WorkerPool(size_t thread_count)88 WorkerPool::WorkerPool(size_t thread_count) : should_stop_processing_(false) {
89 #if defined(OS_WIN)
90 ProcessorGroupSetter processor_group_setter;
91 #endif
92
93 threads_.reserve(thread_count);
94 for (size_t i = 0; i < thread_count; ++i) {
95 threads_.emplace_back([this]() { Worker(); });
96
97 #if defined(OS_WIN)
98 // Set thread processor group. This is needed for systems with more than 64
99 // logical processors, wherein available processors are divided into groups,
100 // and applications that need to use more than one group's processors must
101 // manually assign their threads to groups.
102 processor_group_setter.SetProcessorGroup(&threads_.back());
103 #endif
104 }
105 }
106
~WorkerPool()107 WorkerPool::~WorkerPool() {
108 {
109 std::unique_lock<std::mutex> queue_lock(queue_mutex_);
110 should_stop_processing_ = true;
111 }
112
113 pool_notifier_.notify_all();
114
115 for (auto& task_thread : threads_) {
116 task_thread.join();
117 }
118 }
119
PostTask(std::function<void ()> work)120 void WorkerPool::PostTask(std::function<void()> work) {
121 {
122 std::unique_lock<std::mutex> queue_lock(queue_mutex_);
123 CHECK(!should_stop_processing_);
124 task_queue_.emplace(std::move(work));
125 }
126
127 pool_notifier_.notify_one();
128 }
129
Worker()130 void WorkerPool::Worker() {
131 for (;;) {
132 std::function<void()> task;
133
134 {
135 std::unique_lock<std::mutex> queue_lock(queue_mutex_);
136
137 pool_notifier_.wait(queue_lock, [this]() {
138 return (!task_queue_.empty()) || should_stop_processing_;
139 });
140
141 if (should_stop_processing_ && task_queue_.empty())
142 return;
143
144 task = std::move(task_queue_.front());
145 task_queue_.pop();
146 }
147
148 task();
149 }
150 }
151