1 // Copyright (C) 2014 Ian Forbed 2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 // 7 8 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP 9 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP 10 11 #include <boost/thread/detail/config.hpp> 12 13 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp> 14 #include <boost/chrono/duration.hpp> 15 #include <boost/chrono/time_point.hpp> 16 #include <boost/chrono/system_clocks.hpp> 17 #include <boost/chrono/chrono_io.hpp> 18 19 #include <algorithm> // std::min 20 21 #include <boost/config/abi_prefix.hpp> 22 23 namespace boost 24 { 25 namespace concurrent 26 { 27 namespace detail 28 { 29 // fixme: shouldn't the timepoint be configurable 30 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point> 31 struct scheduled_type 32 { 33 typedef T value_type; 34 typedef Clock clock; 35 typedef TimePoint time_point; 36 T data; 37 time_point time; 38 39 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type) 40 scheduled_typeboost::concurrent::detail::scheduled_type41 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {} scheduled_typeboost::concurrent::detail::scheduled_type42 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {} 43 scheduled_typeboost::concurrent::detail::scheduled_type44 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {} operator =boost::concurrent::detail::scheduled_type45 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) { 46 data = other.data; 47 time = other.time; 48 return *this; 49 } 50 scheduled_typeboost::concurrent::detail::scheduled_type51 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {} operator =boost::concurrent::detail::scheduled_type52 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) { 53 data = boost::move(other.data); 54 time = other.time; 55 return *this; 56 } 57 operator <boost::concurrent::detail::scheduled_type58 bool operator <(const scheduled_type & other) const 59 { 60 return this->time > other.time; 61 } 62 }; //end struct 63 64 template <class Duration> 65 chrono::time_point<chrono::steady_clock,Duration> limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const & tp)66 limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp) 67 { 68 // Clock == chrono::steady_clock 69 return tp; 70 } 71 72 template <class Clock, class Duration> 73 chrono::time_point<Clock,Duration> limit_timepoint(chrono::time_point<Clock,Duration> const & tp)74 limit_timepoint(chrono::time_point<Clock,Duration> const& tp) 75 { 76 // Clock != chrono::steady_clock 77 // The system time may jump while wait_until() is waiting. To compensate for this and time out near 78 // the correct time, we limit how long wait_until() can wait before going around the loop again. 79 const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS))); 80 return (std::min)(tp, tpmax); 81 } 82 83 template <class Duration> 84 chrono::steady_clock::time_point convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const & tp)85 convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp) 86 { 87 // Clock == chrono::steady_clock 88 return chrono::time_point_cast<chrono::steady_clock::duration>(tp); 89 } 90 91 template <class Clock, class Duration> 92 chrono::steady_clock::time_point convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const & tp)93 convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp) 94 { 95 // Clock != chrono::steady_clock 96 // The system time may jump while wait_until() is waiting. To compensate for this and time out near 97 // the correct time, we limit how long wait_until() can wait before going around the loop again. 98 const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now())); 99 const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)); 100 return chrono::steady_clock::now() + (std::min)(dura, duramax); 101 } 102 103 } //end detail namespace 104 105 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point> 106 class sync_timed_queue 107 : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> > 108 { 109 typedef detail::scheduled_type<T, Clock, TimePoint> stype; 110 typedef sync_priority_queue<stype> super; 111 public: 112 typedef T value_type; 113 typedef Clock clock; 114 typedef typename clock::duration duration; 115 typedef typename clock::time_point time_point; 116 typedef typename super::underlying_queue_type underlying_queue_type; 117 typedef typename super::size_type size_type; 118 typedef typename super::op_status op_status; 119 sync_timed_queue()120 sync_timed_queue() : super() {}; ~sync_timed_queue()121 ~sync_timed_queue() {} 122 123 using super::size; 124 using super::empty; 125 using super::full; 126 using super::close; 127 using super::closed; 128 129 T pull(); 130 void pull(T& elem); 131 132 template <class Duration> 133 queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem); 134 template <class Rep, class Period> 135 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem); 136 137 queue_op_status try_pull(T& elem); 138 queue_op_status wait_pull(T& elem); 139 queue_op_status nonblocking_pull(T& elem); 140 141 template <class Duration> 142 void push(const T& elem, chrono::time_point<clock,Duration> const& tp); 143 template <class Rep, class Period> 144 void push(const T& elem, chrono::duration<Rep,Period> const& dura); 145 146 template <class Duration> 147 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp); 148 template <class Rep, class Period> 149 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura); 150 151 template <class Duration> 152 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp); 153 template <class Rep, class Period> 154 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura); 155 156 template <class Duration> 157 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp); 158 template <class Rep, class Period> 159 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura); 160 161 private: 162 inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const; 163 inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const; 164 165 bool wait_to_pull(unique_lock<mutex>&); 166 queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp); 167 template <class Rep, class Period> 168 queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura); 169 170 T pull(unique_lock<mutex>&); 171 T pull(lock_guard<mutex>&); 172 173 void pull(unique_lock<mutex>&, T& elem); 174 void pull(lock_guard<mutex>&, T& elem); 175 176 queue_op_status try_pull(unique_lock<mutex>&, T& elem); 177 queue_op_status try_pull(lock_guard<mutex>&, T& elem); 178 179 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem); 180 181 sync_timed_queue(const sync_timed_queue&); 182 sync_timed_queue& operator=(const sync_timed_queue&); 183 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue)); 184 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue)); 185 }; //end class 186 187 188 template <class T, class Clock, class TimePoint> 189 template <class Duration> push(const T & elem,chrono::time_point<clock,Duration> const & tp)190 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp) 191 { 192 super::push(stype(elem,tp)); 193 } 194 195 template <class T, class Clock, class TimePoint> 196 template <class Rep, class Period> push(const T & elem,chrono::duration<Rep,Period> const & dura)197 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura) 198 { 199 push(elem, clock::now() + dura); 200 } 201 202 template <class T, class Clock, class TimePoint> 203 template <class Duration> push(BOOST_THREAD_RV_REF (T)elem,chrono::time_point<clock,Duration> const & tp)204 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp) 205 { 206 super::push(stype(boost::move(elem),tp)); 207 } 208 209 template <class T, class Clock, class TimePoint> 210 template <class Rep, class Period> push(BOOST_THREAD_RV_REF (T)elem,chrono::duration<Rep,Period> const & dura)211 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura) 212 { 213 push(boost::move(elem), clock::now() + dura); 214 } 215 216 217 218 template <class T, class Clock, class TimePoint> 219 template <class Duration> try_push(const T & elem,chrono::time_point<clock,Duration> const & tp)220 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp) 221 { 222 return super::try_push(stype(elem,tp)); 223 } 224 225 template <class T, class Clock, class TimePoint> 226 template <class Rep, class Period> try_push(const T & elem,chrono::duration<Rep,Period> const & dura)227 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura) 228 { 229 return try_push(elem,clock::now() + dura); 230 } 231 232 template <class T, class Clock, class TimePoint> 233 template <class Duration> try_push(BOOST_THREAD_RV_REF (T)elem,chrono::time_point<clock,Duration> const & tp)234 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp) 235 { 236 return super::try_push(stype(boost::move(elem), tp)); 237 } 238 239 template <class T, class Clock, class TimePoint> 240 template <class Rep, class Period> try_push(BOOST_THREAD_RV_REF (T)elem,chrono::duration<Rep,Period> const & dura)241 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura) 242 { 243 return try_push(boost::move(elem), clock::now() + dura); 244 } 245 246 /////////////////////////// 247 template <class T, class Clock, class TimePoint> not_empty_and_time_reached(unique_lock<mutex> & lk) const248 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const 249 { 250 return ! super::empty(lk) && clock::now() >= super::data_.top().time; 251 } 252 253 template <class T, class Clock, class TimePoint> not_empty_and_time_reached(lock_guard<mutex> & lk) const254 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const 255 { 256 return ! super::empty(lk) && clock::now() >= super::data_.top().time; 257 } 258 259 /////////////////////////// 260 template <class T, class Clock, class TimePoint> wait_to_pull(unique_lock<mutex> & lk)261 bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk) 262 { 263 for (;;) 264 { 265 if (not_empty_and_time_reached(lk)) return false; // success 266 if (super::closed(lk)) return true; // closed 267 268 super::wait_until_not_empty_or_closed(lk); 269 270 if (not_empty_and_time_reached(lk)) return false; // success 271 if (super::closed(lk)) return true; // closed 272 273 const time_point tpmin(detail::limit_timepoint(super::data_.top().time)); 274 super::cond_.wait_until(lk, tpmin); 275 } 276 } 277 278 template <class T, class Clock, class TimePoint> wait_to_pull_until(unique_lock<mutex> & lk,TimePoint const & tp)279 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp) 280 { 281 for (;;) 282 { 283 if (not_empty_and_time_reached(lk)) return queue_op_status::success; 284 if (super::closed(lk)) return queue_op_status::closed; 285 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; 286 287 super::wait_until_not_empty_or_closed_until(lk, tp); 288 289 if (not_empty_and_time_reached(lk)) return queue_op_status::success; 290 if (super::closed(lk)) return queue_op_status::closed; 291 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; 292 293 const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time))); 294 super::cond_.wait_until(lk, tpmin); 295 } 296 } 297 298 template <class T, class Clock, class TimePoint> 299 template <class Rep, class Period> wait_to_pull_for(unique_lock<mutex> & lk,chrono::duration<Rep,Period> const & dura)300 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura) 301 { 302 const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura)); 303 for (;;) 304 { 305 if (not_empty_and_time_reached(lk)) return queue_op_status::success; 306 if (super::closed(lk)) return queue_op_status::closed; 307 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; 308 309 super::wait_until_not_empty_or_closed_until(lk, tp); 310 311 if (not_empty_and_time_reached(lk)) return queue_op_status::success; 312 if (super::closed(lk)) return queue_op_status::closed; 313 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; 314 315 const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time))); 316 super::cond_.wait_until(lk, tpmin); 317 } 318 } 319 320 /////////////////////////// 321 template <class T, class Clock, class TimePoint> pull(unique_lock<mutex> &)322 T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&) 323 { 324 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES 325 return boost::move(super::data_.pull().data); 326 #else 327 return super::data_.pull().data; 328 #endif 329 } 330 331 template <class T, class Clock, class TimePoint> pull(lock_guard<mutex> &)332 T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&) 333 { 334 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES 335 return boost::move(super::data_.pull().data); 336 #else 337 return super::data_.pull().data; 338 #endif 339 } 340 template <class T, class Clock, class TimePoint> pull()341 T sync_timed_queue<T, Clock, TimePoint>::pull() 342 { 343 unique_lock<mutex> lk(super::mtx_); 344 const bool has_been_closed = wait_to_pull(lk); 345 if (has_been_closed) super::throw_if_closed(lk); 346 return pull(lk); 347 } 348 349 /////////////////////////// 350 template <class T, class Clock, class TimePoint> pull(unique_lock<mutex> &,T & elem)351 void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem) 352 { 353 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES 354 elem = boost::move(super::data_.pull().data); 355 #else 356 elem = super::data_.pull().data; 357 #endif 358 } 359 360 template <class T, class Clock, class TimePoint> pull(lock_guard<mutex> &,T & elem)361 void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem) 362 { 363 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES 364 elem = boost::move(super::data_.pull().data); 365 #else 366 elem = super::data_.pull().data; 367 #endif 368 } 369 370 template <class T, class Clock, class TimePoint> pull(T & elem)371 void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem) 372 { 373 unique_lock<mutex> lk(super::mtx_); 374 const bool has_been_closed = wait_to_pull(lk); 375 if (has_been_closed) super::throw_if_closed(lk); 376 pull(lk, elem); 377 } 378 379 ////////////////////// 380 template <class T, class Clock, class TimePoint> 381 template <class Duration> 382 queue_op_status pull_until(chrono::time_point<clock,Duration> const & tp,T & elem)383 sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem) 384 { 385 unique_lock<mutex> lk(super::mtx_); 386 const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp)); 387 if (rc == queue_op_status::success) pull(lk, elem); 388 return rc; 389 } 390 391 ////////////////////// 392 template <class T, class Clock, class TimePoint> 393 template <class Rep, class Period> 394 queue_op_status pull_for(chrono::duration<Rep,Period> const & dura,T & elem)395 sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem) 396 { 397 unique_lock<mutex> lk(super::mtx_); 398 const queue_op_status rc = wait_to_pull_for(lk, dura); 399 if (rc == queue_op_status::success) pull(lk, elem); 400 return rc; 401 } 402 403 /////////////////////////// 404 template <class T, class Clock, class TimePoint> try_pull(unique_lock<mutex> & lk,T & elem)405 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem) 406 { 407 if (not_empty_and_time_reached(lk)) 408 { 409 pull(lk, elem); 410 return queue_op_status::success; 411 } 412 if (super::closed(lk)) return queue_op_status::closed; 413 if (super::empty(lk)) return queue_op_status::empty; 414 return queue_op_status::not_ready; 415 } 416 template <class T, class Clock, class TimePoint> try_pull(lock_guard<mutex> & lk,T & elem)417 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem) 418 { 419 if (not_empty_and_time_reached(lk)) 420 { 421 pull(lk, elem); 422 return queue_op_status::success; 423 } 424 if (super::closed(lk)) return queue_op_status::closed; 425 if (super::empty(lk)) return queue_op_status::empty; 426 return queue_op_status::not_ready; 427 } 428 429 template <class T, class Clock, class TimePoint> try_pull(T & elem)430 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem) 431 { 432 lock_guard<mutex> lk(super::mtx_); 433 return try_pull(lk, elem); 434 } 435 436 /////////////////////////// 437 template <class T, class Clock, class TimePoint> wait_pull(unique_lock<mutex> & lk,T & elem)438 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem) 439 { 440 const bool has_been_closed = wait_to_pull(lk); 441 if (has_been_closed) return queue_op_status::closed; 442 pull(lk, elem); 443 return queue_op_status::success; 444 } 445 446 template <class T, class Clock, class TimePoint> wait_pull(T & elem)447 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem) 448 { 449 unique_lock<mutex> lk(super::mtx_); 450 return wait_pull(lk, elem); 451 } 452 453 /////////////////////////// 454 template <class T, class Clock, class TimePoint> nonblocking_pull(T & elem)455 queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem) 456 { 457 unique_lock<mutex> lk(super::mtx_, try_to_lock); 458 if (! lk.owns_lock()) return queue_op_status::busy; 459 return try_pull(lk, elem); 460 } 461 462 } //end concurrent namespace 463 464 using concurrent::sync_timed_queue; 465 466 } //end boost namespace 467 #include <boost/config/abi_suffix.hpp> 468 469 #endif 470