• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2019 The Marl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14 
15 #include "marl/thread.h"
16 
17 #include "marl/debug.h"
18 #include "marl/defer.h"
19 #include "marl/trace.h"
20 
21 #include <algorithm>  // std::sort
22 
23 #include <cstdarg>
24 #include <cstdio>
25 
26 #if defined(_WIN32)
27 #define WIN32_LEAN_AND_MEAN 1
28 #include <windows.h>
29 #include <array>
30 #include <cstdlib>  // mbstowcs
31 #include <limits>   // std::numeric_limits
32 #include <vector>
33 #undef max
34 #elif defined(__APPLE__)
35 #include <mach/thread_act.h>
36 #include <pthread.h>
37 #include <unistd.h>
38 #include <thread>
39 #elif defined(__FreeBSD__)
40 #include <pthread.h>
41 #include <pthread_np.h>
42 #include <unistd.h>
43 #include <thread>
44 #else
45 #include <pthread.h>
46 #include <unistd.h>
47 #include <thread>
48 #endif
49 
50 namespace {
51 
52 struct CoreHasher {
operator ()__anon6fd9e19d0111::CoreHasher53   inline uint64_t operator()(const marl::Thread::Core& core) const {
54     return core.pthread.index;
55   }
56 };
57 
58 }  // anonymous namespace
59 
60 namespace marl {
61 
#if defined(_WIN32)
static constexpr size_t MaxCoreCount =
    std::numeric_limits<decltype(Thread::Core::windows.index)>::max() + 1ULL;
static constexpr size_t MaxGroupCount =
    std::numeric_limits<decltype(Thread::Core::windows.group)>::max() + 1ULL;
static_assert(sizeof(KAFFINITY) * 8ULL <= MaxCoreCount,
              "Thread::Core::windows.index is too small");

namespace {
// Asserts that a Win32 call returned TRUE, reporting GetLastError() on
// failure. The result is still consumed in builds that compile asserts out.
#define CHECK_WIN32(expr)                                    \
  do {                                                       \
    auto res = expr;                                         \
    (void)res;                                               \
    MARL_ASSERT(res == TRUE, #expr " failed with error: %d", \
                (int)GetLastError());                        \
  } while (false)

// A single Windows processor group.
struct ProcessorGroup {
  unsigned int count;  // number of logical processors in this group.
  KAFFINITY affinity;  // affinity mask.
};

// Fixed-capacity list of the machine's processor groups.
struct ProcessorGroups {
  std::array<ProcessorGroup, MaxGroupCount> groups;
  size_t count;
};

// Queries the OS once for the set of processor groups and caches the result
// for the lifetime of the process.
const ProcessorGroups& getProcessorGroups() {
  static const ProcessorGroups groups = [] {
    ProcessorGroups result = {};
    // NOTE(review): assumes 32 entries is enough for any machine; the call is
    // CHECK'd, so a larger machine fails the assert rather than corrupting.
    SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX info[32] = {};
    DWORD bytes = sizeof(info);
    CHECK_WIN32(GetLogicalProcessorInformationEx(RelationGroup, info, &bytes));
    const DWORD entries =
        bytes / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX);
    for (DWORD entry = 0; entry < entries; entry++) {
      if (info[entry].Relationship != RelationGroup) {
        continue;
      }
      const WORD numGroups = info[entry].Group.ActiveGroupCount;
      for (WORD g = 0; g < numGroups; g++) {
        const auto& groupInfo = info[entry].Group.GroupInfo[g];
        result.groups[result.count++] = ProcessorGroup{
            groupInfo.ActiveProcessorCount, groupInfo.ActiveProcessorMask};
        MARL_ASSERT(result.count <= MaxGroupCount, "Group index overflow");
      }
    }
    return result;
  }();
  return groups;
}
}  // namespace
#endif  // defined(_WIN32)
113 
////////////////////////////////////////////////////////////////////////////////
// Thread::Affinity
////////////////////////////////////////////////////////////////////////////////
117 
Affinity(Allocator * allocator)118 Thread::Affinity::Affinity(Allocator* allocator) : cores(allocator) {}
Affinity(Affinity && other)119 Thread::Affinity::Affinity(Affinity&& other) : cores(std::move(other.cores)) {}
Affinity(const Affinity & other,Allocator * allocator)120 Thread::Affinity::Affinity(const Affinity& other, Allocator* allocator)
121     : cores(other.cores, allocator) {}
122 
Affinity(std::initializer_list<Core> list,Allocator * allocator)123 Thread::Affinity::Affinity(std::initializer_list<Core> list,
124                            Allocator* allocator)
125     : cores(allocator) {
126   cores.reserve(list.size());
127   for (auto core : list) {
128     cores.push_back(core);
129   }
130 }
131 
Affinity(const containers::vector<Core,32> & coreList,Allocator * allocator)132 Thread::Affinity::Affinity(const containers::vector<Core, 32>& coreList,
133                            Allocator* allocator)
134     : cores(coreList, allocator) {}
135 
all(Allocator * allocator)136 Thread::Affinity Thread::Affinity::all(
137     Allocator* allocator /* = Allocator::Default */) {
138   Thread::Affinity affinity(allocator);
139 
140 #if defined(_WIN32)
141   const auto& groups = getProcessorGroups();
142   for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
143     const auto& group = groups.groups[groupIdx];
144     Core core;
145     core.windows.group = static_cast<decltype(Core::windows.group)>(groupIdx);
146     for (unsigned int coreIdx = 0; coreIdx < group.count; coreIdx++) {
147       if ((group.affinity >> coreIdx) & 1) {
148         core.windows.index = static_cast<decltype(core.windows.index)>(coreIdx);
149         affinity.cores.emplace_back(std::move(core));
150       }
151     }
152   }
153 #elif defined(__linux__) && !defined(__ANDROID__)
154   auto thread = pthread_self();
155   cpu_set_t cpuset;
156   CPU_ZERO(&cpuset);
157   if (pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0) {
158     int count = CPU_COUNT(&cpuset);
159     for (int i = 0; i < count; i++) {
160       Core core;
161       core.pthread.index = static_cast<uint16_t>(i);
162       affinity.cores.emplace_back(std::move(core));
163     }
164   }
165 #elif defined(__FreeBSD__)
166   auto thread = pthread_self();
167   cpuset_t cpuset;
168   CPU_ZERO(&cpuset);
169   if (pthread_getaffinity_np(thread, sizeof(cpuset_t), &cpuset) == 0) {
170     int count = CPU_COUNT(&cpuset);
171     for (int i = 0; i < count; i++) {
172       Core core;
173       core.pthread.index = static_cast<uint16_t>(i);
174       affinity.cores.emplace_back(std::move(core));
175     }
176   }
177 #else
178   static_assert(!supported,
179                 "marl::Thread::Affinity::supported is true, but "
180                 "Thread::Affinity::all() is not implemented for this platform");
181 #endif
182 
183   return affinity;
184 }
185 
anyOf(Affinity && affinity,Allocator * allocator)186 std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::anyOf(
187     Affinity&& affinity,
188     Allocator* allocator /* = Allocator::Default */) {
189   struct Policy : public Thread::Affinity::Policy {
190     Affinity affinity;
191     Policy(Affinity&& affinity) : affinity(std::move(affinity)) {}
192 
193     Affinity get(uint32_t threadId, Allocator* allocator) const override {
194 #if defined(_WIN32)
195       auto count = affinity.count();
196       if (count == 0) {
197         return Affinity(affinity, allocator);
198       }
199       auto group = affinity[threadId % affinity.count()].windows.group;
200       Affinity out(allocator);
201       out.cores.reserve(count);
202       for (auto core : affinity.cores) {
203         if (core.windows.group == group) {
204           out.cores.push_back(core);
205         }
206       }
207       return out;
208 #else
209       return Affinity(affinity, allocator);
210 #endif
211     }
212   };
213 
214   return allocator->make_shared<Policy>(std::move(affinity));
215 }
216 
oneOf(Affinity && affinity,Allocator * allocator)217 std::shared_ptr<Thread::Affinity::Policy> Thread::Affinity::Policy::oneOf(
218     Affinity&& affinity,
219     Allocator* allocator /* = Allocator::Default */) {
220   struct Policy : public Thread::Affinity::Policy {
221     Affinity affinity;
222     Policy(Affinity&& affinity) : affinity(std::move(affinity)) {}
223 
224     Affinity get(uint32_t threadId, Allocator* allocator) const override {
225       auto count = affinity.count();
226       if (count == 0) {
227         return Affinity(affinity, allocator);
228       }
229       return Affinity({affinity[threadId % affinity.count()]}, allocator);
230     }
231   };
232 
233   return allocator->make_shared<Policy>(std::move(affinity));
234 }
235 
count() const236 size_t Thread::Affinity::count() const {
237   return cores.size();
238 }
239 
operator [](size_t index) const240 Thread::Core Thread::Affinity::operator[](size_t index) const {
241   return cores[index];
242 }
243 
// Adds the cores of |other| that are not already present, keeping the core
// list sorted. Returns *this for chaining.
Thread::Affinity& Thread::Affinity::add(const Thread::Affinity& other) {
  // Build a membership set of the cores we already hold, then append only
  // the ones from |other| that are new.
  containers::unordered_set<Core, CoreHasher> existing(cores.allocator);
  for (auto core : cores) {
    existing.emplace(core);
  }
  for (auto core : other.cores) {
    if (existing.count(core) == 0) {
      cores.push_back(core);
    }
  }
  std::sort(cores.begin(), cores.end());
  return *this;
}
257 
// Removes every core of |other| from this set, keeping the core list sorted.
// Returns *this for chaining.
Thread::Affinity& Thread::Affinity::remove(const Thread::Affinity& other) {
  containers::unordered_set<Core, CoreHasher> toRemove(cores.allocator);
  for (auto core : other.cores) {
    toRemove.emplace(core);
  }
  // Swap-with-back removal. Only advance the index when the current element
  // is kept: the element swapped in from the back must itself be re-tested.
  // (The previous loop always advanced, so a to-be-removed core moved in
  // from the back could survive the pass.)
  for (size_t i = 0; i < cores.size();) {
    if (toRemove.count(cores[i]) != 0) {
      cores[i] = cores.back();
      cores.resize(cores.size() - 1);
    } else {
      i++;
    }
  }
  std::sort(cores.begin(), cores.end());
  return *this;
}
272 
273 #if defined(_WIN32)
274 
275 class Thread::Impl {
276  public:
Impl(Func && func)277   Impl(Func&& func) : func(std::move(func)) {}
run(void * self)278   static DWORD WINAPI run(void* self) {
279     reinterpret_cast<Impl*>(self)->func();
280     return 0;
281   }
282 
283   Func func;
284   HANDLE handle;
285 };
286 
Thread(Affinity && affinity,Func && func)287 Thread::Thread(Affinity&& affinity, Func&& func) {
288   SIZE_T size = 0;
289   InitializeProcThreadAttributeList(nullptr, 1, 0, &size);
290   MARL_ASSERT(size > 0,
291               "InitializeProcThreadAttributeList() did not give a size");
292 
293   std::vector<uint8_t> buffer(size);
294   LPPROC_THREAD_ATTRIBUTE_LIST attributes =
295       reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(buffer.data());
296   CHECK_WIN32(InitializeProcThreadAttributeList(attributes, 1, 0, &size));
297   defer(DeleteProcThreadAttributeList(attributes));
298 
299   GROUP_AFFINITY groupAffinity = {};
300 
301   auto count = affinity.count();
302   if (count > 0) {
303     groupAffinity.Group = affinity[0].windows.group;
304     for (size_t i = 0; i < count; i++) {
305       auto core = affinity[i];
306       MARL_ASSERT(groupAffinity.Group == core.windows.group,
307                   "Cannot create thread that uses multiple affinity groups");
308       groupAffinity.Mask |= (1ULL << core.windows.index);
309     }
310     CHECK_WIN32(UpdateProcThreadAttribute(
311         attributes, 0, PROC_THREAD_ATTRIBUTE_GROUP_AFFINITY, &groupAffinity,
312         sizeof(groupAffinity), nullptr, nullptr));
313   }
314 
315   impl = new Impl(std::move(func));
316   impl->handle = CreateRemoteThreadEx(GetCurrentProcess(), nullptr, 0,
317                                       &Impl::run, impl, 0, attributes, nullptr);
318 }
319 
~Thread()320 Thread::~Thread() {
321   if (impl) {
322     CloseHandle(impl->handle);
323     delete impl;
324   }
325 }
326 
join()327 void Thread::join() {
328   MARL_ASSERT(impl != nullptr, "join() called on unjoinable thread");
329   WaitForSingleObject(impl->handle, INFINITE);
330 }
331 
setName(const char * fmt,...)332 void Thread::setName(const char* fmt, ...) {
333   static auto setThreadDescription =
334       reinterpret_cast<HRESULT(WINAPI*)(HANDLE, PCWSTR)>(GetProcAddress(
335           GetModuleHandle("kernelbase.dll"), "SetThreadDescription"));
336   if (setThreadDescription == nullptr) {
337     return;
338   }
339 
340   char name[1024];
341   va_list vararg;
342   va_start(vararg, fmt);
343   vsnprintf(name, sizeof(name), fmt, vararg);
344   va_end(vararg);
345 
346   wchar_t wname[1024];
347   mbstowcs(wname, name, 1024);
348   setThreadDescription(GetCurrentThread(), wname);
349   MARL_NAME_THREAD("%s", name);
350 }
351 
numLogicalCPUs()352 unsigned int Thread::numLogicalCPUs() {
353   unsigned int count = 0;
354   const auto& groups = getProcessorGroups();
355   for (size_t groupIdx = 0; groupIdx < groups.count; groupIdx++) {
356     const auto& group = groups.groups[groupIdx];
357     count += group.count;
358   }
359   return count;
360 }
361 
362 #else
363 
364 class Thread::Impl {
365  public:
Impl(Affinity && affinity,Thread::Func && f)366   Impl(Affinity&& affinity, Thread::Func&& f)
367       : affinity(std::move(affinity)), func(std::move(f)), thread([this] {
368           setAffinity();
369           func();
370         }) {}
371 
372   Affinity affinity;
373   Func func;
374   std::thread thread;
375 
setAffinity()376   void setAffinity() {
377     auto count = affinity.count();
378     if (count == 0) {
379       return;
380     }
381 
382 #if defined(__linux__) && !defined(__ANDROID__)
383     cpu_set_t cpuset;
384     CPU_ZERO(&cpuset);
385     for (size_t i = 0; i < count; i++) {
386       CPU_SET(affinity[i].pthread.index, &cpuset);
387     }
388     auto thread = pthread_self();
389     pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
390 #elif defined(__FreeBSD__)
391     cpuset_t cpuset;
392     CPU_ZERO(&cpuset);
393     for (size_t i = 0; i < count; i++) {
394       CPU_SET(affinity[i].pthread.index, &cpuset);
395     }
396     auto thread = pthread_self();
397     pthread_setaffinity_np(thread, sizeof(cpuset_t), &cpuset);
398 #else
399     MARL_ASSERT(!marl::Thread::Affinity::supported,
400                 "Attempting to use thread affinity on a unsupported platform");
401 #endif
402   }
403 };
404 
Thread(Affinity && affinity,Func && func)405 Thread::Thread(Affinity&& affinity, Func&& func)
406     : impl(new Thread::Impl(std::move(affinity), std::move(func))) {}
407 
~Thread()408 Thread::~Thread() {
409   MARL_ASSERT(!impl, "Thread::join() was not called before destruction");
410 }
411 
join()412 void Thread::join() {
413   impl->thread.join();
414   delete impl;
415   impl = nullptr;
416 }
417 
setName(const char * fmt,...)418 void Thread::setName(const char* fmt, ...) {
419   char name[1024];
420   va_list vararg;
421   va_start(vararg, fmt);
422   vsnprintf(name, sizeof(name), fmt, vararg);
423   va_end(vararg);
424 
425 #if defined(__APPLE__)
426   pthread_setname_np(name);
427 #elif defined(__FreeBSD__)
428   pthread_set_name_np(pthread_self(), name);
429 #elif !defined(__Fuchsia__)
430   pthread_setname_np(pthread_self(), name);
431 #endif
432 
433   MARL_NAME_THREAD("%s", name);
434 }
435 
numLogicalCPUs()436 unsigned int Thread::numLogicalCPUs() {
437   return static_cast<unsigned int>(sysconf(_SC_NPROCESSORS_ONLN));
438 }
439 
440 #endif  // OS
441 
Thread(Thread && rhs)442 Thread::Thread(Thread&& rhs) : impl(rhs.impl) {
443   rhs.impl = nullptr;
444 }
445 
operator =(Thread && rhs)446 Thread& Thread::operator=(Thread&& rhs) {
447   if (impl) {
448     delete impl;
449     impl = nullptr;
450   }
451   impl = rhs.impl;
452   rhs.impl = nullptr;
453   return *this;
454 }
455 
456 }  // namespace marl
457