// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef IORAP_SRC_COMMON_ASYNC_POOL_H_
#define IORAP_SRC_COMMON_ASYNC_POOL_H_

#include <atomic>
#include <condition_variable>
#include <deque>
#include <future>
#include <mutex>
#include <utility>
#include <vector>

namespace iorap::common {

// Runs functors asynchronously via std::async and lets callers block until
// every launched functor has finished and shutdown has been requested.
//
// All access to 'futures_' is guarded by 'mutex_'. 'cond_var_' is signalled
// whenever a new future is queued or shutdown is requested.
class AsyncPool {
  // Set once by Shutdown(). Written while holding 'mutex_' so that a thread
  // in Join() cannot miss the transition between checking the flag and
  // going to sleep on 'cond_var_'.
  std::atomic<bool> shutting_down_{false};
  // Futures of functors launched by LaunchAsync() that Join() has not yet
  // waited on, in launch order.
  std::deque<std::future<void>> futures_;

  std::mutex mutex_;
  std::condition_variable cond_var_;

 public:
  // Request shutdown.
  //
  // Any threads calling 'Join' should eventually unblock
  // once all functors have run to completion.
  void Shutdown() {
    {
      // The flag must change under the same mutex Join() holds while it
      // evaluates its wait condition; otherwise the notification below could
      // fire in the window between Join()'s check and its wait, and be lost.
      std::lock_guard<std::mutex> lock(mutex_);
      shutting_down_ = true;
    }

    // Notify after releasing the lock so woken threads can acquire it
    // immediately.
    cond_var_.notify_all();
  }

  // Block until Shutdown is called *and* all functors passed to
  // 'LaunchAsync' (and queued so far) have run to completion.
  //
  // Each future's result is retrieved with std::future::get(), so an
  // exception thrown by a functor is rethrown from here.
  void Join() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      // Drain every queued future eagerly.
      while (!futures_.empty()) {
        std::future<void> future = std::move(futures_.front());
        futures_.pop_front();

        lock.unlock();  // do not stall callers of LaunchAsync
        future.get();
        lock.lock();
      }

      // The queue is empty here and the lock is held.
      if (shutting_down_) {
        break;
      }

      // Sleep until we either get more items or are asked to shut down.
      // The predicate form also absorbs spurious wakeups.
      cond_var_.wait(lock, [this] {
        return shutting_down_.load() || !futures_.empty();
      });
    }
  }

  // Execute the functor 'u' asynchronously.
  //
  // The functor is started with std::launch::async, i.e. it begins executing
  // immediately on its own thread rather than being deferred. Its future is
  // queued so that a later (or concurrent) Join() waits for it.
  template <typename T>
  void LaunchAsync(T&& u) {
    auto future = std::async(std::launch::async, std::forward<T>(u));

    {
      std::unique_lock<std::mutex> lock(mutex_);
      futures_.push_back(std::move(future));
    }
    // Wake a thread blocked in Join(), if any, so it can wait on the new
    // future.
    cond_var_.notify_one();
  }
};

}  // namespace iorap::common

#endif  // IORAP_SRC_COMMON_ASYNC_POOL_H_