• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include <boost/asio/associated_executor.hpp>
2 #include <boost/asio/bind_executor.hpp>
3 #include <boost/asio/execution_context.hpp>
4 #include <boost/asio/post.hpp>
5 #include <boost/asio/system_executor.hpp>
6 #include <boost/asio/use_future.hpp>
7 #include <condition_variable>
8 #include <future>
9 #include <memory>
10 #include <mutex>
11 #include <queue>
12 #include <thread>
13 #include <vector>
14 #include <cctype>
15 
16 using boost::asio::execution_context;
17 using boost::asio::executor_binder;
18 using boost::asio::get_associated_executor;
19 using boost::asio::post;
20 using boost::asio::system_executor;
21 using boost::asio::use_future;
22 using boost::asio::use_service;
23 namespace execution = boost::asio::execution;
24 
25 // An executor that launches a new thread for each function submitted to it.
26 // This class satisfies the executor requirements.
27 class thread_executor
28 {
29 private:
30   // Service to track all threads started through a thread_executor.
31   class thread_bag : public execution_context::service
32   {
33   public:
34     typedef thread_bag key_type;
35 
thread_bag(execution_context & ctx)36     explicit thread_bag(execution_context& ctx)
37       : execution_context::service(ctx)
38     {
39     }
40 
add_thread(std::thread && t)41     void add_thread(std::thread&& t)
42     {
43       std::unique_lock<std::mutex> lock(mutex_);
44       threads_.push_back(std::move(t));
45     }
46 
47   private:
shutdown()48     virtual void shutdown()
49     {
50       for (auto& t : threads_)
51         t.join();
52     }
53 
54     std::mutex mutex_;
55     std::vector<std::thread> threads_;
56   };
57 
58 public:
query(execution::context_t) const59   execution_context& query(execution::context_t) const
60   {
61     return boost::asio::query(system_executor(), execution::context);
62   }
63 
query(execution::blocking_t) const64   execution::blocking_t query(execution::blocking_t) const
65   {
66     return execution::blocking.never;
67   }
68 
require(execution::blocking_t::never_t) const69   thread_executor require(execution::blocking_t::never_t) const
70   {
71     return *this;
72   }
73 
74   template <class Func>
execute(Func f) const75   void execute(Func f) const
76   {
77     thread_bag& bag = use_service<thread_bag>(query(execution::context));
78     bag.add_thread(std::thread(std::move(f)));
79   }
80 
operator ==(const thread_executor &,const thread_executor &)81   friend bool operator==(const thread_executor&,
82       const thread_executor&) noexcept
83   {
84     return true;
85   }
86 
operator !=(const thread_executor &,const thread_executor &)87   friend bool operator!=(const thread_executor&,
88       const thread_executor&) noexcept
89   {
90     return false;
91   }
92 };
93 
94 // Base class for all thread-safe queue implementations.
95 class queue_impl_base
96 {
97   template <class> friend class queue_front;
98   template <class> friend class queue_back;
99   std::mutex mutex_;
100   std::condition_variable condition_;
101   bool stop_ = false;
102 };
103 
104 // Underlying implementation of a thread-safe queue, shared between the
105 // queue_front and queue_back classes.
106 template <class T>
107 class queue_impl : public queue_impl_base
108 {
109   template <class> friend class queue_front;
110   template <class> friend class queue_back;
111   std::queue<T> queue_;
112 };
113 
114 // The front end of a queue between consecutive pipeline stages.
115 template <class T>
116 class queue_front
117 {
118 public:
119   typedef T value_type;
120 
queue_front(std::shared_ptr<queue_impl<T>> impl)121   explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
122     : impl_(impl)
123   {
124   }
125 
push(T t)126   void push(T t)
127   {
128     std::unique_lock<std::mutex> lock(impl_->mutex_);
129     impl_->queue_.push(std::move(t));
130     impl_->condition_.notify_one();
131   }
132 
stop()133   void stop()
134   {
135     std::unique_lock<std::mutex> lock(impl_->mutex_);
136     impl_->stop_ = true;
137     impl_->condition_.notify_one();
138   }
139 
140 private:
141   std::shared_ptr<queue_impl<T>> impl_;
142 };
143 
144 // The back end of a queue between consecutive pipeline stages.
145 template <class T>
146 class queue_back
147 {
148 public:
149   typedef T value_type;
150 
queue_back(std::shared_ptr<queue_impl<T>> impl)151   explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
152     : impl_(impl)
153   {
154   }
155 
pop(T & t)156   bool pop(T& t)
157   {
158     std::unique_lock<std::mutex> lock(impl_->mutex_);
159     while (impl_->queue_.empty() && !impl_->stop_)
160       impl_->condition_.wait(lock);
161     if (!impl_->queue_.empty())
162     {
163       t = impl_->queue_.front();
164       impl_->queue_.pop();
165       return true;
166     }
167     return false;
168   }
169 
170 private:
171   std::shared_ptr<queue_impl<T>> impl_;
172 };
173 
174 // Launch the last stage in a pipeline.
175 template <class T, class F>
pipeline(queue_back<T> in,F f)176 std::future<void> pipeline(queue_back<T> in, F f)
177 {
178   // Get the function's associated executor, defaulting to thread_executor.
179   auto ex = get_associated_executor(f, thread_executor());
180 
181   // Run the function, and as we're the last stage return a future so that the
182   // caller can wait for the pipeline to finish.
183   return post(ex, use_future([in, f]() mutable { f(in); }));
184 }
185 
186 // Launch an intermediate stage in a pipeline.
187 template <class T, class F, class... Tail>
pipeline(queue_back<T> in,F f,Tail...t)188 std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
189 {
190   // Determine the output queue type.
191   typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
192 
193   // Create the output queue and its implementation.
194   auto out_impl = std::make_shared<queue_impl<output_value_type>>();
195   queue_front<output_value_type> out(out_impl);
196   queue_back<output_value_type> next_in(out_impl);
197 
198   // Get the function's associated executor, defaulting to thread_executor.
199   auto ex = get_associated_executor(f, thread_executor());
200 
201   // Run the function.
202   post(ex, [in, out, f]() mutable
203       {
204         f(in, out);
205         out.stop();
206       });
207 
208   // Launch the rest of the pipeline.
209   return pipeline(next_in, std::move(t)...);
210 }
211 
212 // Launch the first stage in a pipeline.
213 template <class F, class... Tail>
pipeline(F f,Tail...t)214 std::future<void> pipeline(F f, Tail... t)
215 {
216   // Determine the output queue type.
217   typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
218 
219   // Create the output queue and its implementation.
220   auto out_impl = std::make_shared<queue_impl<output_value_type>>();
221   queue_front<output_value_type> out(out_impl);
222   queue_back<output_value_type> next_in(out_impl);
223 
224   // Get the function's associated executor, defaulting to thread_executor.
225   auto ex = get_associated_executor(f, thread_executor());
226 
227   // Run the function.
228   post(ex, [out, f]() mutable
229       {
230         f(out);
231         out.stop();
232       });
233 
234   // Launch the rest of the pipeline.
235   return pipeline(next_in, std::move(t)...);
236 }
237 
238 //------------------------------------------------------------------------------
239 
240 #include <boost/asio/thread_pool.hpp>
241 #include <iostream>
242 #include <string>
243 
244 using boost::asio::bind_executor;
245 using boost::asio::thread_pool;
246 
reader(queue_front<std::string> out)247 void reader(queue_front<std::string> out)
248 {
249   std::string line;
250   while (std::getline(std::cin, line))
251     out.push(line);
252 }
253 
filter(queue_back<std::string> in,queue_front<std::string> out)254 void filter(queue_back<std::string> in, queue_front<std::string> out)
255 {
256   std::string line;
257   while (in.pop(line))
258     if (line.length() > 5)
259       out.push(line);
260 }
261 
upper(queue_back<std::string> in,queue_front<std::string> out)262 void upper(queue_back<std::string> in, queue_front<std::string> out)
263 {
264   std::string line;
265   while (in.pop(line))
266   {
267     std::string new_line;
268     for (char c : line)
269       new_line.push_back(std::toupper(c));
270     out.push(new_line);
271   }
272 }
273 
writer(queue_back<std::string> in)274 void writer(queue_back<std::string> in)
275 {
276   std::size_t count = 0;
277   std::string line;
278   while (in.pop(line))
279     std::cout << count++ << ": " << line << std::endl;
280 }
281 
main()282 int main()
283 {
284   thread_pool pool(1);
285 
286   auto f = pipeline(reader, filter, bind_executor(pool, upper), writer);
287   f.wait();
288 }
289