#include <boost/asio/associated_executor.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/execution.hpp>
#include <cctype>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
12
13 using boost::asio::executor_binder;
14 using boost::asio::get_associated_executor;
15 namespace execution = boost::asio::execution;
16
17 // An executor that launches a new thread for each function submitted to it.
18 // This class satisfies the executor requirements.
19 class thread_executor
20 {
21 private:
22 // Singleton execution context that manages threads launched by the new_thread_executor.
23 class thread_bag
24 {
25 friend class thread_executor;
26
add_thread(std::thread && t)27 void add_thread(std::thread&& t)
28 {
29 std::unique_lock<std::mutex> lock(mutex_);
30 threads_.push_back(std::move(t));
31 }
32
33 thread_bag() = default;
34
~thread_bag()35 ~thread_bag()
36 {
37 for (auto& t : threads_)
38 t.join();
39 }
40
41 std::mutex mutex_;
42 std::vector<std::thread> threads_;
43 };
44
45 public:
query(execution::context_t)46 static thread_bag& query(execution::context_t)
47 {
48 static thread_bag threads;
49 return threads;
50 }
51
query(execution::blocking_t)52 static constexpr auto query(execution::blocking_t)
53 {
54 return execution::blocking.never;
55 }
56
57 template <class Func>
execute(Func f) const58 void execute(Func f) const
59 {
60 thread_bag& bag = query(execution::context);
61 bag.add_thread(std::thread(std::move(f)));
62 }
63
operator ==(const thread_executor &,const thread_executor &)64 friend bool operator==(const thread_executor&,
65 const thread_executor&) noexcept
66 {
67 return true;
68 }
69
operator !=(const thread_executor &,const thread_executor &)70 friend bool operator!=(const thread_executor&,
71 const thread_executor&) noexcept
72 {
73 return false;
74 }
75 };
76
// Base class for all thread-safe queue implementations.
//
// Holds the synchronisation state that does not depend on the element type.
// Everything is private; only queue_front and queue_back may touch it.
class queue_impl_base
{
  template <class> friend class queue_front;
  template <class> friend class queue_back;

  // Guards the queue contents and stop_.
  std::mutex mutex_;

  // Signalled when an element is pushed or the queue is stopped.
  std::condition_variable condition_;

  // Set once the producer has finished; no more elements will arrive.
  bool stop_ = false;
};
86
87 // Underlying implementation of a thread-safe queue, shared between the
88 // queue_front and queue_back classes.
89 template <class T>
90 class queue_impl : public queue_impl_base
91 {
92 template <class> friend class queue_front;
93 template <class> friend class queue_back;
94 std::queue<T> queue_;
95 };
96
97 // The front end of a queue between consecutive pipeline stages.
98 template <class T>
99 class queue_front
100 {
101 public:
102 typedef T value_type;
103
queue_front(std::shared_ptr<queue_impl<T>> impl)104 explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
105 : impl_(impl)
106 {
107 }
108
push(T t)109 void push(T t)
110 {
111 std::unique_lock<std::mutex> lock(impl_->mutex_);
112 impl_->queue_.push(std::move(t));
113 impl_->condition_.notify_one();
114 }
115
stop()116 void stop()
117 {
118 std::unique_lock<std::mutex> lock(impl_->mutex_);
119 impl_->stop_ = true;
120 impl_->condition_.notify_one();
121 }
122
123 private:
124 std::shared_ptr<queue_impl<T>> impl_;
125 };
126
127 // The back end of a queue between consecutive pipeline stages.
128 template <class T>
129 class queue_back
130 {
131 public:
132 typedef T value_type;
133
queue_back(std::shared_ptr<queue_impl<T>> impl)134 explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
135 : impl_(impl)
136 {
137 }
138
pop(T & t)139 bool pop(T& t)
140 {
141 std::unique_lock<std::mutex> lock(impl_->mutex_);
142 while (impl_->queue_.empty() && !impl_->stop_)
143 impl_->condition_.wait(lock);
144 if (!impl_->queue_.empty())
145 {
146 t = impl_->queue_.front();
147 impl_->queue_.pop();
148 return true;
149 }
150 return false;
151 }
152
153 private:
154 std::shared_ptr<queue_impl<T>> impl_;
155 };
156
157 // Launch the last stage in a pipeline.
158 template <class T, class F>
pipeline(queue_back<T> in,F f)159 std::future<void> pipeline(queue_back<T> in, F f)
160 {
161 // Get the function's associated executor, defaulting to thread_executor.
162 auto ex = get_associated_executor(f, thread_executor());
163
164 // Run the function, and as we're the last stage return a future so that the
165 // caller can wait for the pipeline to finish.
166 std::packaged_task<void()> task(
167 [in, f = std::move(f)]() mutable
168 {
169 f(in);
170 });
171 std::future<void> fut = task.get_future();
172 execution::execute(
173 boost::asio::require(ex, execution::blocking.never),
174 std::move(task));
175 return fut;
176 }
177
178 // Launch an intermediate stage in a pipeline.
179 template <class T, class F, class... Tail>
pipeline(queue_back<T> in,F f,Tail...t)180 std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
181 {
182 // Determine the output queue type.
183 typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
184
185 // Create the output queue and its implementation.
186 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
187 queue_front<output_value_type> out(out_impl);
188 queue_back<output_value_type> next_in(out_impl);
189
190 // Get the function's associated executor, defaulting to thread_executor.
191 auto ex = get_associated_executor(f, thread_executor());
192
193 // Run the function.
194 execution::execute(
195 boost::asio::require(ex, execution::blocking.never),
196 [in, out, f = std::move(f)]() mutable
197 {
198 f(in, out);
199 out.stop();
200 });
201
202 // Launch the rest of the pipeline.
203 return pipeline(next_in, std::move(t)...);
204 }
205
206 // Launch the first stage in a pipeline.
207 template <class F, class... Tail>
pipeline(F f,Tail...t)208 std::future<void> pipeline(F f, Tail... t)
209 {
210 // Determine the output queue type.
211 typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
212
213 // Create the output queue and its implementation.
214 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
215 queue_front<output_value_type> out(out_impl);
216 queue_back<output_value_type> next_in(out_impl);
217
218 // Get the function's associated executor, defaulting to thread_executor.
219 auto ex = get_associated_executor(f, thread_executor());
220
221 // Run the function.
222 execution::execute(
223 boost::asio::require(ex, execution::blocking.never),
224 [out, f = std::move(f)]() mutable
225 {
226 f(out);
227 out.stop();
228 });
229
230 // Launch the rest of the pipeline.
231 return pipeline(next_in, std::move(t)...);
232 }
233
234 //------------------------------------------------------------------------------
235
236 #include <boost/asio/static_thread_pool.hpp>
237 #include <iostream>
238 #include <string>
239
240 using boost::asio::bind_executor;
241 using boost::asio::static_thread_pool;
242
reader(queue_front<std::string> out)243 void reader(queue_front<std::string> out)
244 {
245 std::string line;
246 while (std::getline(std::cin, line))
247 out.push(line);
248 }
249
filter(queue_back<std::string> in,queue_front<std::string> out)250 void filter(queue_back<std::string> in, queue_front<std::string> out)
251 {
252 std::string line;
253 while (in.pop(line))
254 if (line.length() > 5)
255 out.push(line);
256 }
257
upper(queue_back<std::string> in,queue_front<std::string> out)258 void upper(queue_back<std::string> in, queue_front<std::string> out)
259 {
260 std::string line;
261 while (in.pop(line))
262 {
263 std::string new_line;
264 for (char c : line)
265 new_line.push_back(std::toupper(c));
266 out.push(new_line);
267 }
268 }
269
writer(queue_back<std::string> in)270 void writer(queue_back<std::string> in)
271 {
272 std::size_t count = 0;
273 std::string line;
274 while (in.pop(line))
275 std::cout << count++ << ": " << line << std::endl;
276 }
277
main()278 int main()
279 {
280 static_thread_pool pool(1);
281
282 auto f = pipeline(reader, filter, bind_executor(pool.executor(), upper), writer);
283 f.wait();
284 }
285