• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2 //
3 //                     The LLVM Compiler Infrastructure
4 //
5 // This file is distributed under the University of Illinois Open Source
6 // License. See LICENSE.TXT for details.
7 //
8 //===----------------------------------------------------------------------===//
9 
10 #include "llvm/Support/Parallel.h"
11 #include "llvm/Config/llvm-config.h"
12 
13 #if LLVM_ENABLE_THREADS
14 
15 #include "llvm/Support/Threading.h"
16 
17 #include <atomic>
18 #include <stack>
19 #include <thread>
20 
21 using namespace llvm;
22 
23 namespace {
24 
25 /// An abstract class that takes closures and runs them asynchronously.
26 class Executor {
27 public:
28   virtual ~Executor() = default;
29   virtual void add(std::function<void()> func) = 0;
30 
31   static Executor *getDefaultExecutor();
32 };
33 
34 #if defined(_MSC_VER)
35 /// An Executor that runs tasks via ConcRT.
36 class ConcRTExecutor : public Executor {
37   struct Taskish {
Taskish__anon881c7aac0111::ConcRTExecutor::Taskish38     Taskish(std::function<void()> Task) : Task(Task) {}
39 
40     std::function<void()> Task;
41 
run__anon881c7aac0111::ConcRTExecutor::Taskish42     static void run(void *P) {
43       Taskish *Self = static_cast<Taskish *>(P);
44       Self->Task();
45       concurrency::Free(Self);
46     }
47   };
48 
49 public:
add(std::function<void ()> F)50   virtual void add(std::function<void()> F) {
51     Concurrency::CurrentScheduler::ScheduleTask(
52         Taskish::run, new (concurrency::Alloc(sizeof(Taskish))) Taskish(F));
53   }
54 };
55 
getDefaultExecutor()56 Executor *Executor::getDefaultExecutor() {
57   static ConcRTExecutor exec;
58   return &exec;
59 }
60 
61 #else
62 /// An implementation of an Executor that runs closures on a thread pool
63 ///   in filo order.
64 class ThreadPoolExecutor : public Executor {
65 public:
ThreadPoolExecutor(unsigned ThreadCount=hardware_concurrency ())66   explicit ThreadPoolExecutor(unsigned ThreadCount = hardware_concurrency())
67       : Done(ThreadCount) {
68     // Spawn all but one of the threads in another thread as spawning threads
69     // can take a while.
70     std::thread([&, ThreadCount] {
71       for (size_t i = 1; i < ThreadCount; ++i) {
72         std::thread([=] { work(); }).detach();
73       }
74       work();
75     }).detach();
76   }
77 
~ThreadPoolExecutor()78   ~ThreadPoolExecutor() override {
79     std::unique_lock<std::mutex> Lock(Mutex);
80     Stop = true;
81     Lock.unlock();
82     Cond.notify_all();
83     // Wait for ~Latch.
84   }
85 
add(std::function<void ()> F)86   void add(std::function<void()> F) override {
87     std::unique_lock<std::mutex> Lock(Mutex);
88     WorkStack.push(F);
89     Lock.unlock();
90     Cond.notify_one();
91   }
92 
93 private:
work()94   void work() {
95     while (true) {
96       std::unique_lock<std::mutex> Lock(Mutex);
97       Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
98       if (Stop)
99         break;
100       auto Task = WorkStack.top();
101       WorkStack.pop();
102       Lock.unlock();
103       Task();
104     }
105     Done.dec();
106   }
107 
108   std::atomic<bool> Stop{false};
109   std::stack<std::function<void()>> WorkStack;
110   std::mutex Mutex;
111   std::condition_variable Cond;
112   parallel::detail::Latch Done;
113 };
114 
getDefaultExecutor()115 Executor *Executor::getDefaultExecutor() {
116   static ThreadPoolExecutor exec;
117   return &exec;
118 }
119 #endif
120 }
121 
spawn(std::function<void ()> F)122 void parallel::detail::TaskGroup::spawn(std::function<void()> F) {
123   L.inc();
124   Executor::getDefaultExecutor()->add([&, F] {
125     F();
126     L.dec();
127   });
128 }
129 #endif // LLVM_ENABLE_THREADS
130