• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2019 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IORAP_SRC_COMMON_ASYNC_POOL_H_
16 #define IORAP_SRC_COMMON_ASYNC_POOL_H_
17 
#include <atomic>
#include <condition_variable>
#include <deque>
#include <future>
#include <mutex>
#include <utility>
#include <vector>
25 
26 namespace iorap::common {
27 
// Runs functors asynchronously and lets callers wait for all of them.
//
// Every functor handed to 'LaunchAsync' is started immediately via
// std::async(std::launch::async, ...) and its future is queued. 'Join'
// retrieves the queued futures and only returns once 'Shutdown' has been
// requested and the queue is empty.
//
// All three member functions synchronize on an internal mutex, so they may
// be called from different threads.
class AsyncPool {
 public:
  // Request shutdown. Any threads calling 'Join' should eventually unblock
  // once all functors have run to completion.
  void Shutdown() {
    {
      // The flag must be published under the same mutex 'Join' holds while
      // it checks the flag and starts waiting. Otherwise the store and the
      // notification could both land in the gap between that check and the
      // wait, and 'Join' would sleep forever (lost wakeup).
      std::lock_guard<std::mutex> lock(mutex_);
      shutting_down_ = true;
    }

    // Notify after releasing the mutex so woken threads can acquire it
    // right away.
    cond_var_.notify_all();
  }

  // Block until Shutdown is called *and* all functors passed to
  // 'LaunchAsync' have run to completion.
  //
  // Each queued future is retrieved with std::future::get(), so an exception
  // that escaped a functor is rethrown from here.
  void Join() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      // Drain everything queued so far, oldest first.
      while (!futures_.empty()) {
        std::future<void> future = std::move(futures_.front());
        futures_.pop_front();

        lock.unlock();  // do not stall callers of LaunchAsync
        future.get();
        lock.lock();
      }

      if (shutting_down_) {
        return;
      }

      // Sleep until there is more work to collect or shutdown is requested.
      // The predicate is evaluated with the mutex held, which also covers
      // spurious wakeups.
      cond_var_.wait(lock, [this] {
        return shutting_down_ || !futures_.empty();
      });
    }
  }

  // Execute the functor 'u' asynchronously.
  // Using this launches the functor with std::launch::async each time, so
  // execution begins immediately instead of being deferred.
  //
  // 'u' must be invocable with no arguments and return void, since its
  // future is stored as std::future<void>.
  template <typename T>
  void LaunchAsync(T&& u) {
    // Start the work before taking the mutex to keep the critical section
    // down to the queue insertion.
    auto future = std::async(std::launch::async, std::forward<T>(u));

    {
      std::lock_guard<std::mutex> lock(mutex_);
      futures_.push_back(std::move(future));
    }
    cond_var_.notify_one();
  }

 private:
  // Set once by 'Shutdown'; read by 'Join' while holding 'mutex_'.
  std::atomic<bool> shutting_down_{false};
  // Futures of launched functors that 'Join' has not retrieved yet.
  std::deque<std::future<void>> futures_;

  // Guards 'futures_' and the shutdown handshake with 'cond_var_'.
  std::mutex mutex_;
  // Signalled when a future is queued or shutdown is requested.
  std::condition_variable cond_var_;
};
88 
89 }  // namespace iorap::common
90 
91 #endif  // IORAP_SRC_COMMON_ASYNC_POOL_H_
92