• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *          Copyright Andrey Semashev 2007 - 2015.
3  * Distributed under the Boost Software License, Version 1.0.
4  *    (See accompanying file LICENSE_1_0.txt or copy at
5  *          http://www.boost.org/LICENSE_1_0.txt)
6  */
7 /*!
8  * \file   bounded_ordering_queue.hpp
9  * \author Andrey Semashev
10  * \date   06.01.2012
11  *
12  * The header contains implementation of bounded ordering queueing strategy for
13  * the asynchronous sink frontend.
14  */
15 
16 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
17 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
18 
19 #include <boost/log/detail/config.hpp>
20 
21 #ifdef BOOST_HAS_PRAGMA_ONCE
22 #pragma once
23 #endif
24 
25 #if defined(BOOST_LOG_NO_THREADS)
26 #error Boost.Log: This header content is only supported in multithreaded environment
27 #endif
28 
29 #include <cstddef>
30 #include <queue>
31 #include <vector>
32 #include <boost/cstdint.hpp>
33 #include <boost/thread/locks.hpp>
34 #include <boost/thread/mutex.hpp>
35 #include <boost/thread/condition_variable.hpp>
36 #include <boost/thread/thread_time.hpp>
37 #include <boost/date_time/posix_time/posix_time_types.hpp>
38 #include <boost/log/detail/timestamp.hpp>
39 #include <boost/log/detail/enqueued_record.hpp>
40 #include <boost/log/keywords/order.hpp>
41 #include <boost/log/keywords/ordering_window.hpp>
42 #include <boost/log/core/record_view.hpp>
43 #include <boost/log/detail/header.hpp>
44 
45 namespace boost {
46 
47 BOOST_LOG_OPEN_NAMESPACE
48 
49 namespace sinks {
50 
51 /*!
52  * \brief Bounded ordering log record queueing strategy
53  *
54  * The \c bounded_ordering_queue class is intended to be used with
55  * the \c asynchronous_sink frontend as a log record queueing strategy.
56  *
57  * This strategy provides the following properties to the record queueing mechanism:
58  *
59  * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter.
60  * \li Upon reaching the size limit, the queue invokes the overflow handling strategy
61  *     specified in the \c OverflowStrategyT template parameter to handle the situation.
62  *     The library provides overflow handling strategies for most common cases:
63  *     \c drop_on_overflow will silently discard the log record, and \c block_on_overflow
64  *     will put the enqueueing thread to wait until there is space in the queue.
65  * \li The queue has a fixed latency window. This means that each log record put
66  *     into the queue will normally not be dequeued for a certain period of time.
67  * \li The queue performs stable record ordering within the latency window.
68  *     The ordering predicate can be specified in the \c OrderT template parameter.
69  */
70 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT >
71 class bounded_ordering_queue :
72     private OverflowStrategyT
73 {
74 private:
75     typedef OverflowStrategyT overflow_strategy;
76     typedef boost::mutex mutex_type;
77     typedef sinks::aux::enqueued_record enqueued_record;
78 
79     typedef std::priority_queue<
80         enqueued_record,
81         std::vector< enqueued_record >,
82         enqueued_record::order< OrderT >
83     > queue_type;
84 
85 private:
86     //! Ordering window duration, in milliseconds
87     const uint64_t m_ordering_window;
88     //! Synchronization primitive
89     mutex_type m_mutex;
90     //! Condition to block the consuming thread on
91     condition_variable m_cond;
92     //! Log record queue
93     queue_type m_queue;
94     //! Interruption flag
95     bool m_interruption_requested;
96 
97 public:
98     /*!
99      * Returns ordering window size specified during initialization
100      */
get_ordering_window() const101     posix_time::time_duration get_ordering_window() const
102     {
103         return posix_time::milliseconds(m_ordering_window);
104     }
105 
106     /*!
107      * Returns default ordering window size.
108      * The default window size is specific to the operating system thread scheduling mechanism.
109      */
get_default_ordering_window()110     static posix_time::time_duration get_default_ordering_window()
111     {
112         // The main idea behind this parameter is that the ordering window should be large enough
113         // to allow the frontend to order records from different threads on an attribute
114         // that contains system time. Thus this value should be:
115         // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
116         //   For instance, on Windows it defaults to around 15-16 ms.
117         // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
118         //   switch threads on any known OS. It can be tuned for other platforms as needed.
119         return posix_time::milliseconds(30);
120     }
121 
122 protected:
123     //! Initializing constructor
124     template< typename ArgsT >
bounded_ordering_queue(ArgsT const & args)125     explicit bounded_ordering_queue(ArgsT const& args) :
126         m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
127         m_queue(args[keywords::order]),
128         m_interruption_requested(false)
129     {
130     }
131 
132     //! Enqueues log record to the queue
enqueue(record_view const & rec)133     void enqueue(record_view const& rec)
134     {
135         unique_lock< mutex_type > lock(m_mutex);
136         std::size_t size = m_queue.size();
137         for (; size >= MaxQueueSizeV; size = m_queue.size())
138         {
139             if (!overflow_strategy::on_overflow(rec, lock))
140                 return;
141         }
142 
143         m_queue.push(enqueued_record(rec));
144         if (size == 0)
145             m_cond.notify_one();
146     }
147 
148     //! Attempts to enqueue log record to the queue
try_enqueue(record_view const & rec)149     bool try_enqueue(record_view const& rec)
150     {
151         unique_lock< mutex_type > lock(m_mutex, try_to_lock);
152         if (lock.owns_lock())
153         {
154             const std::size_t size = m_queue.size();
155 
156             // Do not invoke the bounding strategy in case of overflow as it may block
157             if (size < MaxQueueSizeV)
158             {
159                 m_queue.push(enqueued_record(rec));
160                 if (size == 0)
161                     m_cond.notify_one();
162                 return true;
163             }
164         }
165 
166         return false;
167     }
168 
169     //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty
try_dequeue_ready(record_view & rec)170     bool try_dequeue_ready(record_view& rec)
171     {
172         lock_guard< mutex_type > lock(m_mutex);
173         const std::size_t size = m_queue.size();
174         if (size > 0)
175         {
176             const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
177             enqueued_record const& elem = m_queue.top();
178             if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
179             {
180                 // We got a new element
181                 rec = elem.m_record;
182                 m_queue.pop();
183                 overflow_strategy::on_queue_space_available();
184                 return true;
185             }
186         }
187 
188         return false;
189     }
190 
191     //! Attempts to dequeue log record from the queue, does not block if the queue is empty
try_dequeue(record_view & rec)192     bool try_dequeue(record_view& rec)
193     {
194         lock_guard< mutex_type > lock(m_mutex);
195         const std::size_t size = m_queue.size();
196         if (size > 0)
197         {
198             enqueued_record const& elem = m_queue.top();
199             rec = elem.m_record;
200             m_queue.pop();
201             overflow_strategy::on_queue_space_available();
202             return true;
203         }
204 
205         return false;
206     }
207 
208     //! Dequeues log record from the queue, blocks if the queue is empty
dequeue_ready(record_view & rec)209     bool dequeue_ready(record_view& rec)
210     {
211         unique_lock< mutex_type > lock(m_mutex);
212 
213         while (!m_interruption_requested)
214         {
215             const std::size_t size = m_queue.size();
216             if (size > 0)
217             {
218                 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
219                 enqueued_record const& elem = m_queue.top();
220                 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
221                 if (difference >= m_ordering_window)
222                 {
223                     rec = elem.m_record;
224                     m_queue.pop();
225                     overflow_strategy::on_queue_space_available();
226                     return true;
227                 }
228                 else
229                 {
230                     // Wait until the element becomes ready to be processed
231                     m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
232                 }
233             }
234             else
235             {
236                 m_cond.wait(lock);
237             }
238         }
239         m_interruption_requested = false;
240 
241         return false;
242     }
243 
244     //! Wakes a thread possibly blocked in the \c dequeue method
interrupt_dequeue()245     void interrupt_dequeue()
246     {
247         lock_guard< mutex_type > lock(m_mutex);
248         m_interruption_requested = true;
249         overflow_strategy::interrupt();
250         m_cond.notify_one();
251     }
252 };
253 
254 } // namespace sinks
255 
256 BOOST_LOG_CLOSE_NAMESPACE // namespace log
257 
258 } // namespace boost
259 
260 #include <boost/log/detail/footer.hpp>
261 
262 #endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
263