1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP 2 #define BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP 3 4 ////////////////////////////////////////////////////////////////////////////// 5 // 6 // (C) Copyright Vicente J. Botet Escriba 2013-2017. Distributed under the Boost 7 // Software License, Version 1.0. (See accompanying file 8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 // 10 // See http://www.boost.org/libs/thread for documentation. 11 // 12 ////////////////////////////////////////////////////////////////////////////// 13 14 #include <boost/bind/bind.hpp> 15 16 #include <boost/thread/detail/config.hpp> 17 #include <boost/thread/condition_variable.hpp> 18 #include <boost/thread/detail/move.hpp> 19 #include <boost/thread/mutex.hpp> 20 #include <boost/thread/concurrent_queues/queue_op_status.hpp> 21 22 #include <boost/chrono/time_point.hpp> 23 #include <boost/throw_exception.hpp> 24 25 #include <boost/config/abi_prefix.hpp> 26 27 namespace boost 28 { 29 namespace concurrent 30 { 31 namespace detail 32 { 33 34 template <class ValueType, class Queue> 35 class sync_queue_base 36 { 37 public: 38 typedef ValueType value_type; 39 typedef Queue underlying_queue_type; 40 typedef typename Queue::size_type size_type; 41 typedef queue_op_status op_status; 42 43 // Constructors/Assignment/Destructors 44 BOOST_THREAD_NO_COPYABLE(sync_queue_base) 45 inline sync_queue_base(); 46 //template <typename Range> 47 //inline explicit sync_queue(Range range); 48 inline ~sync_queue_base(); 49 50 // Observers 51 inline bool empty() const; 52 inline bool full() const; 53 inline size_type size() const; 54 inline bool closed() const; 55 56 // Modifiers 57 inline void close(); 58 underlying_queue()59 inline underlying_queue_type underlying_queue() { 60 lock_guard<mutex> lk(mtx_); 61 return boost::move(data_); 62 } 63 64 protected: 65 mutable mutex mtx_; 66 condition_variable cond_; 67 underlying_queue_type data_; 68 bool closed_; 69 empty(unique_lock<mutex> &) const70 inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT 71 { 72 return data_.empty(); 73 } empty(lock_guard<mutex> &) const74 inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT 75 { 76 return data_.empty(); 77 } 78 size(lock_guard<mutex> &) const79 inline size_type size(lock_guard<mutex>& ) const BOOST_NOEXCEPT 80 { 81 return data_.size(); 82 } 83 inline bool closed(unique_lock<mutex>& lk) const; 84 inline bool closed(lock_guard<mutex>& lk) const; 85 86 inline void throw_if_closed(unique_lock<mutex>&); 87 inline void throw_if_closed(lock_guard<mutex>&); 88 89 inline bool not_empty_or_closed(unique_lock<mutex>& ) const; 90 91 inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk); 92 template <class WClock, class Duration> 93 queue_op_status wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp); 94 notify_elem_added(unique_lock<mutex> &)95 inline void notify_elem_added(unique_lock<mutex>& ) 96 { 97 cond_.notify_all(); 98 } notify_elem_added(lock_guard<mutex> &)99 inline void notify_elem_added(lock_guard<mutex>& ) 100 { 101 cond_.notify_all(); 102 } 103 104 }; 105 106 template <class ValueType, class Queue> sync_queue_base()107 sync_queue_base<ValueType, Queue>::sync_queue_base() : 108 data_(), closed_(false) 109 { 110 BOOST_ASSERT(data_.empty()); 111 } 112 113 template <class ValueType, class Queue> ~sync_queue_base()114 sync_queue_base<ValueType, Queue>::~sync_queue_base() 115 { 116 } 117 118 template <class ValueType, class Queue> close()119 void sync_queue_base<ValueType, Queue>::close() 120 { 121 { 122 lock_guard<mutex> lk(mtx_); 123 closed_ = true; 124 } 125 cond_.notify_all(); 126 } 127 128 template <class ValueType, class Queue> closed() const129 bool sync_queue_base<ValueType, Queue>::closed() const 130 { 131 lock_guard<mutex> lk(mtx_); 132 return closed(lk); 133 } 134 template <class ValueType, class Queue> closed(unique_lock<mutex> &) const135 bool sync_queue_base<ValueType, Queue>::closed(unique_lock<mutex>&) const 136 { 137 return closed_; 138 } 139 template <class ValueType, class Queue> closed(lock_guard<mutex> &) const140 bool sync_queue_base<ValueType, Queue>::closed(lock_guard<mutex>&) const 141 { 142 return closed_; 143 } 144 145 template <class ValueType, class Queue> empty() const146 bool sync_queue_base<ValueType, Queue>::empty() const 147 { 148 lock_guard<mutex> lk(mtx_); 149 return empty(lk); 150 } 151 template <class ValueType, class Queue> full() const152 bool sync_queue_base<ValueType, Queue>::full() const 153 { 154 return false; 155 } 156 157 template <class ValueType, class Queue> size() const158 typename sync_queue_base<ValueType, Queue>::size_type sync_queue_base<ValueType, Queue>::size() const 159 { 160 lock_guard<mutex> lk(mtx_); 161 return size(lk); 162 } 163 164 template <class ValueType, class Queue> throw_if_closed(unique_lock<mutex> & lk)165 void sync_queue_base<ValueType, Queue>::throw_if_closed(unique_lock<mutex>& lk) 166 { 167 if (closed(lk)) 168 { 169 BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); 170 } 171 } 172 template <class ValueType, class Queue> throw_if_closed(lock_guard<mutex> & lk)173 void sync_queue_base<ValueType, Queue>::throw_if_closed(lock_guard<mutex>& lk) 174 { 175 if (closed(lk)) 176 { 177 BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); 178 } 179 } 180 181 template <class ValueType, class Queue> not_empty_or_closed(unique_lock<mutex> &) const182 bool sync_queue_base<ValueType, Queue>::not_empty_or_closed(unique_lock<mutex>& ) const 183 { 184 return ! data_.empty() || closed_; 185 } 186 187 template <class ValueType, class Queue> wait_until_not_empty_or_closed(unique_lock<mutex> & lk)188 bool sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk) 189 { 190 cond_.wait(lk, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))); 191 if (! empty(lk)) return false; // success 192 return true; // closed 193 } 194 195 template <class ValueType, class Queue> 196 template <class WClock, class Duration> wait_until_not_empty_or_closed_until(unique_lock<mutex> & lk,chrono::time_point<WClock,Duration> const & tp)197 queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp) 198 { 199 if (! cond_.wait_until(lk, tp, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)))) 200 return queue_op_status::timeout; 201 if (! empty(lk)) return queue_op_status::success; 202 return queue_op_status::closed; 203 } 204 205 } // detail 206 } // concurrent 207 } // boost 208 209 #include <boost/config/abi_suffix.hpp> 210 211 #endif 212