1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP 2 #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP 3 4 ////////////////////////////////////////////////////////////////////////////// 5 // 6 // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost 7 // Software License, Version 1.0. (See accompanying file 8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 // 10 // See http://www.boost.org/libs/thread for documentation. 11 // 12 ////////////////////////////////////////////////////////////////////////////// 13 14 #include <boost/thread/detail/config.hpp> 15 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp> 16 #include <boost/thread/concurrent_queues/queue_op_status.hpp> 17 #include <boost/thread/condition_variable.hpp> 18 #include <boost/thread/csbl/devector.hpp> 19 #include <boost/thread/detail/move.hpp> 20 #include <boost/thread/mutex.hpp> 21 22 #include <boost/throw_exception.hpp> 23 #include <boost/smart_ptr/shared_ptr.hpp> 24 #include <boost/smart_ptr/make_shared.hpp> 25 26 #include <boost/config/abi_prefix.hpp> 27 28 namespace boost 29 { 30 namespace concurrent 31 { 32 template <class ValueType, class Container = csbl::devector<ValueType> > 33 class sync_queue 34 : public detail::sync_queue_base<ValueType, Container > 35 { 36 typedef detail::sync_queue_base<ValueType, Container > super; 37 38 public: 39 typedef ValueType value_type; 40 //typedef typename super::value_type value_type; // fixme 41 typedef typename super::underlying_queue_type underlying_queue_type; 42 typedef typename super::size_type size_type; 43 typedef typename super::op_status op_status; 44 45 // Constructors/Assignment/Destructors 46 BOOST_THREAD_NO_COPYABLE(sync_queue) 47 inline sync_queue(); 48 //template <class Range> 49 //inline explicit sync_queue(Range range); 50 inline ~sync_queue(); 51 52 // Modifiers 53 inline void push(const value_type& x); 54 inline queue_op_status try_push(const value_type& x); 55 inline queue_op_status nonblocking_push(const value_type& x); 56 inline queue_op_status wait_push(const value_type& x); 57 inline void push(BOOST_THREAD_RV_REF(value_type) x); 58 inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x); 59 inline queue_op_status nonblocking_push(BOOST_THREAD_RV_REF(value_type) x); 60 inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x); 61 62 // Observers/Modifiers 63 inline void pull(value_type&); 64 // enable_if is_nothrow_copy_movable<value_type> 65 inline value_type pull(); 66 67 inline queue_op_status try_pull(value_type&); 68 inline queue_op_status nonblocking_pull(value_type&); 69 inline queue_op_status wait_pull(ValueType& elem); 70 71 private: 72 73 inline queue_op_status try_pull(value_type& x, unique_lock<mutex>& lk); 74 inline queue_op_status wait_pull(value_type& x, unique_lock<mutex>& lk); 75 inline queue_op_status try_push(const value_type& x, unique_lock<mutex>& lk); 76 inline queue_op_status wait_push(const value_type& x, unique_lock<mutex>& lk); 77 inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk); 78 inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk); 79 pull(value_type & elem,unique_lock<mutex> &)80 inline void pull(value_type& elem, unique_lock<mutex>& ) 81 { 82 elem = boost::move(super::data_.front()); 83 super::data_.pop_front(); 84 } pull(unique_lock<mutex> &)85 inline value_type pull(unique_lock<mutex>& ) 86 { 87 value_type e = boost::move(super::data_.front()); 88 super::data_.pop_front(); 89 return boost::move(e); 90 } 91 push(const value_type & elem,unique_lock<mutex> & lk)92 inline void push(const value_type& elem, unique_lock<mutex>& lk) 93 { 94 super::data_.push_back(elem); 95 super::notify_elem_added(lk); 96 } 97 push(BOOST_THREAD_RV_REF (value_type)elem,unique_lock<mutex> & lk)98 inline void push(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk) 99 { 100 super::data_.push_back(boost::move(elem)); 101 super::notify_elem_added(lk); 102 } 103 }; 104 105 template <class ValueType, class Container> sync_queue()106 sync_queue<ValueType, Container>::sync_queue() : 107 super() 108 { 109 } 110 111 // template <class ValueType, class Container> 112 // template <class Range> 113 // explicit sync_queue<ValueType, Container>::sync_queue(Range range) : 114 // data_(), closed_(false) 115 // { 116 // try 117 // { 118 // typedef typename Range::iterator iterator_t; 119 // iterator_t first = boost::begin(range); 120 // iterator_t end = boost::end(range); 121 // for (iterator_t cur = first; cur != end; ++cur) 122 // { 123 // data_.push(boost::move(*cur));; 124 // } 125 // notify_elem_added(lk); 126 // } 127 // catch (...) 128 // { 129 // delete[] data_; 130 // } 131 // } 132 133 template <class ValueType, class Container> ~sync_queue()134 sync_queue<ValueType, Container>::~sync_queue() 135 { 136 } 137 138 template <class ValueType, class Container> try_pull(ValueType & elem,unique_lock<mutex> & lk)139 queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem, unique_lock<mutex>& lk) 140 { 141 if (super::empty(lk)) 142 { 143 if (super::closed(lk)) return queue_op_status::closed; 144 return queue_op_status::empty; 145 } 146 pull(elem, lk); 147 return queue_op_status::success; 148 } 149 template <class ValueType, class Container> wait_pull(ValueType & elem,unique_lock<mutex> & lk)150 queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem, unique_lock<mutex>& lk) 151 { 152 const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); 153 if (has_been_closed) return queue_op_status::closed; 154 pull(elem, lk); 155 return queue_op_status::success; 156 } 157 158 template <class ValueType, class Container> try_pull(ValueType & elem)159 queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem) 160 { 161 unique_lock<mutex> lk(super::mtx_); 162 return try_pull(elem, lk); 163 } 164 165 template <class ValueType, class Container> wait_pull(ValueType & elem)166 queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem) 167 { 168 unique_lock<mutex> lk(super::mtx_); 169 return wait_pull(elem, lk); 170 } 171 172 template <class ValueType, class Container> nonblocking_pull(ValueType & elem)173 queue_op_status sync_queue<ValueType, Container>::nonblocking_pull(ValueType& elem) 174 { 175 unique_lock<mutex> lk(super::mtx_, try_to_lock); 176 if (!lk.owns_lock()) 177 { 178 return queue_op_status::busy; 179 } 180 return try_pull(elem, lk); 181 } 182 183 template <class ValueType, class Container> pull(ValueType & elem)184 void sync_queue<ValueType, Container>::pull(ValueType& elem) 185 { 186 unique_lock<mutex> lk(super::mtx_); 187 const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); 188 if (has_been_closed) super::throw_if_closed(lk); 189 pull(elem, lk); 190 } 191 192 // enable if ValueType is nothrow movable 193 template <class ValueType, class Container> pull()194 ValueType sync_queue<ValueType, Container>::pull() 195 { 196 unique_lock<mutex> lk(super::mtx_); 197 const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); 198 if (has_been_closed) super::throw_if_closed(lk); 199 return pull(lk); 200 } 201 202 template <class ValueType, class Container> try_push(const ValueType & elem,unique_lock<mutex> & lk)203 queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem, unique_lock<mutex>& lk) 204 { 205 if (super::closed(lk)) return queue_op_status::closed; 206 push(elem, lk); 207 return queue_op_status::success; 208 } 209 210 template <class ValueType, class Container> try_push(const ValueType & elem)211 queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem) 212 { 213 unique_lock<mutex> lk(super::mtx_); 214 return try_push(elem, lk); 215 } 216 217 template <class ValueType, class Container> wait_push(const ValueType & elem,unique_lock<mutex> & lk)218 queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem, unique_lock<mutex>& lk) 219 { 220 if (super::closed(lk)) return queue_op_status::closed; 221 push(elem, lk); 222 return queue_op_status::success; 223 } 224 225 template <class ValueType, class Container> wait_push(const ValueType & elem)226 queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem) 227 { 228 unique_lock<mutex> lk(super::mtx_); 229 return wait_push(elem, lk); 230 } 231 232 template <class ValueType, class Container> nonblocking_push(const ValueType & elem)233 queue_op_status sync_queue<ValueType, Container>::nonblocking_push(const ValueType& elem) 234 { 235 unique_lock<mutex> lk(super::mtx_, try_to_lock); 236 if (!lk.owns_lock()) return queue_op_status::busy; 237 return try_push(elem, lk); 238 } 239 240 template <class ValueType, class Container> push(const ValueType & elem)241 void sync_queue<ValueType, Container>::push(const ValueType& elem) 242 { 243 unique_lock<mutex> lk(super::mtx_); 244 super::throw_if_closed(lk); 245 push(elem, lk); 246 } 247 248 template <class ValueType, class Container> try_push(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)249 queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk) 250 { 251 if (super::closed(lk)) return queue_op_status::closed; 252 push(boost::move(elem), lk); 253 return queue_op_status::success; 254 } 255 256 template <class ValueType, class Container> try_push(BOOST_THREAD_RV_REF (ValueType)elem)257 queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem) 258 { 259 unique_lock<mutex> lk(super::mtx_); 260 return try_push(boost::move(elem), lk); 261 } 262 263 template <class ValueType, class Container> wait_push(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)264 queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk) 265 { 266 if (super::closed(lk)) return queue_op_status::closed; 267 push(boost::move(elem), lk); 268 return queue_op_status::success; 269 } 270 271 template <class ValueType, class Container> wait_push(BOOST_THREAD_RV_REF (ValueType)elem)272 queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem) 273 { 274 unique_lock<mutex> lk(super::mtx_); 275 return wait_push(boost::move(elem), lk); 276 } 277 278 template <class ValueType, class Container> nonblocking_push(BOOST_THREAD_RV_REF (ValueType)elem)279 queue_op_status sync_queue<ValueType, Container>::nonblocking_push(BOOST_THREAD_RV_REF(ValueType) elem) 280 { 281 unique_lock<mutex> lk(super::mtx_, try_to_lock); 282 if (!lk.owns_lock()) 283 { 284 return queue_op_status::busy; 285 } 286 return try_push(boost::move(elem), lk); 287 } 288 289 template <class ValueType, class Container> push(BOOST_THREAD_RV_REF (ValueType)elem)290 void sync_queue<ValueType, Container>::push(BOOST_THREAD_RV_REF(ValueType) elem) 291 { 292 unique_lock<mutex> lk(super::mtx_); 293 super::throw_if_closed(lk); 294 push(boost::move(elem), lk); 295 } 296 297 template <class ValueType, class Container> operator <<(sync_queue<ValueType,Container> & sbq,BOOST_THREAD_RV_REF (ValueType)elem)298 sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, BOOST_THREAD_RV_REF(ValueType) elem) 299 { 300 sbq.push(boost::move(elem)); 301 return sbq; 302 } 303 304 template <class ValueType, class Container> operator <<(sync_queue<ValueType,Container> & sbq,ValueType const & elem)305 sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, ValueType const&elem) 306 { 307 sbq.push(elem); 308 return sbq; 309 } 310 311 template <class ValueType, class Container> operator >>(sync_queue<ValueType,Container> & sbq,ValueType & elem)312 sync_queue<ValueType, Container>& operator>>(sync_queue<ValueType, Container>& sbq, ValueType &elem) 313 { 314 sbq.pull(elem); 315 return sbq; 316 } 317 318 } 319 using concurrent::sync_queue; 320 321 } 322 323 #include <boost/config/abi_suffix.hpp> 324 325 #endif 326