1 /* 2 * Copyright Andrey Semashev 2007 - 2015. 3 * Distributed under the Boost Software License, Version 1.0. 4 * (See accompanying file LICENSE_1_0.txt or copy at 5 * http://www.boost.org/LICENSE_1_0.txt) 6 */ 7 /*! 8 * \file async_frontend.hpp 9 * \author Andrey Semashev 10 * \date 14.07.2009 11 * 12 * The header contains implementation of asynchronous sink frontend. 13 */ 14 15 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ 16 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ 17 18 #include <exception> // std::terminate 19 #include <boost/log/detail/config.hpp> 20 21 #ifdef BOOST_HAS_PRAGMA_ONCE 22 #pragma once 23 #endif 24 25 #if defined(BOOST_LOG_NO_THREADS) 26 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment 27 #endif 28 29 #include <boost/bind/bind.hpp> 30 #include <boost/static_assert.hpp> 31 #include <boost/memory_order.hpp> 32 #include <boost/atomic/atomic.hpp> 33 #include <boost/smart_ptr/shared_ptr.hpp> 34 #include <boost/smart_ptr/make_shared_object.hpp> 35 #include <boost/preprocessor/control/if.hpp> 36 #include <boost/preprocessor/comparison/equal.hpp> 37 #include <boost/thread/locks.hpp> 38 #include <boost/thread/recursive_mutex.hpp> 39 #include <boost/thread/thread.hpp> 40 #include <boost/thread/condition_variable.hpp> 41 #include <boost/log/exceptions.hpp> 42 #include <boost/log/detail/locking_ptr.hpp> 43 #include <boost/log/detail/parameter_tools.hpp> 44 #include <boost/log/core/record_view.hpp> 45 #include <boost/log/sinks/basic_sink_frontend.hpp> 46 #include <boost/log/sinks/frontend_requirements.hpp> 47 #include <boost/log/sinks/unbounded_fifo_queue.hpp> 48 #include <boost/log/keywords/start_thread.hpp> 49 #include <boost/log/detail/header.hpp> 50 51 namespace boost { 52 53 BOOST_LOG_OPEN_NAMESPACE 54 55 namespace sinks { 56 57 #ifndef BOOST_LOG_DOXYGEN_PASS 58 59 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(n, data)\ 60 template< typename T0 >\ 61 explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\ 62 base_type(true),\ 63 queue_base_type(arg0),\ 64 m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\ 65 m_StopRequested(false),\ 66 m_FlushRequested(false)\ 67 {\ 68 if (arg0[keywords::start_thread | true])\ 69 start_feeding_thread();\ 70 }\ 71 template< typename T0 >\ 72 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\ 73 base_type(true),\ 74 queue_base_type(arg0),\ 75 m_pBackend(backend),\ 76 m_StopRequested(false),\ 77 m_FlushRequested(false)\ 78 {\ 79 if (arg0[keywords::start_thread | true])\ 80 start_feeding_thread();\ 81 } 82 83 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(n, data)\ 84 template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ 85 explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ 86 base_type(true),\ 87 queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ 88 m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\ 89 m_StopRequested(false),\ 90 m_FlushRequested(false)\ 91 {\ 92 if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ 93 start_feeding_thread();\ 94 }\ 95 template< BOOST_PP_ENUM_PARAMS(n, typename T) >\ 96 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\ 97 base_type(true),\ 98 queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\ 99 m_pBackend(backend),\ 100 m_StopRequested(false),\ 101 m_FlushRequested(false)\ 102 {\ 103 if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\ 104 start_feeding_thread();\ 105 } 106 107 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\ 108 BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(n, data) 109 110 #endif // BOOST_LOG_DOXYGEN_PASS 111 112 /*! 113 * \brief Asynchronous logging sink frontend 114 * 115 * The frontend starts a separate thread on construction. All logging records are passed 116 * to the backend in this dedicated thread only. 117 */ 118 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue > 119 class asynchronous_sink : 120 public aux::make_sink_frontend_base< SinkBackendT >::type, 121 public QueueingStrategyT 122 { 123 typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type; 124 typedef QueueingStrategyT queue_base_type; 125 126 private: 127 //! Backend synchronization mutex type 128 typedef boost::recursive_mutex backend_mutex_type; 129 //! Frontend synchronization mutex type 130 typedef typename base_type::mutex_type frontend_mutex_type; 131 132 //! A scope guard that implements thread ID management 133 class scoped_thread_id 134 { 135 private: 136 frontend_mutex_type& m_Mutex; 137 condition_variable_any& m_Cond; 138 thread::id& m_ThreadID; 139 boost::atomic< bool >& m_StopRequested; 140 141 public: 142 //! Initializing constructor scoped_thread_id(frontend_mutex_type & mut,condition_variable_any & cond,thread::id & tid,boost::atomic<bool> & sr)143 scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr) 144 : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) 145 { 146 lock_guard< frontend_mutex_type > lock(m_Mutex); 147 if (m_ThreadID != thread::id()) 148 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); 149 m_ThreadID = this_thread::get_id(); 150 } 151 //! Initializing constructor scoped_thread_id(unique_lock<frontend_mutex_type> & l,condition_variable_any & cond,thread::id & tid,boost::atomic<bool> & sr)152 scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr) 153 : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr) 154 { 155 unique_lock< frontend_mutex_type > lock(move(l)); 156 if (m_ThreadID != thread::id()) 157 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread"); 158 m_ThreadID = this_thread::get_id(); 159 } 160 //! Destructor ~scoped_thread_id()161 ~scoped_thread_id() 162 { 163 try 164 { 165 lock_guard< frontend_mutex_type > lock(m_Mutex); 166 m_StopRequested.store(false, boost::memory_order_release); 167 m_ThreadID = thread::id(); 168 m_Cond.notify_all(); 169 } 170 catch (...) 171 { 172 } 173 } 174 175 BOOST_DELETED_FUNCTION(scoped_thread_id(scoped_thread_id const&)) 176 BOOST_DELETED_FUNCTION(scoped_thread_id& operator= (scoped_thread_id const&)) 177 }; 178 179 //! A scope guard that resets a flag on destructor 180 class scoped_flag 181 { 182 private: 183 frontend_mutex_type& m_Mutex; 184 condition_variable_any& m_Cond; 185 boost::atomic< bool >& m_Flag; 186 187 public: scoped_flag(frontend_mutex_type & mut,condition_variable_any & cond,boost::atomic<bool> & f)188 explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, boost::atomic< bool >& f) : 189 m_Mutex(mut), m_Cond(cond), m_Flag(f) 190 { 191 } ~scoped_flag()192 ~scoped_flag() 193 { 194 try 195 { 196 lock_guard< frontend_mutex_type > lock(m_Mutex); 197 m_Flag.store(false, boost::memory_order_release); 198 m_Cond.notify_all(); 199 } 200 catch (...) 201 { 202 } 203 } 204 205 BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&)) 206 BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&)) 207 }; 208 209 public: 210 //! Sink implementation type 211 typedef SinkBackendT sink_backend_type; 212 //! \cond 213 BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met"); 214 //! \endcond 215 216 #ifndef BOOST_LOG_DOXYGEN_PASS 217 218 //! A pointer type that locks the backend until it's destroyed 219 typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr; 220 221 #else // BOOST_LOG_DOXYGEN_PASS 222 223 //! A pointer type that locks the backend until it's destroyed 224 typedef implementation_defined locked_backend_ptr; 225 226 #endif // BOOST_LOG_DOXYGEN_PASS 227 228 private: 229 //! Synchronization mutex 230 backend_mutex_type m_BackendMutex; 231 //! Pointer to the backend 232 const shared_ptr< sink_backend_type > m_pBackend; 233 234 //! Dedicated record feeding thread 235 thread m_DedicatedFeedingThread; 236 //! Feeding thread ID 237 thread::id m_FeedingThreadID; 238 //! Condition variable to implement blocking operations 239 condition_variable_any m_BlockCond; 240 241 //! The flag indicates that the feeding loop has to be stopped 242 boost::atomic< bool > m_StopRequested; 243 //! The flag indicates that queue flush has been requested 244 boost::atomic< bool > m_FlushRequested; 245 246 public: 247 /*! 248 * Default constructor. Constructs the sink backend instance. 249 * Requires the backend to be default-constructible. 250 * 251 * \param start_thread If \c true, the frontend creates a thread to feed 252 * log records to the backend. Otherwise no thread is 253 * started and it is assumed that the user will call 254 * either \c run or \c feed_records himself. 255 */ asynchronous_sink(bool start_thread=true)256 asynchronous_sink(bool start_thread = true) : 257 base_type(true), 258 m_pBackend(boost::make_shared< sink_backend_type >()), 259 m_StopRequested(false), 260 m_FlushRequested(false) 261 { 262 if (start_thread) 263 start_feeding_thread(); 264 } 265 /*! 266 * Constructor attaches user-constructed backend instance 267 * 268 * \param backend Pointer to the backend instance. 269 * \param start_thread If \c true, the frontend creates a thread to feed 270 * log records to the backend. Otherwise no thread is 271 * started and it is assumed that the user will call 272 * either \c run or \c feed_records himself. 273 * 274 * \pre \a backend is not \c NULL. 275 */ asynchronous_sink(shared_ptr<sink_backend_type> const & backend,bool start_thread=true)276 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) : 277 base_type(true), 278 m_pBackend(backend), 279 m_StopRequested(false), 280 m_FlushRequested(false) 281 { 282 if (start_thread) 283 start_feeding_thread(); 284 } 285 286 /*! 287 * Constructor that passes arbitrary named parameters to the interprocess sink backend constructor. 288 * Refer to the backend documentation for the list of supported parameters. 289 * 290 * The frontend uses the following named parameters: 291 * 292 * \li start_thread - If \c true, the frontend creates a thread to feed 293 * log records to the backend. Otherwise no thread is 294 * started and it is assumed that the user will call 295 * either \c run or \c feed_records himself. 296 */ 297 #ifndef BOOST_LOG_DOXYGEN_PASS 298 BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~) 299 #else 300 template< typename... Args > 301 explicit asynchronous_sink(Args&&... args); 302 #endif 303 304 /*! 305 * Destructor. Implicitly stops the dedicated feeding thread, if one is running. 306 */ ~asynchronous_sink()307 ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE 308 { 309 try 310 { 311 boost::this_thread::disable_interruption no_interrupts; 312 stop(); 313 } 314 catch (...) 315 { 316 std::terminate(); 317 } 318 } 319 320 /*! 321 * Locking accessor to the attached backend 322 */ locked_backend()323 locked_backend_ptr locked_backend() 324 { 325 return locked_backend_ptr(m_pBackend, m_BackendMutex); 326 } 327 328 /*! 329 * Enqueues the log record to the backend 330 */ consume(record_view const & rec)331 void consume(record_view const& rec) BOOST_OVERRIDE 332 { 333 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire))) 334 { 335 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); 336 // Wait until flush is done 337 while (m_FlushRequested.load(boost::memory_order_acquire)) 338 m_BlockCond.wait(lock); 339 } 340 queue_base_type::enqueue(rec); 341 } 342 343 /*! 344 * The method attempts to pass logging record to the backend 345 */ try_consume(record_view const & rec)346 bool try_consume(record_view const& rec) BOOST_OVERRIDE 347 { 348 if (!m_FlushRequested.load(boost::memory_order_acquire)) 349 { 350 return queue_base_type::try_enqueue(rec); 351 } 352 else 353 return false; 354 } 355 356 /*! 357 * The method starts record feeding loop and effectively blocks until either of this happens: 358 * 359 * \li the thread is interrupted due to either standard thread interruption or a call to \c stop 360 * \li an exception is thrown while processing a log record in the backend, and the exception is 361 * not terminated by the exception handler, if one is installed 362 * 363 * \pre The sink frontend must be constructed without spawning a dedicated thread 364 */ run()365 void run() 366 { 367 // First check that no other thread is running 368 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); 369 370 // Now start the feeding loop 371 while (true) 372 { 373 do_feed_records(); 374 if (!m_StopRequested.load(boost::memory_order_acquire)) 375 { 376 // Block until new record is available 377 record_view rec; 378 if (queue_base_type::dequeue_ready(rec)) 379 base_type::feed_record(rec, m_BackendMutex, *m_pBackend); 380 } 381 else 382 break; 383 } 384 } 385 386 /*! 387 * The method softly interrupts record feeding loop. This method must be called when the \c run 388 * method execution has to be interrupted. Unlike regular thread interruption, calling 389 * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend 390 * will attempt to finish its business with the record in progress and return afterwards. 391 * This method can be called either if the sink was created with a dedicated thread, 392 * or if the feeding loop was initiated by user. 393 * 394 * \note Returning from this method does not guarantee that there are no records left buffered 395 * in the sink frontend. It is possible that log records keep coming during and after this 396 * method is called. At some point of execution of this method log records stop being processed, 397 * and all records that come after this point are put into the queue. These records will be 398 * processed upon further calls to \c run or \c feed_records. 399 */ stop()400 void stop() 401 { 402 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); 403 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) 404 { 405 try 406 { 407 m_StopRequested.store(true, boost::memory_order_release); 408 queue_base_type::interrupt_dequeue(); 409 while (m_StopRequested.load(boost::memory_order_acquire)) 410 m_BlockCond.wait(lock); 411 } 412 catch (...) 413 { 414 m_StopRequested.store(false, boost::memory_order_release); 415 throw; 416 } 417 418 lock.unlock(); 419 m_DedicatedFeedingThread.join(); 420 } 421 } 422 423 /*! 424 * The method feeds log records that may have been buffered to the backend and returns 425 * 426 * \pre The sink frontend must be constructed without spawning a dedicated thread 427 */ feed_records()428 void feed_records() 429 { 430 // First check that no other thread is running 431 scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested); 432 433 // Now start the feeding loop 434 do_feed_records(); 435 } 436 437 /*! 438 * The method feeds all log records that may have been buffered to the backend and returns. 439 * Unlike \c feed_records, in case of ordering queueing the method also feeds records 440 * that were enqueued during the ordering window, attempting to empty the queue completely. 441 */ flush()442 void flush() BOOST_OVERRIDE 443 { 444 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex()); 445 if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable()) 446 { 447 // There is already a thread feeding records, let it do the job 448 m_FlushRequested.store(true, boost::memory_order_release); 449 queue_base_type::interrupt_dequeue(); 450 while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire)) 451 m_BlockCond.wait(lock); 452 453 // The condition may have been signalled when the feeding thread was finishing. 454 // In that case records may not have been flushed, and we do the flush ourselves. 455 if (m_FeedingThreadID != thread::id()) 456 return; 457 } 458 459 m_FlushRequested.store(true, boost::memory_order_release); 460 461 // Flush records ourselves. The guard releases the lock. 462 scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested); 463 464 do_feed_records(); 465 } 466 467 private: 468 #ifndef BOOST_LOG_DOXYGEN_PASS 469 //! The method spawns record feeding thread start_feeding_thread()470 void start_feeding_thread() 471 { 472 boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread); 473 } 474 475 //! The record feeding loop do_feed_records()476 void do_feed_records() 477 { 478 while (!m_StopRequested.load(boost::memory_order_acquire)) 479 { 480 record_view rec; 481 bool dequeued = false; 482 if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire))) 483 dequeued = queue_base_type::try_dequeue_ready(rec); 484 else 485 dequeued = queue_base_type::try_dequeue(rec); 486 487 if (dequeued) 488 base_type::feed_record(rec, m_BackendMutex, *m_pBackend); 489 else 490 break; 491 } 492 493 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire))) 494 { 495 scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested); 496 base_type::flush_backend(m_BackendMutex, *m_pBackend); 497 } 498 } 499 #endif // BOOST_LOG_DOXYGEN_PASS 500 }; 501 502 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1 503 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N 504 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL 505 506 } // namespace sinks 507 508 BOOST_LOG_CLOSE_NAMESPACE // namespace log 509 510 } // namespace boost 511 512 #include <boost/log/detail/footer.hpp> 513 514 #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_ 515