1 /* 2 * Copyright Andrey Semashev 2007 - 2015. 3 * Distributed under the Boost Software License, Version 1.0. 4 * (See accompanying file LICENSE_1_0.txt or copy at 5 * http://www.boost.org/LICENSE_1_0.txt) 6 */ 7 /*! 8 * \file bounded_ordering_queue.hpp 9 * \author Andrey Semashev 10 * \date 06.01.2012 11 * 12 * The header contains implementation of bounded ordering queueing strategy for 13 * the asynchronous sink frontend. 14 */ 15 16 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ 17 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ 18 19 #include <boost/log/detail/config.hpp> 20 21 #ifdef BOOST_HAS_PRAGMA_ONCE 22 #pragma once 23 #endif 24 25 #if defined(BOOST_LOG_NO_THREADS) 26 #error Boost.Log: This header content is only supported in multithreaded environment 27 #endif 28 29 #include <cstddef> 30 #include <queue> 31 #include <vector> 32 #include <boost/cstdint.hpp> 33 #include <boost/thread/locks.hpp> 34 #include <boost/thread/mutex.hpp> 35 #include <boost/thread/condition_variable.hpp> 36 #include <boost/thread/thread_time.hpp> 37 #include <boost/date_time/posix_time/posix_time_types.hpp> 38 #include <boost/log/detail/timestamp.hpp> 39 #include <boost/log/detail/enqueued_record.hpp> 40 #include <boost/log/keywords/order.hpp> 41 #include <boost/log/keywords/ordering_window.hpp> 42 #include <boost/log/core/record_view.hpp> 43 #include <boost/log/detail/header.hpp> 44 45 namespace boost { 46 47 BOOST_LOG_OPEN_NAMESPACE 48 49 namespace sinks { 50 51 /*! 52 * \brief Bounded ordering log record queueing strategy 53 * 54 * The \c bounded_ordering_queue class is intended to be used with 55 * the \c asynchronous_sink frontend as a log record queueing strategy. 56 * 57 * This strategy provides the following properties to the record queueing mechanism: 58 * 59 * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter. 60 * \li Upon reaching the size limit, the queue invokes the overflow handling strategy 61 * specified in the \c OverflowStrategyT template parameter to handle the situation. 62 * The library provides overflow handling strategies for most common cases: 63 * \c drop_on_overflow will silently discard the log record, and \c block_on_overflow 64 * will put the enqueueing thread to wait until there is space in the queue. 65 * \li The queue has a fixed latency window. This means that each log record put 66 * into the queue will normally not be dequeued for a certain period of time. 67 * \li The queue performs stable record ordering within the latency window. 68 * The ordering predicate can be specified in the \c OrderT template parameter. 69 */ 70 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT > 71 class bounded_ordering_queue : 72 private OverflowStrategyT 73 { 74 private: 75 typedef OverflowStrategyT overflow_strategy; 76 typedef boost::mutex mutex_type; 77 typedef sinks::aux::enqueued_record enqueued_record; 78 79 typedef std::priority_queue< 80 enqueued_record, 81 std::vector< enqueued_record >, 82 enqueued_record::order< OrderT > 83 > queue_type; 84 85 private: 86 //! Ordering window duration, in milliseconds 87 const uint64_t m_ordering_window; 88 //! Synchronization primitive 89 mutex_type m_mutex; 90 //! Condition to block the consuming thread on 91 condition_variable m_cond; 92 //! Log record queue 93 queue_type m_queue; 94 //! Interruption flag 95 bool m_interruption_requested; 96 97 public: 98 /*! 99 * Returns ordering window size specified during initialization 100 */ get_ordering_window() const101 posix_time::time_duration get_ordering_window() const 102 { 103 return posix_time::milliseconds(m_ordering_window); 104 } 105 106 /*! 107 * Returns default ordering window size. 108 * The default window size is specific to the operating system thread scheduling mechanism. 109 */ get_default_ordering_window()110 static posix_time::time_duration get_default_ordering_window() 111 { 112 // The main idea behind this parameter is that the ordering window should be large enough 113 // to allow the frontend to order records from different threads on an attribute 114 // that contains system time. Thus this value should be: 115 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS. 116 // For instance, on Windows it defaults to around 15-16 ms. 117 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to 118 // switch threads on any known OS. It can be tuned for other platforms as needed. 119 return posix_time::milliseconds(30); 120 } 121 122 protected: 123 //! Initializing constructor 124 template< typename ArgsT > bounded_ordering_queue(ArgsT const & args)125 explicit bounded_ordering_queue(ArgsT const& args) : 126 m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()), 127 m_queue(args[keywords::order]), 128 m_interruption_requested(false) 129 { 130 } 131 132 //! Enqueues log record to the queue enqueue(record_view const & rec)133 void enqueue(record_view const& rec) 134 { 135 unique_lock< mutex_type > lock(m_mutex); 136 std::size_t size = m_queue.size(); 137 for (; size >= MaxQueueSizeV; size = m_queue.size()) 138 { 139 if (!overflow_strategy::on_overflow(rec, lock)) 140 return; 141 } 142 143 m_queue.push(enqueued_record(rec)); 144 if (size == 0) 145 m_cond.notify_one(); 146 } 147 148 //! Attempts to enqueue log record to the queue try_enqueue(record_view const & rec)149 bool try_enqueue(record_view const& rec) 150 { 151 unique_lock< mutex_type > lock(m_mutex, try_to_lock); 152 if (lock.owns_lock()) 153 { 154 const std::size_t size = m_queue.size(); 155 156 // Do not invoke the bounding strategy in case of overflow as it may block 157 if (size < MaxQueueSizeV) 158 { 159 m_queue.push(enqueued_record(rec)); 160 if (size == 0) 161 m_cond.notify_one(); 162 return true; 163 } 164 } 165 166 return false; 167 } 168 169 //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty try_dequeue_ready(record_view & rec)170 bool try_dequeue_ready(record_view& rec) 171 { 172 lock_guard< mutex_type > lock(m_mutex); 173 const std::size_t size = m_queue.size(); 174 if (size > 0) 175 { 176 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); 177 enqueued_record const& elem = m_queue.top(); 178 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window) 179 { 180 // We got a new element 181 rec = elem.m_record; 182 m_queue.pop(); 183 overflow_strategy::on_queue_space_available(); 184 return true; 185 } 186 } 187 188 return false; 189 } 190 191 //! Attempts to dequeue log record from the queue, does not block if the queue is empty try_dequeue(record_view & rec)192 bool try_dequeue(record_view& rec) 193 { 194 lock_guard< mutex_type > lock(m_mutex); 195 const std::size_t size = m_queue.size(); 196 if (size > 0) 197 { 198 enqueued_record const& elem = m_queue.top(); 199 rec = elem.m_record; 200 m_queue.pop(); 201 overflow_strategy::on_queue_space_available(); 202 return true; 203 } 204 205 return false; 206 } 207 208 //! Dequeues log record from the queue, blocks if the queue is empty dequeue_ready(record_view & rec)209 bool dequeue_ready(record_view& rec) 210 { 211 unique_lock< mutex_type > lock(m_mutex); 212 213 while (!m_interruption_requested) 214 { 215 const std::size_t size = m_queue.size(); 216 if (size > 0) 217 { 218 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); 219 enqueued_record const& elem = m_queue.top(); 220 const uint64_t difference = (now - elem.m_timestamp).milliseconds(); 221 if (difference >= m_ordering_window) 222 { 223 rec = elem.m_record; 224 m_queue.pop(); 225 overflow_strategy::on_queue_space_available(); 226 return true; 227 } 228 else 229 { 230 // Wait until the element becomes ready to be processed 231 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference)); 232 } 233 } 234 else 235 { 236 m_cond.wait(lock); 237 } 238 } 239 m_interruption_requested = false; 240 241 return false; 242 } 243 244 //! Wakes a thread possibly blocked in the \c dequeue method interrupt_dequeue()245 void interrupt_dequeue() 246 { 247 lock_guard< mutex_type > lock(m_mutex); 248 m_interruption_requested = true; 249 overflow_strategy::interrupt(); 250 m_cond.notify_one(); 251 } 252 }; 253 254 } // namespace sinks 255 256 BOOST_LOG_CLOSE_NAMESPACE // namespace log 257 258 } // namespace boost 259 260 #include <boost/log/detail/footer.hpp> 261 262 #endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ 263