• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include <boost/asio/associated_executor.hpp>
2 #include <boost/asio/bind_executor.hpp>
3 #include <boost/asio/execution.hpp>
4 #include <condition_variable>
5 #include <future>
6 #include <memory>
7 #include <mutex>
8 #include <queue>
9 #include <thread>
10 #include <vector>
11 #include <cctype>
12 
13 using boost::asio::executor_binder;
14 using boost::asio::get_associated_executor;
15 namespace execution = boost::asio::execution;
16 
17 // An executor that launches a new thread for each function submitted to it.
18 // This class satisfies the executor requirements.
19 class thread_executor
20 {
21 private:
22   // Singleton execution context that manages threads launched by the new_thread_executor.
23   class thread_bag
24   {
25     friend class thread_executor;
26 
add_thread(std::thread && t)27     void add_thread(std::thread&& t)
28     {
29       std::unique_lock<std::mutex> lock(mutex_);
30       threads_.push_back(std::move(t));
31     }
32 
33     thread_bag() = default;
34 
~thread_bag()35     ~thread_bag()
36     {
37       for (auto& t : threads_)
38         t.join();
39     }
40 
41     std::mutex mutex_;
42     std::vector<std::thread> threads_;
43   };
44 
45 public:
query(execution::context_t)46   static thread_bag& query(execution::context_t)
47   {
48     static thread_bag threads;
49     return threads;
50   }
51 
query(execution::blocking_t)52   static constexpr auto query(execution::blocking_t)
53   {
54     return execution::blocking.never;
55   }
56 
57   template <class Func>
execute(Func f) const58   void execute(Func f) const
59   {
60     thread_bag& bag = query(execution::context);
61     bag.add_thread(std::thread(std::move(f)));
62   }
63 
operator ==(const thread_executor &,const thread_executor &)64   friend bool operator==(const thread_executor&,
65       const thread_executor&) noexcept
66   {
67     return true;
68   }
69 
operator !=(const thread_executor &,const thread_executor &)70   friend bool operator!=(const thread_executor&,
71       const thread_executor&) noexcept
72   {
73     return false;
74   }
75 };
76 
77 // Base class for all thread-safe queue implementations.
78 class queue_impl_base
79 {
80   template <class> friend class queue_front;
81   template <class> friend class queue_back;
82   std::mutex mutex_;
83   std::condition_variable condition_;
84   bool stop_ = false;
85 };
86 
87 // Underlying implementation of a thread-safe queue, shared between the
88 // queue_front and queue_back classes.
89 template <class T>
90 class queue_impl : public queue_impl_base
91 {
92   template <class> friend class queue_front;
93   template <class> friend class queue_back;
94   std::queue<T> queue_;
95 };
96 
97 // The front end of a queue between consecutive pipeline stages.
98 template <class T>
99 class queue_front
100 {
101 public:
102   typedef T value_type;
103 
queue_front(std::shared_ptr<queue_impl<T>> impl)104   explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
105     : impl_(impl)
106   {
107   }
108 
push(T t)109   void push(T t)
110   {
111     std::unique_lock<std::mutex> lock(impl_->mutex_);
112     impl_->queue_.push(std::move(t));
113     impl_->condition_.notify_one();
114   }
115 
stop()116   void stop()
117   {
118     std::unique_lock<std::mutex> lock(impl_->mutex_);
119     impl_->stop_ = true;
120     impl_->condition_.notify_one();
121   }
122 
123 private:
124   std::shared_ptr<queue_impl<T>> impl_;
125 };
126 
127 // The back end of a queue between consecutive pipeline stages.
128 template <class T>
129 class queue_back
130 {
131 public:
132   typedef T value_type;
133 
queue_back(std::shared_ptr<queue_impl<T>> impl)134   explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
135     : impl_(impl)
136   {
137   }
138 
pop(T & t)139   bool pop(T& t)
140   {
141     std::unique_lock<std::mutex> lock(impl_->mutex_);
142     while (impl_->queue_.empty() && !impl_->stop_)
143       impl_->condition_.wait(lock);
144     if (!impl_->queue_.empty())
145     {
146       t = impl_->queue_.front();
147       impl_->queue_.pop();
148       return true;
149     }
150     return false;
151   }
152 
153 private:
154   std::shared_ptr<queue_impl<T>> impl_;
155 };
156 
157 // Launch the last stage in a pipeline.
158 template <class T, class F>
pipeline(queue_back<T> in,F f)159 std::future<void> pipeline(queue_back<T> in, F f)
160 {
161   // Get the function's associated executor, defaulting to thread_executor.
162   auto ex = get_associated_executor(f, thread_executor());
163 
164   // Run the function, and as we're the last stage return a future so that the
165   // caller can wait for the pipeline to finish.
166   std::packaged_task<void()> task(
167       [in, f = std::move(f)]() mutable
168       {
169         f(in);
170       });
171   std::future<void> fut = task.get_future();
172   execution::execute(
173       boost::asio::require(ex, execution::blocking.never),
174       std::move(task));
175   return fut;
176 }
177 
178 // Launch an intermediate stage in a pipeline.
179 template <class T, class F, class... Tail>
pipeline(queue_back<T> in,F f,Tail...t)180 std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
181 {
182   // Determine the output queue type.
183   typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
184 
185   // Create the output queue and its implementation.
186   auto out_impl = std::make_shared<queue_impl<output_value_type>>();
187   queue_front<output_value_type> out(out_impl);
188   queue_back<output_value_type> next_in(out_impl);
189 
190   // Get the function's associated executor, defaulting to thread_executor.
191   auto ex = get_associated_executor(f, thread_executor());
192 
193   // Run the function.
194   execution::execute(
195       boost::asio::require(ex, execution::blocking.never),
196       [in, out, f = std::move(f)]() mutable
197       {
198         f(in, out);
199         out.stop();
200       });
201 
202   // Launch the rest of the pipeline.
203   return pipeline(next_in, std::move(t)...);
204 }
205 
206 // Launch the first stage in a pipeline.
207 template <class F, class... Tail>
pipeline(F f,Tail...t)208 std::future<void> pipeline(F f, Tail... t)
209 {
210   // Determine the output queue type.
211   typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
212 
213   // Create the output queue and its implementation.
214   auto out_impl = std::make_shared<queue_impl<output_value_type>>();
215   queue_front<output_value_type> out(out_impl);
216   queue_back<output_value_type> next_in(out_impl);
217 
218   // Get the function's associated executor, defaulting to thread_executor.
219   auto ex = get_associated_executor(f, thread_executor());
220 
221   // Run the function.
222   execution::execute(
223       boost::asio::require(ex, execution::blocking.never),
224       [out, f = std::move(f)]() mutable
225       {
226         f(out);
227         out.stop();
228       });
229 
230   // Launch the rest of the pipeline.
231   return pipeline(next_in, std::move(t)...);
232 }
233 
234 //------------------------------------------------------------------------------
235 
236 #include <boost/asio/static_thread_pool.hpp>
237 #include <iostream>
238 #include <string>
239 
240 using boost::asio::bind_executor;
241 using boost::asio::static_thread_pool;
242 
reader(queue_front<std::string> out)243 void reader(queue_front<std::string> out)
244 {
245   std::string line;
246   while (std::getline(std::cin, line))
247     out.push(line);
248 }
249 
filter(queue_back<std::string> in,queue_front<std::string> out)250 void filter(queue_back<std::string> in, queue_front<std::string> out)
251 {
252   std::string line;
253   while (in.pop(line))
254     if (line.length() > 5)
255       out.push(line);
256 }
257 
upper(queue_back<std::string> in,queue_front<std::string> out)258 void upper(queue_back<std::string> in, queue_front<std::string> out)
259 {
260   std::string line;
261   while (in.pop(line))
262   {
263     std::string new_line;
264     for (char c : line)
265       new_line.push_back(std::toupper(c));
266     out.push(new_line);
267   }
268 }
269 
writer(queue_back<std::string> in)270 void writer(queue_back<std::string> in)
271 {
272   std::size_t count = 0;
273   std::string line;
274   while (in.pop(line))
275     std::cout << count++ << ": " << line << std::endl;
276 }
277 
main()278 int main()
279 {
280   static_thread_pool pool(1);
281 
282   auto f = pipeline(reader, filter, bind_executor(pool.executor(), upper), writer);
283   f.wait();
284 }
285