• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP
2 #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_QUEUE_HPP
3 
4 //////////////////////////////////////////////////////////////////////////////
5 //
6 // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost
7 // Software License, Version 1.0. (See accompanying file
8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 // See http://www.boost.org/libs/thread for documentation.
11 //
12 //////////////////////////////////////////////////////////////////////////////
13 
14 #include <boost/thread/detail/config.hpp>
15 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
16 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
17 #include <boost/thread/condition_variable.hpp>
18 #include <boost/thread/csbl/devector.hpp>
19 #include <boost/thread/detail/move.hpp>
20 #include <boost/thread/mutex.hpp>
21 
22 #include <boost/throw_exception.hpp>
23 #include <boost/smart_ptr/shared_ptr.hpp>
24 #include <boost/smart_ptr/make_shared.hpp>
25 
26 #include <boost/config/abi_prefix.hpp>
27 
28 namespace boost
29 {
30 namespace concurrent
31 {
32   template <class ValueType, class Container = csbl::devector<ValueType> >
33   class sync_queue
34     : public detail::sync_queue_base<ValueType, Container >
35   {
36     typedef detail::sync_queue_base<ValueType, Container >  super;
37 
38   public:
39     typedef ValueType value_type;
40     //typedef typename super::value_type value_type; // fixme
41     typedef typename super::underlying_queue_type underlying_queue_type;
42     typedef typename super::size_type size_type;
43     typedef typename super::op_status op_status;
44 
45     // Constructors/Assignment/Destructors
46     BOOST_THREAD_NO_COPYABLE(sync_queue)
47     inline sync_queue();
48     //template <class Range>
49     //inline explicit sync_queue(Range range);
50     inline ~sync_queue();
51 
52     // Modifiers
53     inline void push(const value_type& x);
54     inline queue_op_status try_push(const value_type& x);
55     inline queue_op_status nonblocking_push(const value_type& x);
56     inline queue_op_status wait_push(const value_type& x);
57     inline void push(BOOST_THREAD_RV_REF(value_type) x);
58     inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x);
59     inline queue_op_status nonblocking_push(BOOST_THREAD_RV_REF(value_type) x);
60     inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x);
61 
62     // Observers/Modifiers
63     inline void pull(value_type&);
64     // enable_if is_nothrow_copy_movable<value_type>
65     inline value_type pull();
66 
67     inline queue_op_status try_pull(value_type&);
68     inline queue_op_status nonblocking_pull(value_type&);
69     inline queue_op_status wait_pull(ValueType& elem);
70 
71   private:
72 
73     inline queue_op_status try_pull(value_type& x, unique_lock<mutex>& lk);
74     inline queue_op_status wait_pull(value_type& x, unique_lock<mutex>& lk);
75     inline queue_op_status try_push(const value_type& x, unique_lock<mutex>& lk);
76     inline queue_op_status wait_push(const value_type& x, unique_lock<mutex>& lk);
77     inline queue_op_status try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
78     inline queue_op_status wait_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
79 
pull(value_type & elem,unique_lock<mutex> &)80     inline void pull(value_type& elem, unique_lock<mutex>& )
81     {
82       elem = boost::move(super::data_.front());
83       super::data_.pop_front();
84     }
pull(unique_lock<mutex> &)85     inline value_type pull(unique_lock<mutex>& )
86     {
87       value_type e = boost::move(super::data_.front());
88       super::data_.pop_front();
89       return boost::move(e);
90     }
91 
push(const value_type & elem,unique_lock<mutex> & lk)92     inline void push(const value_type& elem, unique_lock<mutex>& lk)
93     {
94       super::data_.push_back(elem);
95       super::notify_elem_added(lk);
96     }
97 
push(BOOST_THREAD_RV_REF (value_type)elem,unique_lock<mutex> & lk)98     inline void push(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk)
99     {
100       super::data_.push_back(boost::move(elem));
101       super::notify_elem_added(lk);
102     }
103   };
104 
105   template <class ValueType, class Container>
sync_queue()106   sync_queue<ValueType, Container>::sync_queue() :
107     super()
108   {
109   }
110 
111 //  template <class ValueType, class Container>
112 //  template <class Range>
113 //  explicit sync_queue<ValueType, Container>::sync_queue(Range range) :
114 //    data_(), closed_(false)
115 //  {
116 //    try
117 //    {
118 //      typedef typename Range::iterator iterator_t;
119 //      iterator_t first = boost::begin(range);
120 //      iterator_t end = boost::end(range);
121 //      for (iterator_t cur = first; cur != end; ++cur)
122 //      {
123 //        data_.push(boost::move(*cur));;
124 //      }
125 //      notify_elem_added(lk);
126 //    }
127 //    catch (...)
128 //    {
129 //      delete[] data_;
130 //    }
131 //  }
132 
133   template <class ValueType, class Container>
~sync_queue()134   sync_queue<ValueType, Container>::~sync_queue()
135   {
136   }
137 
138   template <class ValueType, class Container>
try_pull(ValueType & elem,unique_lock<mutex> & lk)139   queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem, unique_lock<mutex>& lk)
140   {
141     if (super::empty(lk))
142     {
143       if (super::closed(lk)) return queue_op_status::closed;
144       return queue_op_status::empty;
145     }
146     pull(elem, lk);
147     return queue_op_status::success;
148   }
149   template <class ValueType, class Container>
wait_pull(ValueType & elem,unique_lock<mutex> & lk)150   queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem, unique_lock<mutex>& lk)
151   {
152     const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
153     if (has_been_closed) return queue_op_status::closed;
154     pull(elem, lk);
155     return queue_op_status::success;
156   }
157 
158   template <class ValueType, class Container>
try_pull(ValueType & elem)159   queue_op_status sync_queue<ValueType, Container>::try_pull(ValueType& elem)
160   {
161     unique_lock<mutex> lk(super::mtx_);
162     return try_pull(elem, lk);
163   }
164 
165   template <class ValueType, class Container>
wait_pull(ValueType & elem)166   queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem)
167   {
168     unique_lock<mutex> lk(super::mtx_);
169     return wait_pull(elem, lk);
170   }
171 
172   template <class ValueType, class Container>
nonblocking_pull(ValueType & elem)173   queue_op_status sync_queue<ValueType, Container>::nonblocking_pull(ValueType& elem)
174   {
175     unique_lock<mutex> lk(super::mtx_, try_to_lock);
176     if (!lk.owns_lock())
177     {
178       return queue_op_status::busy;
179     }
180     return try_pull(elem, lk);
181   }
182 
183   template <class ValueType, class Container>
pull(ValueType & elem)184   void sync_queue<ValueType, Container>::pull(ValueType& elem)
185   {
186       unique_lock<mutex> lk(super::mtx_);
187       const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
188       if (has_been_closed) super::throw_if_closed(lk);
189       pull(elem, lk);
190   }
191 
192   // enable if ValueType is nothrow movable
193   template <class ValueType, class Container>
pull()194   ValueType sync_queue<ValueType, Container>::pull()
195   {
196       unique_lock<mutex> lk(super::mtx_);
197       const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
198       if (has_been_closed) super::throw_if_closed(lk);
199       return pull(lk);
200   }
201 
202   template <class ValueType, class Container>
try_push(const ValueType & elem,unique_lock<mutex> & lk)203   queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem, unique_lock<mutex>& lk)
204   {
205     if (super::closed(lk)) return queue_op_status::closed;
206     push(elem, lk);
207     return queue_op_status::success;
208   }
209 
210   template <class ValueType, class Container>
try_push(const ValueType & elem)211   queue_op_status sync_queue<ValueType, Container>::try_push(const ValueType& elem)
212   {
213     unique_lock<mutex> lk(super::mtx_);
214     return try_push(elem, lk);
215   }
216 
217   template <class ValueType, class Container>
wait_push(const ValueType & elem,unique_lock<mutex> & lk)218   queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem, unique_lock<mutex>& lk)
219   {
220     if (super::closed(lk)) return queue_op_status::closed;
221     push(elem, lk);
222     return queue_op_status::success;
223   }
224 
225   template <class ValueType, class Container>
wait_push(const ValueType & elem)226   queue_op_status sync_queue<ValueType, Container>::wait_push(const ValueType& elem)
227   {
228     unique_lock<mutex> lk(super::mtx_);
229     return wait_push(elem, lk);
230   }
231 
232   template <class ValueType, class Container>
nonblocking_push(const ValueType & elem)233   queue_op_status sync_queue<ValueType, Container>::nonblocking_push(const ValueType& elem)
234   {
235     unique_lock<mutex> lk(super::mtx_, try_to_lock);
236     if (!lk.owns_lock()) return queue_op_status::busy;
237     return try_push(elem, lk);
238   }
239 
240   template <class ValueType, class Container>
push(const ValueType & elem)241   void sync_queue<ValueType, Container>::push(const ValueType& elem)
242   {
243       unique_lock<mutex> lk(super::mtx_);
244       super::throw_if_closed(lk);
245       push(elem, lk);
246   }
247 
248   template <class ValueType, class Container>
try_push(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)249   queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
250   {
251     if (super::closed(lk)) return queue_op_status::closed;
252     push(boost::move(elem), lk);
253     return queue_op_status::success;
254   }
255 
256   template <class ValueType, class Container>
try_push(BOOST_THREAD_RV_REF (ValueType)elem)257   queue_op_status sync_queue<ValueType, Container>::try_push(BOOST_THREAD_RV_REF(ValueType) elem)
258   {
259     unique_lock<mutex> lk(super::mtx_);
260     return try_push(boost::move(elem), lk);
261   }
262 
263   template <class ValueType, class Container>
wait_push(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)264   queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
265   {
266     if (super::closed(lk)) return queue_op_status::closed;
267     push(boost::move(elem), lk);
268     return queue_op_status::success;
269   }
270 
271   template <class ValueType, class Container>
wait_push(BOOST_THREAD_RV_REF (ValueType)elem)272   queue_op_status sync_queue<ValueType, Container>::wait_push(BOOST_THREAD_RV_REF(ValueType) elem)
273   {
274     unique_lock<mutex> lk(super::mtx_);
275     return wait_push(boost::move(elem), lk);
276   }
277 
278   template <class ValueType, class Container>
nonblocking_push(BOOST_THREAD_RV_REF (ValueType)elem)279   queue_op_status sync_queue<ValueType, Container>::nonblocking_push(BOOST_THREAD_RV_REF(ValueType) elem)
280   {
281     unique_lock<mutex> lk(super::mtx_, try_to_lock);
282     if (!lk.owns_lock())
283     {
284       return queue_op_status::busy;
285     }
286     return try_push(boost::move(elem), lk);
287   }
288 
289   template <class ValueType, class Container>
push(BOOST_THREAD_RV_REF (ValueType)elem)290   void sync_queue<ValueType, Container>::push(BOOST_THREAD_RV_REF(ValueType) elem)
291   {
292       unique_lock<mutex> lk(super::mtx_);
293       super::throw_if_closed(lk);
294       push(boost::move(elem), lk);
295   }
296 
297   template <class ValueType, class Container>
operator <<(sync_queue<ValueType,Container> & sbq,BOOST_THREAD_RV_REF (ValueType)elem)298   sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, BOOST_THREAD_RV_REF(ValueType) elem)
299   {
300     sbq.push(boost::move(elem));
301     return sbq;
302   }
303 
304   template <class ValueType, class Container>
operator <<(sync_queue<ValueType,Container> & sbq,ValueType const & elem)305   sync_queue<ValueType, Container>& operator<<(sync_queue<ValueType, Container>& sbq, ValueType const&elem)
306   {
307     sbq.push(elem);
308     return sbq;
309   }
310 
311   template <class ValueType, class Container>
operator >>(sync_queue<ValueType,Container> & sbq,ValueType & elem)312   sync_queue<ValueType, Container>& operator>>(sync_queue<ValueType, Container>& sbq, ValueType &elem)
313   {
314     sbq.pull(elem);
315     return sbq;
316   }
317 
318 }
319 using concurrent::sync_queue;
320 
321 }
322 
323 #include <boost/config/abi_suffix.hpp>
324 
325 #endif
326