• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1  // Copyright (C) 2014 Ian Forbed
2  // Copyright (C) 2014-2017 Vicente J. Botet Escriba
3  //
4  //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5  //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  //
7  
8  #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
9  #define BOOST_THREAD_SYNC_PRIORITY_QUEUE
10  
11  #include <boost/thread/detail/config.hpp>
12  
13  #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
14  #include <boost/thread/concurrent_queues/queue_op_status.hpp>
15  #include <boost/thread/condition_variable.hpp>
16  #include <boost/thread/csbl/vector.hpp>
17  #include <boost/thread/detail/move.hpp>
18  #include <boost/thread/mutex.hpp>
19  
20  #include <boost/atomic.hpp>
21  #include <boost/chrono/duration.hpp>
22  #include <boost/chrono/time_point.hpp>
23  
24  #include <exception>
25  #include <queue>
26  #include <utility>
27  
28  #include <boost/config/abi_prefix.hpp>
29  
30  namespace boost
31  {
32  namespace detail {
33  
34    template <
35      class Type,
36      class Container = csbl::vector<Type>,
37      class Compare = std::less<Type>
38    >
39    class priority_queue
40    {
41    private:
42        Container _elements;
43        Compare _compare;
44    public:
45        typedef Type value_type;
46        typedef typename Container::size_type size_type;
47  
priority_queue(const Compare & compare=Compare ())48        explicit priority_queue(const Compare& compare = Compare())
49            : _elements(), _compare(compare)
50        { }
51  
size() const52        size_type size() const
53        {
54            return _elements.size();
55        }
56  
empty() const57        bool empty() const
58        {
59            return _elements.empty();
60        }
61  
push(Type const & element)62        void push(Type const& element)
63        {
64            _elements.push_back(element);
65            std::push_heap(_elements.begin(), _elements.end(), _compare);
66        }
push(BOOST_RV_REF (Type)element)67        void push(BOOST_RV_REF(Type) element)
68        {
69            _elements.push_back(boost::move(element));
70            std::push_heap(_elements.begin(), _elements.end(), _compare);
71        }
72  
pop()73        void pop()
74        {
75            std::pop_heap(_elements.begin(), _elements.end(), _compare);
76            _elements.pop_back();
77        }
pull()78        Type pull()
79        {
80            Type result = boost::move(_elements.front());
81            pop();
82            return boost::move(result);
83        }
84  
top() const85        Type const& top() const
86        {
87            return _elements.front();
88        }
89    };
90  }
91  
92  namespace concurrent
93  {
94    template <class ValueType,
95              class Container = csbl::vector<ValueType>,
96              class Compare = std::less<typename Container::value_type> >
97    class sync_priority_queue
98      : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
99    {
100      typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >  super;
101  
102    public:
103      typedef ValueType value_type;
104      //typedef typename super::value_type value_type; // fixme
105      typedef typename super::underlying_queue_type underlying_queue_type;
106      typedef typename super::size_type size_type;
107      typedef typename super::op_status op_status;
108  
109      typedef chrono::steady_clock clock;
110    protected:
111  
112    public:
sync_priority_queue()113      sync_priority_queue() {}
114  
~sync_priority_queue()115      ~sync_priority_queue()
116      {
117        if(!super::closed())
118        {
119          super::close();
120        }
121      }
122  
123      void push(const ValueType& elem);
124      void push(BOOST_THREAD_RV_REF(ValueType) elem);
125  
126      queue_op_status try_push(const ValueType& elem);
127      queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);
128  
129      ValueType pull();
130  
131      void pull(ValueType&);
132  
133      template <class WClock, class Duration>
134      queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&);
135      template <class Rep, class Period>
136      queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&);
137  
138      queue_op_status try_pull(ValueType& elem);
139      queue_op_status wait_pull(ValueType& elem);
140      queue_op_status nonblocking_pull(ValueType&);
141  
142    private:
143      void push(unique_lock<mutex>&, const ValueType& elem);
144      void push(lock_guard<mutex>&, const ValueType& elem);
145      void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
146      void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
147  
148      queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
149      queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
150  
151      ValueType pull(unique_lock<mutex>&);
152      ValueType pull(lock_guard<mutex>&);
153  
154      void pull(unique_lock<mutex>&, ValueType&);
155      void pull(lock_guard<mutex>&, ValueType&);
156  
157      queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
158      queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);
159  
160      queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);
161  
162      queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);
163  
164      sync_priority_queue(const sync_priority_queue&);
165      sync_priority_queue& operator= (const sync_priority_queue&);
166      sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
167      sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
168    }; //end class
169  
170  
171    //////////////////////
172    template <class T, class Container,class Cmp>
push(unique_lock<mutex> & lk,const T & elem)173    void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
174    {
175      super::throw_if_closed(lk);
176      super::data_.push(elem);
177      super::notify_elem_added(lk);
178    }
179    template <class T, class Container,class Cmp>
push(lock_guard<mutex> & lk,const T & elem)180    void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
181    {
182      super::throw_if_closed(lk);
183      super::data_.push(elem);
184      super::notify_elem_added(lk);
185    }
186    template <class T, class Container,class Cmp>
push(const T & elem)187    void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
188    {
189      lock_guard<mutex> lk(super::mtx_);
190      push(lk, elem);
191    }
192  
193    //////////////////////
194    template <class T, class Container,class Cmp>
push(unique_lock<mutex> & lk,BOOST_THREAD_RV_REF (T)elem)195    void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
196    {
197      super::throw_if_closed(lk);
198      super::data_.push(boost::move(elem));
199      super::notify_elem_added(lk);
200    }
201    template <class T, class Container,class Cmp>
push(lock_guard<mutex> & lk,BOOST_THREAD_RV_REF (T)elem)202    void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
203    {
204      super::throw_if_closed(lk);
205      super::data_.push(boost::move(elem));
206      super::notify_elem_added(lk);
207    }
208    template <class T, class Container,class Cmp>
push(BOOST_THREAD_RV_REF (T)elem)209    void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
210    {
211      lock_guard<mutex> lk(super::mtx_);
212      push(lk, boost::move(elem));
213    }
214  
215    //////////////////////
216    template <class T, class Container,class Cmp>
try_push(const T & elem)217    queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
218    {
219      lock_guard<mutex> lk(super::mtx_);
220      if (super::closed(lk)) return queue_op_status::closed;
221      push(lk, elem);
222      return queue_op_status::success;
223    }
224  
225    //////////////////////
226    template <class T, class Container,class Cmp>
try_push(BOOST_THREAD_RV_REF (T)elem)227    queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
228    {
229      lock_guard<mutex> lk(super::mtx_);
230      if (super::closed(lk)) return queue_op_status::closed;
231      push(lk, boost::move(elem));
232  
233      return queue_op_status::success;
234    }
235  
236    //////////////////////
237    template <class T,class Container, class Cmp>
pull(unique_lock<mutex> &)238    T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
239    {
240      return super::data_.pull();
241    }
242    template <class T,class Container, class Cmp>
pull(lock_guard<mutex> &)243    T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
244    {
245      return super::data_.pull();
246    }
247  
248    template <class T,class Container, class Cmp>
pull()249    T sync_priority_queue<T,Container,Cmp>::pull()
250    {
251      unique_lock<mutex> lk(super::mtx_);
252      const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
253      if (has_been_closed) super::throw_if_closed(lk);
254      return pull(lk);
255    }
256  
257    //////////////////////
258    template <class T,class Container, class Cmp>
pull(unique_lock<mutex> &,T & elem)259    void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
260    {
261      elem = super::data_.pull();
262    }
263    template <class T,class Container, class Cmp>
pull(lock_guard<mutex> &,T & elem)264    void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
265    {
266      elem = super::data_.pull();
267    }
268  
269    template <class T,class Container, class Cmp>
pull(T & elem)270    void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
271    {
272      unique_lock<mutex> lk(super::mtx_);
273      const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
274      if (has_been_closed) super::throw_if_closed(lk);
275      pull(lk, elem);
276    }
277  
278    //////////////////////
279    template <class T, class Cont,class Cmp>
280    template <class WClock, class Duration>
281    queue_op_status
pull_until(const chrono::time_point<WClock,Duration> & tp,T & elem)282    sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem)
283    {
284      unique_lock<mutex> lk(super::mtx_);
285      const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp);
286      if (rc == queue_op_status::success) pull(lk, elem);
287      return rc;
288    }
289  
290    //////////////////////
291    template <class T, class Cont,class Cmp>
292    template <class Rep, class Period>
293    queue_op_status
pull_for(const chrono::duration<Rep,Period> & dura,T & elem)294    sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem)
295    {
296      return pull_until(chrono::steady_clock::now() + dura, elem);
297    }
298  
299    //////////////////////
300    template <class T, class Container,class Cmp>
301    queue_op_status
try_pull(unique_lock<mutex> & lk,T & elem)302    sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
303    {
304      if (super::empty(lk))
305      {
306        if (super::closed(lk)) return queue_op_status::closed;
307        return queue_op_status::empty;
308      }
309      pull(lk, elem);
310      return queue_op_status::success;
311    }
312  
313    template <class T, class Container,class Cmp>
314    queue_op_status
try_pull(lock_guard<mutex> & lk,T & elem)315    sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
316    {
317      if (super::empty(lk))
318      {
319        if (super::closed(lk)) return queue_op_status::closed;
320        return queue_op_status::empty;
321      }
322      pull(lk, elem);
323      return queue_op_status::success;
324    }
325  
326    template <class T, class Container,class Cmp>
327    queue_op_status
try_pull(T & elem)328    sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
329    {
330      lock_guard<mutex> lk(super::mtx_);
331      return try_pull(lk, elem);
332    }
333  
334    //////////////////////
335    template <class T,class Container, class Cmp>
wait_pull(unique_lock<mutex> & lk,T & elem)336    queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
337    {
338      const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
339      if (has_been_closed) return queue_op_status::closed;
340      pull(lk, elem);
341      return queue_op_status::success;
342    }
343  
344    template <class T,class Container, class Cmp>
wait_pull(T & elem)345    queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
346    {
347      unique_lock<mutex> lk(super::mtx_);
348      return wait_pull(lk, elem);
349    }
350  
351    //////////////////////
352    template <class T,class Container, class Cmp>
nonblocking_pull(T & elem)353    queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
354    {
355      unique_lock<mutex> lk(super::mtx_, try_to_lock);
356      if (!lk.owns_lock()) return queue_op_status::busy;
357      return try_pull(lk, elem);
358    }
359  
360  
361  
362  } //end concurrent namespace
363  
364  using concurrent::sync_priority_queue;
365  
366  } //end boost namespace
367  #include <boost/config/abi_suffix.hpp>
368  
369  #endif
370