1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8
9 #include "llvm/Support/Parallel.h"
10 #include "llvm/Config/llvm-config.h"
11 #include "llvm/Support/ManagedStatic.h"
12
13 #if LLVM_ENABLE_THREADS
14
15 #include "llvm/Support/Threading.h"
16
#include <atomic>
#include <future>
#include <stack>
#include <thread>
#include <utility>
#include <vector>
22
23 namespace llvm {
24 namespace parallel {
25 namespace detail {
26
27 namespace {
28
/// Interface for an object that accepts closures and runs them
/// asynchronously.
class Executor {
public:
  virtual ~Executor() = default;

  /// Hands \p func over to the executor to be run.
  virtual void add(std::function<void()> func) = 0;

  /// Returns the executor shared by all callers in this file. Every call
  /// yields the same object.
  static Executor *getDefaultExecutor();
};
37
38 /// An implementation of an Executor that runs closures on a thread pool
39 /// in filo order.
40 class ThreadPoolExecutor : public Executor {
41 public:
ThreadPoolExecutor(unsigned ThreadCount=hardware_concurrency ())42 explicit ThreadPoolExecutor(unsigned ThreadCount = hardware_concurrency()) {
43 // Spawn all but one of the threads in another thread as spawning threads
44 // can take a while.
45 Threads.reserve(ThreadCount);
46 Threads.resize(1);
47 std::lock_guard<std::mutex> Lock(Mutex);
48 Threads[0] = std::thread([&, ThreadCount] {
49 for (unsigned i = 1; i < ThreadCount; ++i) {
50 Threads.emplace_back([=] { work(); });
51 if (Stop)
52 break;
53 }
54 ThreadsCreated.set_value();
55 work();
56 });
57 }
58
stop()59 void stop() {
60 {
61 std::lock_guard<std::mutex> Lock(Mutex);
62 if (Stop)
63 return;
64 Stop = true;
65 }
66 Cond.notify_all();
67 ThreadsCreated.get_future().wait();
68 }
69
~ThreadPoolExecutor()70 ~ThreadPoolExecutor() override {
71 stop();
72 std::thread::id CurrentThreadId = std::this_thread::get_id();
73 for (std::thread &T : Threads)
74 if (T.get_id() == CurrentThreadId)
75 T.detach();
76 else
77 T.join();
78 }
79
80 struct Deleter {
callllvm::parallel::detail::__anon594942540111::ThreadPoolExecutor::Deleter81 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
82 };
83
add(std::function<void ()> F)84 void add(std::function<void()> F) override {
85 {
86 std::lock_guard<std::mutex> Lock(Mutex);
87 WorkStack.push(F);
88 }
89 Cond.notify_one();
90 }
91
92 private:
work()93 void work() {
94 while (true) {
95 std::unique_lock<std::mutex> Lock(Mutex);
96 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
97 if (Stop)
98 break;
99 auto Task = WorkStack.top();
100 WorkStack.pop();
101 Lock.unlock();
102 Task();
103 }
104 }
105
106 std::atomic<bool> Stop{false};
107 std::stack<std::function<void()>> WorkStack;
108 std::mutex Mutex;
109 std::condition_variable Cond;
110 std::promise<void> ThreadsCreated;
111 std::vector<std::thread> Threads;
112 };
113
getDefaultExecutor()114 Executor *Executor::getDefaultExecutor() {
115 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
116 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
117 // stops the thread pool and waits for any worker thread creation to complete
118 // but does not wait for the threads to finish. The wait for worker thread
119 // creation to complete is important as it prevents intermittent crashes on
120 // Windows due to a race condition between thread creation and process exit.
121 //
122 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
123 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
124 // destructor ensures it has been stopped and waits for worker threads to
125 // finish. The wait is important as it prevents intermittent crashes on
126 // Windows when the process is doing a full exit.
127 //
128 // The Windows crashes appear to only occur with the MSVC static runtimes and
129 // are more frequent with the debug static runtime.
130 //
131 // This also prevents intermittent deadlocks on exit with the MinGW runtime.
132 static ManagedStatic<ThreadPoolExecutor, object_creator<ThreadPoolExecutor>,
133 ThreadPoolExecutor::Deleter>
134 ManagedExec;
135 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
136 return Exec.get();
137 }
138 } // namespace
139
// Number of TaskGroup objects currently alive: the TaskGroup constructor
// increments it and the destructor decrements it.
static std::atomic<int> TaskGroupInstances{0};
141
142 // Latch::sync() called by the dtor may cause one thread to block. If is a dead
143 // lock if all threads in the default executor are blocked. To prevent the dead
144 // lock, only allow the first TaskGroup to run tasks parallelly. In the scenario
145 // of nested parallel_for_each(), only the outermost one runs parallelly.
TaskGroup()146 TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
~TaskGroup()147 TaskGroup::~TaskGroup() { --TaskGroupInstances; }
148
spawn(std::function<void ()> F)149 void TaskGroup::spawn(std::function<void()> F) {
150 if (Parallel) {
151 L.inc();
152 Executor::getDefaultExecutor()->add([&, F] {
153 F();
154 L.dec();
155 });
156 } else {
157 F();
158 }
159 }
160
161 } // namespace detail
162 } // namespace parallel
163 } // namespace llvm
164 #endif // LLVM_ENABLE_THREADS
165