1 /* 2 * Copyright Andrey Semashev 2007 - 2015. 3 * Distributed under the Boost Software License, Version 1.0. 4 * (See accompanying file LICENSE_1_0.txt or copy at 5 * http://www.boost.org/LICENSE_1_0.txt) 6 */ 7 /*! 8 * \file unbounded_ordering_queue.hpp 9 * \author Andrey Semashev 10 * \date 24.07.2011 11 * 12 * The header contains implementation of unbounded ordering record queueing strategy for 13 * the asynchronous sink frontend. 14 */ 15 16 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ 17 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ 18 19 #include <boost/log/detail/config.hpp> 20 21 #ifdef BOOST_HAS_PRAGMA_ONCE 22 #pragma once 23 #endif 24 25 #if defined(BOOST_LOG_NO_THREADS) 26 #error Boost.Log: This header content is only supported in multithreaded environment 27 #endif 28 29 #include <queue> 30 #include <vector> 31 #include <boost/cstdint.hpp> 32 #include <boost/thread/locks.hpp> 33 #include <boost/thread/mutex.hpp> 34 #include <boost/thread/condition_variable.hpp> 35 #include <boost/thread/thread_time.hpp> 36 #include <boost/date_time/posix_time/posix_time_types.hpp> 37 #include <boost/log/detail/timestamp.hpp> 38 #include <boost/log/detail/enqueued_record.hpp> 39 #include <boost/log/keywords/order.hpp> 40 #include <boost/log/keywords/ordering_window.hpp> 41 #include <boost/log/core/record_view.hpp> 42 #include <boost/log/detail/header.hpp> 43 44 namespace boost { 45 46 BOOST_LOG_OPEN_NAMESPACE 47 48 namespace sinks { 49 50 /*! 51 * \brief Unbounded ordering log record queueing strategy 52 * 53 * The \c unbounded_ordering_queue class is intended to be used with 54 * the \c asynchronous_sink frontend as a log record queueing strategy. 55 * 56 * This strategy provides the following properties to the record queueing mechanism: 57 * 58 * \li The queue has no size limits. 59 * \li The queue has a fixed latency window. This means that each log record put 60 * into the queue will normally not be dequeued for a certain period of time. 61 * \li The queue performs stable record ordering within the latency window. 62 * The ordering predicate can be specified in the \c OrderT template parameter. 63 * 64 * Since this queue has no size limits, it may grow uncontrollably if sink backends 65 * dequeue log records not fast enough. When this is an issue, it is recommended to 66 * use one of the bounded strategies. 67 */ 68 template< typename OrderT > 69 class unbounded_ordering_queue 70 { 71 private: 72 typedef boost::mutex mutex_type; 73 typedef sinks::aux::enqueued_record enqueued_record; 74 75 typedef std::priority_queue< 76 enqueued_record, 77 std::vector< enqueued_record >, 78 enqueued_record::order< OrderT > 79 > queue_type; 80 81 private: 82 //! Ordering window duration, in milliseconds 83 const uint64_t m_ordering_window; 84 //! Synchronization mutex 85 mutex_type m_mutex; 86 //! Condition for blocking 87 condition_variable m_cond; 88 //! Thread-safe queue 89 queue_type m_queue; 90 //! Interruption flag 91 bool m_interruption_requested; 92 93 public: 94 /*! 95 * Returns ordering window size specified during initialization 96 */ get_ordering_window() const97 posix_time::time_duration get_ordering_window() const 98 { 99 return posix_time::milliseconds(m_ordering_window); 100 } 101 102 /*! 103 * Returns default ordering window size. 104 * The default window size is specific to the operating system thread scheduling mechanism. 105 */ get_default_ordering_window()106 static posix_time::time_duration get_default_ordering_window() 107 { 108 // The main idea behind this parameter is that the ordering window should be large enough 109 // to allow the frontend to order records from different threads on an attribute 110 // that contains system time. Thus this value should be: 111 // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS. 112 // For instance, on Windows it defaults to around 15-16 ms. 113 // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to 114 // switch threads on any known OS. It can be tuned for other platforms as needed. 115 return posix_time::milliseconds(30); 116 } 117 118 protected: 119 //! Initializing constructor 120 template< typename ArgsT > unbounded_ordering_queue(ArgsT const & args)121 explicit unbounded_ordering_queue(ArgsT const& args) : 122 m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()), 123 m_queue(args[keywords::order]), 124 m_interruption_requested(false) 125 { 126 } 127 128 //! Enqueues log record to the queue enqueue(record_view const & rec)129 void enqueue(record_view const& rec) 130 { 131 lock_guard< mutex_type > lock(m_mutex); 132 enqueue_unlocked(rec); 133 } 134 135 //! Attempts to enqueue log record to the queue try_enqueue(record_view const & rec)136 bool try_enqueue(record_view const& rec) 137 { 138 unique_lock< mutex_type > lock(m_mutex, try_to_lock); 139 if (lock.owns_lock()) 140 { 141 enqueue_unlocked(rec); 142 return true; 143 } 144 else 145 return false; 146 } 147 148 //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed try_dequeue_ready(record_view & rec)149 bool try_dequeue_ready(record_view& rec) 150 { 151 lock_guard< mutex_type > lock(m_mutex); 152 if (!m_queue.empty()) 153 { 154 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); 155 enqueued_record const& elem = m_queue.top(); 156 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window) 157 { 158 // We got a new element 159 rec = elem.m_record; 160 m_queue.pop(); 161 return true; 162 } 163 } 164 165 return false; 166 } 167 168 //! Attempts to dequeue log record from the queue, does not block. try_dequeue(record_view & rec)169 bool try_dequeue(record_view& rec) 170 { 171 lock_guard< mutex_type > lock(m_mutex); 172 if (!m_queue.empty()) 173 { 174 enqueued_record const& elem = m_queue.top(); 175 rec = elem.m_record; 176 m_queue.pop(); 177 return true; 178 } 179 180 return false; 181 } 182 183 //! Dequeues log record from the queue, blocks if no log records are ready to be processed dequeue_ready(record_view & rec)184 bool dequeue_ready(record_view& rec) 185 { 186 unique_lock< mutex_type > lock(m_mutex); 187 while (!m_interruption_requested) 188 { 189 if (!m_queue.empty()) 190 { 191 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp(); 192 enqueued_record const& elem = m_queue.top(); 193 const uint64_t difference = (now - elem.m_timestamp).milliseconds(); 194 if (difference >= m_ordering_window) 195 { 196 // We got a new element 197 rec = elem.m_record; 198 m_queue.pop(); 199 return true; 200 } 201 else 202 { 203 // Wait until the element becomes ready to be processed 204 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference)); 205 } 206 } 207 else 208 { 209 // Wait for an element to come 210 m_cond.wait(lock); 211 } 212 } 213 m_interruption_requested = false; 214 215 return false; 216 } 217 218 //! Wakes a thread possibly blocked in the \c dequeue method interrupt_dequeue()219 void interrupt_dequeue() 220 { 221 lock_guard< mutex_type > lock(m_mutex); 222 m_interruption_requested = true; 223 m_cond.notify_one(); 224 } 225 226 private: 227 //! Enqueues a log record enqueue_unlocked(record_view const & rec)228 void enqueue_unlocked(record_view const& rec) 229 { 230 const bool was_empty = m_queue.empty(); 231 m_queue.push(enqueued_record(rec)); 232 if (was_empty) 233 m_cond.notify_one(); 234 } 235 }; 236 237 } // namespace sinks 238 239 BOOST_LOG_CLOSE_NAMESPACE // namespace log 240 241 } // namespace boost 242 243 #include <boost/log/detail/footer.hpp> 244 245 #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_ 246