1 // Copyright (C) 2014 Ian Forbed 2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 // 7 8 #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE 9 #define BOOST_THREAD_SYNC_PRIORITY_QUEUE 10 11 #include <boost/thread/detail/config.hpp> 12 13 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp> 14 #include <boost/thread/concurrent_queues/queue_op_status.hpp> 15 #include <boost/thread/condition_variable.hpp> 16 #include <boost/thread/csbl/vector.hpp> 17 #include <boost/thread/detail/move.hpp> 18 #include <boost/thread/mutex.hpp> 19 20 #include <boost/atomic.hpp> 21 #include <boost/chrono/duration.hpp> 22 #include <boost/chrono/time_point.hpp> 23 24 #include <exception> 25 #include <queue> 26 #include <utility> 27 28 #include <boost/config/abi_prefix.hpp> 29 30 namespace boost 31 { 32 namespace detail { 33 34 template < 35 class Type, 36 class Container = csbl::vector<Type>, 37 class Compare = std::less<Type> 38 > 39 class priority_queue 40 { 41 private: 42 Container _elements; 43 Compare _compare; 44 public: 45 typedef Type value_type; 46 typedef typename Container::size_type size_type; 47 priority_queue(const Compare & compare=Compare ())48 explicit priority_queue(const Compare& compare = Compare()) 49 : _elements(), _compare(compare) 50 { } 51 size() const52 size_type size() const 53 { 54 return _elements.size(); 55 } 56 empty() const57 bool empty() const 58 { 59 return _elements.empty(); 60 } 61 push(Type const & element)62 void push(Type const& element) 63 { 64 _elements.push_back(element); 65 std::push_heap(_elements.begin(), _elements.end(), _compare); 66 } push(BOOST_RV_REF (Type)element)67 void push(BOOST_RV_REF(Type) element) 68 { 69 _elements.push_back(boost::move(element)); 70 std::push_heap(_elements.begin(), _elements.end(), _compare); 71 } 72 pop()73 void pop() 74 { 75 std::pop_heap(_elements.begin(), _elements.end(), _compare); 76 _elements.pop_back(); 77 } pull()78 Type pull() 79 { 80 Type result = boost::move(_elements.front()); 81 pop(); 82 return boost::move(result); 83 } 84 top() const85 Type const& top() const 86 { 87 return _elements.front(); 88 } 89 }; 90 } 91 92 namespace concurrent 93 { 94 template <class ValueType, 95 class Container = csbl::vector<ValueType>, 96 class Compare = std::less<typename Container::value_type> > 97 class sync_priority_queue 98 : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > 99 { 100 typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super; 101 102 public: 103 typedef ValueType value_type; 104 //typedef typename super::value_type value_type; // fixme 105 typedef typename super::underlying_queue_type underlying_queue_type; 106 typedef typename super::size_type size_type; 107 typedef typename super::op_status op_status; 108 109 typedef chrono::steady_clock clock; 110 protected: 111 112 public: sync_priority_queue()113 sync_priority_queue() {} 114 ~sync_priority_queue()115 ~sync_priority_queue() 116 { 117 if(!super::closed()) 118 { 119 super::close(); 120 } 121 } 122 123 void push(const ValueType& elem); 124 void push(BOOST_THREAD_RV_REF(ValueType) elem); 125 126 queue_op_status try_push(const ValueType& elem); 127 queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem); 128 129 ValueType pull(); 130 131 void pull(ValueType&); 132 133 template <class WClock, class Duration> 134 queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&); 135 template <class Rep, class Period> 136 queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&); 137 138 queue_op_status try_pull(ValueType& elem); 139 queue_op_status wait_pull(ValueType& elem); 140 queue_op_status nonblocking_pull(ValueType&); 141 142 private: 143 void push(unique_lock<mutex>&, const ValueType& elem); 144 void push(lock_guard<mutex>&, const ValueType& elem); 145 void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem); 146 void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem); 147 148 queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem); 149 queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem); 150 151 ValueType pull(unique_lock<mutex>&); 152 ValueType pull(lock_guard<mutex>&); 153 154 void pull(unique_lock<mutex>&, ValueType&); 155 void pull(lock_guard<mutex>&, ValueType&); 156 157 queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem); 158 queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem); 159 160 queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem); 161 162 queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&); 163 164 sync_priority_queue(const sync_priority_queue&); 165 sync_priority_queue& operator= (const sync_priority_queue&); 166 sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue)); 167 sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue)); 168 }; //end class 169 170 171 ////////////////////// 172 template <class T, class Container,class Cmp> push(unique_lock<mutex> & lk,const T & elem)173 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem) 174 { 175 super::throw_if_closed(lk); 176 super::data_.push(elem); 177 super::notify_elem_added(lk); 178 } 179 template <class T, class Container,class Cmp> push(lock_guard<mutex> & lk,const T & elem)180 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem) 181 { 182 super::throw_if_closed(lk); 183 super::data_.push(elem); 184 super::notify_elem_added(lk); 185 } 186 template <class T, class Container,class Cmp> push(const T & elem)187 void sync_priority_queue<T,Container,Cmp>::push(const T& elem) 188 { 189 lock_guard<mutex> lk(super::mtx_); 190 push(lk, elem); 191 } 192 193 ////////////////////// 194 template <class T, class Container,class Cmp> push(unique_lock<mutex> & lk,BOOST_THREAD_RV_REF (T)elem)195 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem) 196 { 197 super::throw_if_closed(lk); 198 super::data_.push(boost::move(elem)); 199 super::notify_elem_added(lk); 200 } 201 template <class T, class Container,class Cmp> push(lock_guard<mutex> & lk,BOOST_THREAD_RV_REF (T)elem)202 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem) 203 { 204 super::throw_if_closed(lk); 205 super::data_.push(boost::move(elem)); 206 super::notify_elem_added(lk); 207 } 208 template <class T, class Container,class Cmp> push(BOOST_THREAD_RV_REF (T)elem)209 void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem) 210 { 211 lock_guard<mutex> lk(super::mtx_); 212 push(lk, boost::move(elem)); 213 } 214 215 ////////////////////// 216 template <class T, class Container,class Cmp> try_push(const T & elem)217 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem) 218 { 219 lock_guard<mutex> lk(super::mtx_); 220 if (super::closed(lk)) return queue_op_status::closed; 221 push(lk, elem); 222 return queue_op_status::success; 223 } 224 225 ////////////////////// 226 template <class T, class Container,class Cmp> try_push(BOOST_THREAD_RV_REF (T)elem)227 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem) 228 { 229 lock_guard<mutex> lk(super::mtx_); 230 if (super::closed(lk)) return queue_op_status::closed; 231 push(lk, boost::move(elem)); 232 233 return queue_op_status::success; 234 } 235 236 ////////////////////// 237 template <class T,class Container, class Cmp> pull(unique_lock<mutex> &)238 T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&) 239 { 240 return super::data_.pull(); 241 } 242 template <class T,class Container, class Cmp> pull(lock_guard<mutex> &)243 T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&) 244 { 245 return super::data_.pull(); 246 } 247 248 template <class T,class Container, class Cmp> pull()249 T sync_priority_queue<T,Container,Cmp>::pull() 250 { 251 unique_lock<mutex> lk(super::mtx_); 252 const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); 253 if (has_been_closed) super::throw_if_closed(lk); 254 return pull(lk); 255 } 256 257 ////////////////////// 258 template <class T,class Container, class Cmp> pull(unique_lock<mutex> &,T & elem)259 void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem) 260 { 261 elem = super::data_.pull(); 262 } 263 template <class T,class Container, class Cmp> pull(lock_guard<mutex> &,T & elem)264 void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem) 265 { 266 elem = super::data_.pull(); 267 } 268 269 template <class T,class Container, class Cmp> pull(T & elem)270 void sync_priority_queue<T,Container,Cmp>::pull(T& elem) 271 { 272 unique_lock<mutex> lk(super::mtx_); 273 const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); 274 if (has_been_closed) super::throw_if_closed(lk); 275 pull(lk, elem); 276 } 277 278 ////////////////////// 279 template <class T, class Cont,class Cmp> 280 template <class WClock, class Duration> 281 queue_op_status pull_until(const chrono::time_point<WClock,Duration> & tp,T & elem)282 sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem) 283 { 284 unique_lock<mutex> lk(super::mtx_); 285 const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp); 286 if (rc == queue_op_status::success) pull(lk, elem); 287 return rc; 288 } 289 290 ////////////////////// 291 template <class T, class Cont,class Cmp> 292 template <class Rep, class Period> 293 queue_op_status pull_for(const chrono::duration<Rep,Period> & dura,T & elem)294 sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem) 295 { 296 return pull_until(chrono::steady_clock::now() + dura, elem); 297 } 298 299 ////////////////////// 300 template <class T, class Container,class Cmp> 301 queue_op_status try_pull(unique_lock<mutex> & lk,T & elem)302 sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem) 303 { 304 if (super::empty(lk)) 305 { 306 if (super::closed(lk)) return queue_op_status::closed; 307 return queue_op_status::empty; 308 } 309 pull(lk, elem); 310 return queue_op_status::success; 311 } 312 313 template <class T, class Container,class Cmp> 314 queue_op_status try_pull(lock_guard<mutex> & lk,T & elem)315 sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem) 316 { 317 if (super::empty(lk)) 318 { 319 if (super::closed(lk)) return queue_op_status::closed; 320 return queue_op_status::empty; 321 } 322 pull(lk, elem); 323 return queue_op_status::success; 324 } 325 326 template <class T, class Container,class Cmp> 327 queue_op_status try_pull(T & elem)328 sync_priority_queue<T,Container,Cmp>::try_pull(T& elem) 329 { 330 lock_guard<mutex> lk(super::mtx_); 331 return try_pull(lk, elem); 332 } 333 334 ////////////////////// 335 template <class T,class Container, class Cmp> wait_pull(unique_lock<mutex> & lk,T & elem)336 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem) 337 { 338 const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); 339 if (has_been_closed) return queue_op_status::closed; 340 pull(lk, elem); 341 return queue_op_status::success; 342 } 343 344 template <class T,class Container, class Cmp> wait_pull(T & elem)345 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem) 346 { 347 unique_lock<mutex> lk(super::mtx_); 348 return wait_pull(lk, elem); 349 } 350 351 ////////////////////// 352 template <class T,class Container, class Cmp> nonblocking_pull(T & elem)353 queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem) 354 { 355 unique_lock<mutex> lk(super::mtx_, try_to_lock); 356 if (!lk.owns_lock()) return queue_op_status::busy; 357 return try_pull(lk, elem); 358 } 359 360 361 362 } //end concurrent namespace 363 364 using concurrent::sync_priority_queue; 365 366 } //end boost namespace 367 #include <boost/config/abi_suffix.hpp> 368 369 #endif 370