• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *          Copyright Andrey Semashev 2007 - 2015.
3  * Distributed under the Boost Software License, Version 1.0.
4  *    (See accompanying file LICENSE_1_0.txt or copy at
5  *          http://www.boost.org/LICENSE_1_0.txt)
6  */
7 /*!
8  * \file   async_frontend.hpp
9  * \author Andrey Semashev
10  * \date   14.07.2009
11  *
12  * The header contains implementation of asynchronous sink frontend.
13  */
14 
15 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
16 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
17 
18 #include <exception> // std::terminate
19 #include <boost/log/detail/config.hpp>
20 
21 #ifdef BOOST_HAS_PRAGMA_ONCE
22 #pragma once
23 #endif
24 
25 #if defined(BOOST_LOG_NO_THREADS)
26 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
27 #endif
28 
29 #include <boost/bind/bind.hpp>
30 #include <boost/static_assert.hpp>
31 #include <boost/memory_order.hpp>
32 #include <boost/atomic/atomic.hpp>
33 #include <boost/smart_ptr/shared_ptr.hpp>
34 #include <boost/smart_ptr/make_shared_object.hpp>
35 #include <boost/preprocessor/control/if.hpp>
36 #include <boost/preprocessor/comparison/equal.hpp>
37 #include <boost/thread/locks.hpp>
38 #include <boost/thread/recursive_mutex.hpp>
39 #include <boost/thread/thread.hpp>
40 #include <boost/thread/condition_variable.hpp>
41 #include <boost/log/exceptions.hpp>
42 #include <boost/log/detail/locking_ptr.hpp>
43 #include <boost/log/detail/parameter_tools.hpp>
44 #include <boost/log/core/record_view.hpp>
45 #include <boost/log/sinks/basic_sink_frontend.hpp>
46 #include <boost/log/sinks/frontend_requirements.hpp>
47 #include <boost/log/sinks/unbounded_fifo_queue.hpp>
48 #include <boost/log/keywords/start_thread.hpp>
49 #include <boost/log/detail/header.hpp>
50 
51 namespace boost {
52 
53 BOOST_LOG_OPEN_NAMESPACE
54 
55 namespace sinks {
56 
57 #ifndef BOOST_LOG_DOXYGEN_PASS
58 
// Generates the two constructors of asynchronous_sink that accept a single named-parameters argument.
// The first overload is constrained with enable_if_named_parameters and creates the backend from the
// argument; the second one attaches a backend supplied by the caller. Both pass the argument to the
// queueing strategy base and start the feeding thread unless the start_thread keyword evaluates to false
// (it defaults to true). The macro parameters n and data are not used by the expansion.
#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(n, data)\
    template< typename ArgsT >\
    explicit asynchronous_sink(\
        ArgsT const& args,\
        typename boost::log::aux::enable_if_named_parameters< ArgsT, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
        base_type(true),\
        queue_base_type(args),\
        m_pBackend(boost::make_shared< sink_backend_type >(args)),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if (args[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }\
    template< typename ArgsT >\
    explicit asynchronous_sink(\
        shared_ptr< sink_backend_type > const& backend,\
        ArgsT const& args) :\
        base_type(true),\
        queue_base_type(args),\
        m_pBackend(backend),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if (args[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }
82 
// Generates the two constructors of asynchronous_sink that accept n named-parameter arguments.
// The arguments arg0 ... argN-1 are combined with the comma operator into a single expression,
// which is passed to the queueing strategy base and queried for the start_thread keyword
// (defaulting to true). The first overload creates the backend from the individual arguments,
// the second one attaches a backend supplied by the caller. The data parameter is not used.
#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(n, data)\
    template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
    explicit asynchronous_sink(\
        BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
        base_type(true),\
        queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
        m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS(n, arg))),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }\
    template< BOOST_PP_ENUM_PARAMS(n, typename T) >\
    explicit asynchronous_sink(\
        shared_ptr< sink_backend_type > const& backend,\
        BOOST_PP_ENUM_BINARY_PARAMS(n, T, const& arg)) :\
        base_type(true),\
        queue_base_type((BOOST_PP_ENUM_PARAMS(n, arg))),\
        m_pBackend(backend),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if ((BOOST_PP_ENUM_PARAMS(n, arg))[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }
106 
// Dispatches constructor generation by arity: the single-argument generator is selected when n equals 1,
// the multi-argument generator otherwise. The z parameter is accepted but not used by the expansion.
#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
    BOOST_PP_IF(\
        BOOST_PP_EQUAL(n, 1),\
        BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1,\
        BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N\
    )(n, data)
109 
110 #endif // BOOST_LOG_DOXYGEN_PASS
111 
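/*!
 * \brief Asynchronous logging sink frontend
 *
 * By default, the frontend starts a separate thread on construction, and all log records are passed
 * to the backend in this dedicated thread only. If the thread is not started (see the \c start_thread
 * constructor parameter), the user is expected to feed the records to the backend by calling
 * either \c run or \c feed_records.
 */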
118 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
119 class asynchronous_sink :
120     public aux::make_sink_frontend_base< SinkBackendT >::type,
121     public QueueingStrategyT
122 {
123     typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
124     typedef QueueingStrategyT queue_base_type;
125 
126 private:
127     //! Backend synchronization mutex type
128     typedef boost::recursive_mutex backend_mutex_type;
129     //! Frontend synchronization mutex type
130     typedef typename base_type::mutex_type frontend_mutex_type;
131 
132     //! A scope guard that implements thread ID management
133     class scoped_thread_id
134     {
135     private:
136         frontend_mutex_type& m_Mutex;
137         condition_variable_any& m_Cond;
138         thread::id& m_ThreadID;
139         boost::atomic< bool >& m_StopRequested;
140 
141     public:
142         //! Initializing constructor
scoped_thread_id(frontend_mutex_type & mut,condition_variable_any & cond,thread::id & tid,boost::atomic<bool> & sr)143         scoped_thread_id(frontend_mutex_type& mut, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr)
144             : m_Mutex(mut), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
145         {
146             lock_guard< frontend_mutex_type > lock(m_Mutex);
147             if (m_ThreadID != thread::id())
148                 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
149             m_ThreadID = this_thread::get_id();
150         }
151         //! Initializing constructor
scoped_thread_id(unique_lock<frontend_mutex_type> & l,condition_variable_any & cond,thread::id & tid,boost::atomic<bool> & sr)152         scoped_thread_id(unique_lock< frontend_mutex_type >& l, condition_variable_any& cond, thread::id& tid, boost::atomic< bool >& sr)
153             : m_Mutex(*l.mutex()), m_Cond(cond), m_ThreadID(tid), m_StopRequested(sr)
154         {
155             unique_lock< frontend_mutex_type > lock(move(l));
156             if (m_ThreadID != thread::id())
157                 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
158             m_ThreadID = this_thread::get_id();
159         }
160         //! Destructor
~scoped_thread_id()161         ~scoped_thread_id()
162         {
163             try
164             {
165                 lock_guard< frontend_mutex_type > lock(m_Mutex);
166                 m_StopRequested.store(false, boost::memory_order_release);
167                 m_ThreadID = thread::id();
168                 m_Cond.notify_all();
169             }
170             catch (...)
171             {
172             }
173         }
174 
175         BOOST_DELETED_FUNCTION(scoped_thread_id(scoped_thread_id const&))
176         BOOST_DELETED_FUNCTION(scoped_thread_id& operator= (scoped_thread_id const&))
177     };
178 
179     //! A scope guard that resets a flag on destructor
180     class scoped_flag
181     {
182     private:
183         frontend_mutex_type& m_Mutex;
184         condition_variable_any& m_Cond;
185         boost::atomic< bool >& m_Flag;
186 
187     public:
scoped_flag(frontend_mutex_type & mut,condition_variable_any & cond,boost::atomic<bool> & f)188         explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, boost::atomic< bool >& f) :
189             m_Mutex(mut), m_Cond(cond), m_Flag(f)
190         {
191         }
~scoped_flag()192         ~scoped_flag()
193         {
194             try
195             {
196                 lock_guard< frontend_mutex_type > lock(m_Mutex);
197                 m_Flag.store(false, boost::memory_order_release);
198                 m_Cond.notify_all();
199             }
200             catch (...)
201             {
202             }
203         }
204 
205         BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&))
206         BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&))
207     };
208 
209 public:
210     //! Sink implementation type
211     typedef SinkBackendT sink_backend_type;
212     //! \cond
213     BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
214     //! \endcond
215 
216 #ifndef BOOST_LOG_DOXYGEN_PASS
217 
218     //! A pointer type that locks the backend until it's destroyed
219     typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
220 
221 #else // BOOST_LOG_DOXYGEN_PASS
222 
223     //! A pointer type that locks the backend until it's destroyed
224     typedef implementation_defined locked_backend_ptr;
225 
226 #endif // BOOST_LOG_DOXYGEN_PASS
227 
228 private:
229     //! Synchronization mutex
230     backend_mutex_type m_BackendMutex;
231     //! Pointer to the backend
232     const shared_ptr< sink_backend_type > m_pBackend;
233 
234     //! Dedicated record feeding thread
235     thread m_DedicatedFeedingThread;
236     //! Feeding thread ID
237     thread::id m_FeedingThreadID;
238     //! Condition variable to implement blocking operations
239     condition_variable_any m_BlockCond;
240 
241     //! The flag indicates that the feeding loop has to be stopped
242     boost::atomic< bool > m_StopRequested;
243     //! The flag indicates that queue flush has been requested
244     boost::atomic< bool > m_FlushRequested;
245 
246 public:
247     /*!
248      * Default constructor. Constructs the sink backend instance.
249      * Requires the backend to be default-constructible.
250      *
251      * \param start_thread If \c true, the frontend creates a thread to feed
252      *                     log records to the backend. Otherwise no thread is
253      *                     started and it is assumed that the user will call
254      *                     either \c run or \c feed_records himself.
255      */
asynchronous_sink(bool start_thread=true)256     asynchronous_sink(bool start_thread = true) :
257         base_type(true),
258         m_pBackend(boost::make_shared< sink_backend_type >()),
259         m_StopRequested(false),
260         m_FlushRequested(false)
261     {
262         if (start_thread)
263             start_feeding_thread();
264     }
265     /*!
266      * Constructor attaches user-constructed backend instance
267      *
268      * \param backend Pointer to the backend instance.
269      * \param start_thread If \c true, the frontend creates a thread to feed
270      *                     log records to the backend. Otherwise no thread is
271      *                     started and it is assumed that the user will call
272      *                     either \c run or \c feed_records himself.
273      *
274      * \pre \a backend is not \c NULL.
275      */
asynchronous_sink(shared_ptr<sink_backend_type> const & backend,bool start_thread=true)276     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
277         base_type(true),
278         m_pBackend(backend),
279         m_StopRequested(false),
280         m_FlushRequested(false)
281     {
282         if (start_thread)
283             start_feeding_thread();
284     }
285 
    /*!
     * Constructor that passes arbitrary named parameters to the sink backend constructor.
     * Refer to the backend documentation for the list of supported parameters.
     *
     * The frontend uses the following named parameters:
     *
     *   \li start_thread - If \c true, the frontend creates a thread to feed
     *                      log records to the backend. Otherwise no thread is
     *                      started and it is assumed that the user will call
     *                      either \c run or \c feed_records himself.
     */
297 #ifndef BOOST_LOG_DOXYGEN_PASS
298     BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
299 #else
300     template< typename... Args >
301     explicit asynchronous_sink(Args&&... args);
302 #endif
303 
304     /*!
305      * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
306      */
~asynchronous_sink()307     ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
308     {
309         try
310         {
311             boost::this_thread::disable_interruption no_interrupts;
312             stop();
313         }
314         catch (...)
315         {
316             std::terminate();
317         }
318     }
319 
320     /*!
321      * Locking accessor to the attached backend
322      */
locked_backend()323     locked_backend_ptr locked_backend()
324     {
325         return locked_backend_ptr(m_pBackend, m_BackendMutex);
326     }
327 
328     /*!
329      * Enqueues the log record to the backend
330      */
consume(record_view const & rec)331     void consume(record_view const& rec) BOOST_OVERRIDE
332     {
333         if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
334         {
335             unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
336             // Wait until flush is done
337             while (m_FlushRequested.load(boost::memory_order_acquire))
338                 m_BlockCond.wait(lock);
339         }
340         queue_base_type::enqueue(rec);
341     }
342 
343     /*!
344      * The method attempts to pass logging record to the backend
345      */
try_consume(record_view const & rec)346     bool try_consume(record_view const& rec) BOOST_OVERRIDE
347     {
348         if (!m_FlushRequested.load(boost::memory_order_acquire))
349         {
350             return queue_base_type::try_enqueue(rec);
351         }
352         else
353             return false;
354     }
355 
356     /*!
357      * The method starts record feeding loop and effectively blocks until either of this happens:
358      *
359      * \li the thread is interrupted due to either standard thread interruption or a call to \c stop
360      * \li an exception is thrown while processing a log record in the backend, and the exception is
361      *     not terminated by the exception handler, if one is installed
362      *
363      * \pre The sink frontend must be constructed without spawning a dedicated thread
364      */
run()365     void run()
366     {
367         // First check that no other thread is running
368         scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
369 
370         // Now start the feeding loop
371         while (true)
372         {
373             do_feed_records();
374             if (!m_StopRequested.load(boost::memory_order_acquire))
375             {
376                 // Block until new record is available
377                 record_view rec;
378                 if (queue_base_type::dequeue_ready(rec))
379                     base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
380             }
381             else
382                 break;
383         }
384     }
385 
386     /*!
387      * The method softly interrupts record feeding loop. This method must be called when the \c run
388      * method execution has to be interrupted. Unlike regular thread interruption, calling
389      * \c stop will not interrupt the record processing in the middle. Instead, the sink frontend
390      * will attempt to finish its business with the record in progress and return afterwards.
391      * This method can be called either if the sink was created with a dedicated thread,
392      * or if the feeding loop was initiated by user.
393      *
394      * \note Returning from this method does not guarantee that there are no records left buffered
395      *       in the sink frontend. It is possible that log records keep coming during and after this
396      *       method is called. At some point of execution of this method log records stop being processed,
397      *       and all records that come after this point are put into the queue. These records will be
398      *       processed upon further calls to \c run or \c feed_records.
399      */
stop()400     void stop()
401     {
402         unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
403         if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
404         {
405             try
406             {
407                 m_StopRequested.store(true, boost::memory_order_release);
408                 queue_base_type::interrupt_dequeue();
409                 while (m_StopRequested.load(boost::memory_order_acquire))
410                     m_BlockCond.wait(lock);
411             }
412             catch (...)
413             {
414                 m_StopRequested.store(false, boost::memory_order_release);
415                 throw;
416             }
417 
418             lock.unlock();
419             m_DedicatedFeedingThread.join();
420         }
421     }
422 
423     /*!
424      * The method feeds log records that may have been buffered to the backend and returns
425      *
426      * \pre The sink frontend must be constructed without spawning a dedicated thread
427      */
feed_records()428     void feed_records()
429     {
430         // First check that no other thread is running
431         scoped_thread_id guard(base_type::frontend_mutex(), m_BlockCond, m_FeedingThreadID, m_StopRequested);
432 
433         // Now start the feeding loop
434         do_feed_records();
435     }
436 
437     /*!
438      * The method feeds all log records that may have been buffered to the backend and returns.
439      * Unlike \c feed_records, in case of ordering queueing the method also feeds records
440      * that were enqueued during the ordering window, attempting to empty the queue completely.
441      */
flush()442     void flush() BOOST_OVERRIDE
443     {
444         unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
445         if (m_FeedingThreadID != thread::id() || m_DedicatedFeedingThread.joinable())
446         {
447             // There is already a thread feeding records, let it do the job
448             m_FlushRequested.store(true, boost::memory_order_release);
449             queue_base_type::interrupt_dequeue();
450             while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
451                 m_BlockCond.wait(lock);
452 
453             // The condition may have been signalled when the feeding thread was finishing.
454             // In that case records may not have been flushed, and we do the flush ourselves.
455             if (m_FeedingThreadID != thread::id())
456                 return;
457         }
458 
459         m_FlushRequested.store(true, boost::memory_order_release);
460 
461         // Flush records ourselves. The guard releases the lock.
462         scoped_thread_id guard(lock, m_BlockCond, m_FeedingThreadID, m_StopRequested);
463 
464         do_feed_records();
465     }
466 
467 private:
468 #ifndef BOOST_LOG_DOXYGEN_PASS
469     //! The method spawns record feeding thread
start_feeding_thread()470     void start_feeding_thread()
471     {
472         boost::thread(boost::bind(&asynchronous_sink::run, this)).swap(m_DedicatedFeedingThread);
473     }
474 
475     //! The record feeding loop
do_feed_records()476     void do_feed_records()
477     {
478         while (!m_StopRequested.load(boost::memory_order_acquire))
479         {
480             record_view rec;
481             bool dequeued = false;
482             if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
483                 dequeued = queue_base_type::try_dequeue_ready(rec);
484             else
485                 dequeued = queue_base_type::try_dequeue(rec);
486 
487             if (dequeued)
488                 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
489             else
490                 break;
491         }
492 
493         if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
494         {
495             scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
496             base_type::flush_backend(m_BackendMutex, *m_pBackend);
497         }
498     }
499 #endif // BOOST_LOG_DOXYGEN_PASS
500 };
501 
502 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
503 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
504 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
505 
506 } // namespace sinks
507 
508 BOOST_LOG_CLOSE_NAMESPACE // namespace log
509 
510 } // namespace boost
511 
512 #include <boost/log/detail/footer.hpp>
513 
514 #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
515