1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// 2 // 3 // The LLVM Compiler Infrastructure 4 // 5 // This file is distributed under the University of Illinois Open Source 6 // License. See LICENSE.TXT for details. 7 // 8 //===----------------------------------------------------------------------===// 9 10 #include "llvm/Support/Parallel.h" 11 #include "llvm/Config/llvm-config.h" 12 13 #if LLVM_ENABLE_THREADS 14 15 #include "llvm/Support/Threading.h" 16 17 #include <atomic> 18 #include <stack> 19 #include <thread> 20 21 using namespace llvm; 22 23 namespace { 24 25 /// An abstract class that takes closures and runs them asynchronously. 26 class Executor { 27 public: 28 virtual ~Executor() = default; 29 virtual void add(std::function<void()> func) = 0; 30 31 static Executor *getDefaultExecutor(); 32 }; 33 34 #if defined(_MSC_VER) 35 /// An Executor that runs tasks via ConcRT. 36 class ConcRTExecutor : public Executor { 37 struct Taskish { Taskish__anon881c7aac0111::ConcRTExecutor::Taskish38 Taskish(std::function<void()> Task) : Task(Task) {} 39 40 std::function<void()> Task; 41 run__anon881c7aac0111::ConcRTExecutor::Taskish42 static void run(void *P) { 43 Taskish *Self = static_cast<Taskish *>(P); 44 Self->Task(); 45 concurrency::Free(Self); 46 } 47 }; 48 49 public: add(std::function<void ()> F)50 virtual void add(std::function<void()> F) { 51 Concurrency::CurrentScheduler::ScheduleTask( 52 Taskish::run, new (concurrency::Alloc(sizeof(Taskish))) Taskish(F)); 53 } 54 }; 55 getDefaultExecutor()56Executor *Executor::getDefaultExecutor() { 57 static ConcRTExecutor exec; 58 return &exec; 59 } 60 61 #else 62 /// An implementation of an Executor that runs closures on a thread pool 63 /// in filo order. 64 class ThreadPoolExecutor : public Executor { 65 public: ThreadPoolExecutor(unsigned ThreadCount=hardware_concurrency ())66 explicit ThreadPoolExecutor(unsigned ThreadCount = hardware_concurrency()) 67 : Done(ThreadCount) { 68 // Spawn all but one of the threads in another thread as spawning threads 69 // can take a while. 70 std::thread([&, ThreadCount] { 71 for (size_t i = 1; i < ThreadCount; ++i) { 72 std::thread([=] { work(); }).detach(); 73 } 74 work(); 75 }).detach(); 76 } 77 ~ThreadPoolExecutor()78 ~ThreadPoolExecutor() override { 79 std::unique_lock<std::mutex> Lock(Mutex); 80 Stop = true; 81 Lock.unlock(); 82 Cond.notify_all(); 83 // Wait for ~Latch. 84 } 85 add(std::function<void ()> F)86 void add(std::function<void()> F) override { 87 std::unique_lock<std::mutex> Lock(Mutex); 88 WorkStack.push(F); 89 Lock.unlock(); 90 Cond.notify_one(); 91 } 92 93 private: work()94 void work() { 95 while (true) { 96 std::unique_lock<std::mutex> Lock(Mutex); 97 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); 98 if (Stop) 99 break; 100 auto Task = WorkStack.top(); 101 WorkStack.pop(); 102 Lock.unlock(); 103 Task(); 104 } 105 Done.dec(); 106 } 107 108 std::atomic<bool> Stop{false}; 109 std::stack<std::function<void()>> WorkStack; 110 std::mutex Mutex; 111 std::condition_variable Cond; 112 parallel::detail::Latch Done; 113 }; 114 getDefaultExecutor()115Executor *Executor::getDefaultExecutor() { 116 static ThreadPoolExecutor exec; 117 return &exec; 118 } 119 #endif 120 } 121 spawn(std::function<void ()> F)122void parallel::detail::TaskGroup::spawn(std::function<void()> F) { 123 L.inc(); 124 Executor::getDefaultExecutor()->add([&, F] { 125 F(); 126 L.dec(); 127 }); 128 } 129 #endif // LLVM_ENABLE_THREADS 130