• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP
2 #define BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP
3 
4 //////////////////////////////////////////////////////////////////////////////
5 //
6 // (C) Copyright Vicente J. Botet Escriba 2013-2017. Distributed under the Boost
7 // Software License, Version 1.0. (See accompanying file
8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 // See http://www.boost.org/libs/thread for documentation.
11 //
12 //////////////////////////////////////////////////////////////////////////////
13 
14 #include <boost/bind/bind.hpp>
15 
16 #include <boost/thread/detail/config.hpp>
17 #include <boost/thread/condition_variable.hpp>
18 #include <boost/thread/detail/move.hpp>
19 #include <boost/thread/mutex.hpp>
20 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
21 
22 #include <boost/chrono/time_point.hpp>
23 #include <boost/throw_exception.hpp>
24 
25 #include <boost/config/abi_prefix.hpp>
26 
27 namespace boost
28 {
29 namespace concurrent
30 {
31 namespace detail
32 {
33 
34   template <class ValueType, class Queue>
35   class sync_queue_base
36   {
37   public:
38     typedef ValueType value_type;
39     typedef Queue underlying_queue_type;
40     typedef typename Queue::size_type size_type;
41     typedef queue_op_status op_status;
42 
43     // Constructors/Assignment/Destructors
44     BOOST_THREAD_NO_COPYABLE(sync_queue_base)
45     inline sync_queue_base();
46     //template <typename Range>
47     //inline explicit sync_queue(Range range);
48     inline ~sync_queue_base();
49 
50     // Observers
51     inline bool empty() const;
52     inline bool full() const;
53     inline size_type size() const;
54     inline bool closed() const;
55 
56     // Modifiers
57     inline void close();
58 
underlying_queue()59     inline underlying_queue_type underlying_queue() {
60       lock_guard<mutex> lk(mtx_);
61       return boost::move(data_);
62     }
63 
64   protected:
65     mutable mutex mtx_;
66     condition_variable cond_;
67     underlying_queue_type data_;
68     bool closed_;
69 
empty(unique_lock<mutex> &) const70     inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT
71     {
72       return data_.empty();
73     }
empty(lock_guard<mutex> &) const74     inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT
75     {
76       return data_.empty();
77     }
78 
size(lock_guard<mutex> &) const79     inline size_type size(lock_guard<mutex>& ) const BOOST_NOEXCEPT
80     {
81       return data_.size();
82     }
83     inline bool closed(unique_lock<mutex>& lk) const;
84     inline bool closed(lock_guard<mutex>& lk) const;
85 
86     inline void throw_if_closed(unique_lock<mutex>&);
87     inline void throw_if_closed(lock_guard<mutex>&);
88 
89     inline bool not_empty_or_closed(unique_lock<mutex>& ) const;
90 
91     inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk);
92     template <class WClock, class Duration>
93     queue_op_status wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
94 
notify_elem_added(unique_lock<mutex> &)95     inline void notify_elem_added(unique_lock<mutex>& )
96     {
97       cond_.notify_all();
98     }
notify_elem_added(lock_guard<mutex> &)99     inline void notify_elem_added(lock_guard<mutex>& )
100     {
101       cond_.notify_all();
102     }
103 
104   };
105 
106   template <class ValueType, class Queue>
sync_queue_base()107   sync_queue_base<ValueType, Queue>::sync_queue_base() :
108     data_(), closed_(false)
109   {
110     BOOST_ASSERT(data_.empty());
111   }
112 
113   template <class ValueType, class Queue>
~sync_queue_base()114   sync_queue_base<ValueType, Queue>::~sync_queue_base()
115   {
116   }
117 
118   template <class ValueType, class Queue>
close()119   void sync_queue_base<ValueType, Queue>::close()
120   {
121     {
122       lock_guard<mutex> lk(mtx_);
123       closed_ = true;
124     }
125     cond_.notify_all();
126   }
127 
128   template <class ValueType, class Queue>
closed() const129   bool sync_queue_base<ValueType, Queue>::closed() const
130   {
131     lock_guard<mutex> lk(mtx_);
132     return closed(lk);
133   }
134   template <class ValueType, class Queue>
closed(unique_lock<mutex> &) const135   bool sync_queue_base<ValueType, Queue>::closed(unique_lock<mutex>&) const
136   {
137     return closed_;
138   }
139   template <class ValueType, class Queue>
closed(lock_guard<mutex> &) const140   bool sync_queue_base<ValueType, Queue>::closed(lock_guard<mutex>&) const
141   {
142     return closed_;
143   }
144 
145   template <class ValueType, class Queue>
empty() const146   bool sync_queue_base<ValueType, Queue>::empty() const
147   {
148     lock_guard<mutex> lk(mtx_);
149     return empty(lk);
150   }
151   template <class ValueType, class Queue>
full() const152   bool sync_queue_base<ValueType, Queue>::full() const
153   {
154     return false;
155   }
156 
157   template <class ValueType, class Queue>
size() const158   typename sync_queue_base<ValueType, Queue>::size_type sync_queue_base<ValueType, Queue>::size() const
159   {
160     lock_guard<mutex> lk(mtx_);
161     return size(lk);
162   }
163 
164   template <class ValueType, class Queue>
throw_if_closed(unique_lock<mutex> & lk)165   void sync_queue_base<ValueType, Queue>::throw_if_closed(unique_lock<mutex>& lk)
166   {
167     if (closed(lk))
168     {
169       BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
170     }
171   }
172   template <class ValueType, class Queue>
throw_if_closed(lock_guard<mutex> & lk)173   void sync_queue_base<ValueType, Queue>::throw_if_closed(lock_guard<mutex>& lk)
174   {
175     if (closed(lk))
176     {
177       BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
178     }
179   }
180 
181   template <class ValueType, class Queue>
not_empty_or_closed(unique_lock<mutex> &) const182   bool sync_queue_base<ValueType, Queue>::not_empty_or_closed(unique_lock<mutex>& ) const
183   {
184     return ! data_.empty() || closed_;
185   }
186 
187   template <class ValueType, class Queue>
wait_until_not_empty_or_closed(unique_lock<mutex> & lk)188   bool sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk)
189   {
190     cond_.wait(lk, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)));
191     if (! empty(lk)) return false; // success
192     return true; // closed
193   }
194 
195   template <class ValueType, class Queue>
196   template <class WClock, class Duration>
wait_until_not_empty_or_closed_until(unique_lock<mutex> & lk,chrono::time_point<WClock,Duration> const & tp)197   queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
198   {
199     if (! cond_.wait_until(lk, tp, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))))
200       return queue_op_status::timeout;
201     if (! empty(lk)) return queue_op_status::success;
202     return queue_op_status::closed;
203   }
204 
205 } // detail
206 } // concurrent
207 } // boost
208 
209 #include <boost/config/abi_suffix.hpp>
210 
211 #endif
212