• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 
8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
10 
11 #include <boost/thread/detail/config.hpp>
12 
13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
14 #include <boost/chrono/duration.hpp>
15 #include <boost/chrono/time_point.hpp>
16 #include <boost/chrono/system_clocks.hpp>
17 #include <boost/chrono/chrono_io.hpp>
18 
19 #include <algorithm> // std::min
20 
21 #include <boost/config/abi_prefix.hpp>
22 
23 namespace boost
24 {
25 namespace concurrent
26 {
27 namespace detail
28 {
// FIXME: shouldn't the time point type be configurable?
30   template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
31   struct scheduled_type
32   {
33     typedef T value_type;
34     typedef Clock clock;
35     typedef TimePoint time_point;
36     T data;
37     time_point time;
38 
39     BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
40 
scheduled_typeboost::concurrent::detail::scheduled_type41     scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
scheduled_typeboost::concurrent::detail::scheduled_type42     scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
43 
scheduled_typeboost::concurrent::detail::scheduled_type44     scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
operator =boost::concurrent::detail::scheduled_type45     scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
46       data = other.data;
47       time = other.time;
48       return *this;
49     }
50 
scheduled_typeboost::concurrent::detail::scheduled_type51     scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
operator =boost::concurrent::detail::scheduled_type52     scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
53       data = boost::move(other.data);
54       time = other.time;
55       return *this;
56     }
57 
operator <boost::concurrent::detail::scheduled_type58     bool operator <(const scheduled_type & other) const
59     {
60       return this->time > other.time;
61     }
62   }; //end struct
63 
64   template <class Duration>
65   chrono::time_point<chrono::steady_clock,Duration>
limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const & tp)66   limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
67   {
68     // Clock == chrono::steady_clock
69     return tp;
70   }
71 
72   template <class Clock, class Duration>
73   chrono::time_point<Clock,Duration>
limit_timepoint(chrono::time_point<Clock,Duration> const & tp)74   limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
75   {
76     // Clock != chrono::steady_clock
77     // The system time may jump while wait_until() is waiting. To compensate for this and time out near
78     // the correct time, we limit how long wait_until() can wait before going around the loop again.
79     const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
80     return (std::min)(tp, tpmax);
81   }
82 
83   template <class Duration>
84   chrono::steady_clock::time_point
convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const & tp)85   convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
86   {
87     // Clock == chrono::steady_clock
88     return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
89   }
90 
91   template <class Clock, class Duration>
92   chrono::steady_clock::time_point
convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const & tp)93   convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
94   {
95     // Clock != chrono::steady_clock
96     // The system time may jump while wait_until() is waiting. To compensate for this and time out near
97     // the correct time, we limit how long wait_until() can wait before going around the loop again.
98     const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
99     const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
100     return chrono::steady_clock::now() + (std::min)(dura, duramax);
101   }
102 
103 } //end detail namespace
104 
105   template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
106   class sync_timed_queue
107     : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
108   {
109     typedef detail::scheduled_type<T, Clock, TimePoint> stype;
110     typedef sync_priority_queue<stype> super;
111   public:
112     typedef T value_type;
113     typedef Clock clock;
114     typedef typename clock::duration duration;
115     typedef typename clock::time_point time_point;
116     typedef typename super::underlying_queue_type underlying_queue_type;
117     typedef typename super::size_type size_type;
118     typedef typename super::op_status op_status;
119 
sync_timed_queue()120     sync_timed_queue() : super() {};
~sync_timed_queue()121     ~sync_timed_queue() {}
122 
123     using super::size;
124     using super::empty;
125     using super::full;
126     using super::close;
127     using super::closed;
128 
129     T pull();
130     void pull(T& elem);
131 
132     template <class Duration>
133     queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
134     template <class Rep, class Period>
135     queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
136 
137     queue_op_status try_pull(T& elem);
138     queue_op_status wait_pull(T& elem);
139     queue_op_status nonblocking_pull(T& elem);
140 
141     template <class Duration>
142     void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
143     template <class Rep, class Period>
144     void push(const T& elem, chrono::duration<Rep,Period> const& dura);
145 
146     template <class Duration>
147     void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
148     template <class Rep, class Period>
149     void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
150 
151     template <class Duration>
152     queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
153     template <class Rep, class Period>
154     queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
155 
156     template <class Duration>
157     queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
158     template <class Rep, class Period>
159     queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
160 
161   private:
162     inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
163     inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
164 
165     bool wait_to_pull(unique_lock<mutex>&);
166     queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
167     template <class Rep, class Period>
168     queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
169 
170     T pull(unique_lock<mutex>&);
171     T pull(lock_guard<mutex>&);
172 
173     void pull(unique_lock<mutex>&, T& elem);
174     void pull(lock_guard<mutex>&, T& elem);
175 
176     queue_op_status try_pull(unique_lock<mutex>&, T& elem);
177     queue_op_status try_pull(lock_guard<mutex>&, T& elem);
178 
179     queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
180 
181     sync_timed_queue(const sync_timed_queue&);
182     sync_timed_queue& operator=(const sync_timed_queue&);
183     sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
184     sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
185   }; //end class
186 
187 
188   template <class T, class Clock, class TimePoint>
189   template <class Duration>
push(const T & elem,chrono::time_point<clock,Duration> const & tp)190   void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
191   {
192     super::push(stype(elem,tp));
193   }
194 
195   template <class T, class Clock, class TimePoint>
196   template <class Rep, class Period>
push(const T & elem,chrono::duration<Rep,Period> const & dura)197   void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
198   {
199     push(elem, clock::now() + dura);
200   }
201 
202   template <class T, class Clock, class TimePoint>
203   template <class Duration>
push(BOOST_THREAD_RV_REF (T)elem,chrono::time_point<clock,Duration> const & tp)204   void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
205   {
206     super::push(stype(boost::move(elem),tp));
207   }
208 
209   template <class T, class Clock, class TimePoint>
210   template <class Rep, class Period>
push(BOOST_THREAD_RV_REF (T)elem,chrono::duration<Rep,Period> const & dura)211   void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
212   {
213     push(boost::move(elem), clock::now() + dura);
214   }
215 
216 
217 
218   template <class T, class Clock, class TimePoint>
219   template <class Duration>
try_push(const T & elem,chrono::time_point<clock,Duration> const & tp)220   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
221   {
222     return super::try_push(stype(elem,tp));
223   }
224 
225   template <class T, class Clock, class TimePoint>
226   template <class Rep, class Period>
try_push(const T & elem,chrono::duration<Rep,Period> const & dura)227   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
228   {
229     return try_push(elem,clock::now() + dura);
230   }
231 
232   template <class T, class Clock, class TimePoint>
233   template <class Duration>
try_push(BOOST_THREAD_RV_REF (T)elem,chrono::time_point<clock,Duration> const & tp)234   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
235   {
236     return super::try_push(stype(boost::move(elem), tp));
237   }
238 
239   template <class T, class Clock, class TimePoint>
240   template <class Rep, class Period>
try_push(BOOST_THREAD_RV_REF (T)elem,chrono::duration<Rep,Period> const & dura)241   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
242   {
243     return try_push(boost::move(elem), clock::now() + dura);
244   }
245 
246   ///////////////////////////
247   template <class T, class Clock, class TimePoint>
not_empty_and_time_reached(unique_lock<mutex> & lk) const248   bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
249   {
250     return ! super::empty(lk) && clock::now() >= super::data_.top().time;
251   }
252 
253   template <class T, class Clock, class TimePoint>
not_empty_and_time_reached(lock_guard<mutex> & lk) const254   bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
255   {
256     return ! super::empty(lk) && clock::now() >= super::data_.top().time;
257   }
258 
259   ///////////////////////////
260   template <class T, class Clock, class TimePoint>
wait_to_pull(unique_lock<mutex> & lk)261   bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
262   {
263     for (;;)
264     {
265       if (not_empty_and_time_reached(lk)) return false; // success
266       if (super::closed(lk)) return true; // closed
267 
268       super::wait_until_not_empty_or_closed(lk);
269 
270       if (not_empty_and_time_reached(lk)) return false; // success
271       if (super::closed(lk)) return true; // closed
272 
273       const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
274       super::cond_.wait_until(lk, tpmin);
275     }
276   }
277 
278   template <class T, class Clock, class TimePoint>
wait_to_pull_until(unique_lock<mutex> & lk,TimePoint const & tp)279   queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)
280   {
281     for (;;)
282     {
283       if (not_empty_and_time_reached(lk)) return queue_op_status::success;
284       if (super::closed(lk)) return queue_op_status::closed;
285       if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
286 
287       super::wait_until_not_empty_or_closed_until(lk, tp);
288 
289       if (not_empty_and_time_reached(lk)) return queue_op_status::success;
290       if (super::closed(lk)) return queue_op_status::closed;
291       if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
292 
293       const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
294       super::cond_.wait_until(lk, tpmin);
295     }
296   }
297 
298   template <class T, class Clock, class TimePoint>
299   template <class Rep, class Period>
wait_to_pull_for(unique_lock<mutex> & lk,chrono::duration<Rep,Period> const & dura)300   queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
301   {
302     const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
303     for (;;)
304     {
305       if (not_empty_and_time_reached(lk)) return queue_op_status::success;
306       if (super::closed(lk)) return queue_op_status::closed;
307       if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
308 
309       super::wait_until_not_empty_or_closed_until(lk, tp);
310 
311       if (not_empty_and_time_reached(lk)) return queue_op_status::success;
312       if (super::closed(lk)) return queue_op_status::closed;
313       if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
314 
315       const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
316       super::cond_.wait_until(lk, tpmin);
317     }
318   }
319 
320   ///////////////////////////
321   template <class T, class Clock, class TimePoint>
pull(unique_lock<mutex> &)322   T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
323   {
324 #if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES
325     return boost::move(super::data_.pull().data);
326 #else
327     return super::data_.pull().data;
328 #endif
329   }
330 
331   template <class T, class Clock, class TimePoint>
pull(lock_guard<mutex> &)332   T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
333   {
334 #if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES
335     return boost::move(super::data_.pull().data);
336 #else
337     return super::data_.pull().data;
338 #endif
339   }
340   template <class T, class Clock, class TimePoint>
pull()341   T sync_timed_queue<T, Clock, TimePoint>::pull()
342   {
343     unique_lock<mutex> lk(super::mtx_);
344     const bool has_been_closed = wait_to_pull(lk);
345     if (has_been_closed) super::throw_if_closed(lk);
346     return pull(lk);
347   }
348 
349   ///////////////////////////
350   template <class T, class Clock, class TimePoint>
pull(unique_lock<mutex> &,T & elem)351   void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
352   {
353 #if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES
354     elem = boost::move(super::data_.pull().data);
355 #else
356     elem = super::data_.pull().data;
357 #endif
358   }
359 
360   template <class T, class Clock, class TimePoint>
pull(lock_guard<mutex> &,T & elem)361   void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
362   {
363 #if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES
364     elem = boost::move(super::data_.pull().data);
365 #else
366     elem = super::data_.pull().data;
367 #endif
368   }
369 
370   template <class T, class Clock, class TimePoint>
pull(T & elem)371   void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
372   {
373     unique_lock<mutex> lk(super::mtx_);
374     const bool has_been_closed = wait_to_pull(lk);
375     if (has_been_closed) super::throw_if_closed(lk);
376     pull(lk, elem);
377   }
378 
379   //////////////////////
380   template <class T, class Clock, class TimePoint>
381   template <class Duration>
382   queue_op_status
pull_until(chrono::time_point<clock,Duration> const & tp,T & elem)383   sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
384   {
385     unique_lock<mutex> lk(super::mtx_);
386     const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));
387     if (rc == queue_op_status::success) pull(lk, elem);
388     return rc;
389   }
390 
391   //////////////////////
392   template <class T, class Clock, class TimePoint>
393   template <class Rep, class Period>
394   queue_op_status
pull_for(chrono::duration<Rep,Period> const & dura,T & elem)395   sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
396   {
397     unique_lock<mutex> lk(super::mtx_);
398     const queue_op_status rc = wait_to_pull_for(lk, dura);
399     if (rc == queue_op_status::success) pull(lk, elem);
400     return rc;
401   }
402 
403   ///////////////////////////
404   template <class T, class Clock, class TimePoint>
try_pull(unique_lock<mutex> & lk,T & elem)405   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
406   {
407     if (not_empty_and_time_reached(lk))
408     {
409       pull(lk, elem);
410       return queue_op_status::success;
411     }
412     if (super::closed(lk)) return queue_op_status::closed;
413     if (super::empty(lk)) return queue_op_status::empty;
414     return queue_op_status::not_ready;
415   }
416   template <class T, class Clock, class TimePoint>
try_pull(lock_guard<mutex> & lk,T & elem)417   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
418   {
419     if (not_empty_and_time_reached(lk))
420     {
421       pull(lk, elem);
422       return queue_op_status::success;
423     }
424     if (super::closed(lk)) return queue_op_status::closed;
425     if (super::empty(lk)) return queue_op_status::empty;
426     return queue_op_status::not_ready;
427   }
428 
429   template <class T, class Clock, class TimePoint>
try_pull(T & elem)430   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
431   {
432     lock_guard<mutex> lk(super::mtx_);
433     return try_pull(lk, elem);
434   }
435 
436   ///////////////////////////
437   template <class T, class Clock, class TimePoint>
wait_pull(unique_lock<mutex> & lk,T & elem)438   queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
439   {
440     const bool has_been_closed = wait_to_pull(lk);
441     if (has_been_closed) return queue_op_status::closed;
442     pull(lk, elem);
443     return queue_op_status::success;
444   }
445 
446   template <class T, class Clock, class TimePoint>
wait_pull(T & elem)447   queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
448   {
449     unique_lock<mutex> lk(super::mtx_);
450     return wait_pull(lk, elem);
451   }
452 
453   ///////////////////////////
454   template <class T, class Clock, class TimePoint>
nonblocking_pull(T & elem)455   queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
456   {
457     unique_lock<mutex> lk(super::mtx_, try_to_lock);
458     if (! lk.owns_lock()) return queue_op_status::busy;
459     return try_pull(lk, elem);
460   }
461 
462 } //end concurrent namespace
463 
464 using concurrent::sync_timed_queue;
465 
466 } //end boost namespace
467 #include <boost/config/abi_suffix.hpp>
468 
469 #endif
470