• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *              Copyright Andrey Semashev 2016.
3  * Distributed under the Boost Software License, Version 1.0.
4  *    (See accompanying file LICENSE_1_0.txt or copy at
5  *          http://www.boost.org/LICENSE_1_0.txt)
6  */
7 /*!
8  * \file   windows/ipc_sync_wrappers.hpp
9  * \author Andrey Semashev
10  * \date   23.01.2016
11  *
12  * \brief  This header is the Boost.Log library implementation, see the library documentation
13  *         at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
14  */
15 
16 #ifndef BOOST_LOG_WINDOWS_IPC_SYNC_WRAPPERS_HPP_INCLUDED_
17 #define BOOST_LOG_WINDOWS_IPC_SYNC_WRAPPERS_HPP_INCLUDED_
18 
19 #include <boost/log/detail/config.hpp>
20 #include <boost/winapi/access_rights.hpp>
21 #include <boost/winapi/handles.hpp>
22 #include <boost/winapi/event.hpp>
23 #include <boost/winapi/semaphore.hpp>
24 #include <boost/winapi/wait.hpp>
25 #include <boost/winapi/dll.hpp>
26 #include <boost/winapi/time.hpp>
27 #include <boost/winapi/get_last_error.hpp>
28 #include <cstddef>
29 #include <limits>
30 #include <string>
31 #include <utility>
32 #include <boost/assert.hpp>
33 #include <boost/throw_exception.hpp>
34 #include <boost/checked_delete.hpp>
35 #include <boost/memory_order.hpp>
36 #include <boost/atomic/atomic.hpp>
37 #include <boost/intrusive/options.hpp>
38 #include <boost/intrusive/set.hpp>
39 #include <boost/intrusive/set_hook.hpp>
40 #include <boost/intrusive/list.hpp>
41 #include <boost/intrusive/list_hook.hpp>
42 #include <boost/log/exceptions.hpp>
43 #include <boost/log/utility/permissions.hpp>
44 #include "windows/auto_handle.hpp"
45 #include <boost/log/detail/header.hpp>
46 
47 namespace boost {
48 
49 BOOST_LOG_OPEN_NAMESPACE
50 
51 namespace ipc {
52 
53 namespace aux {
54 
55 //! Interprocess event object
56 class interprocess_event
57 {
58 private:
59     auto_handle m_event;
60 
61 public:
62     void create(const wchar_t* name, bool manual_reset, permissions const& perms = permissions());
63     void create_or_open(const wchar_t* name, bool manual_reset, permissions const& perms = permissions());
64     void open(const wchar_t* name);
65 
get_handle() const66     boost::winapi::HANDLE_ get_handle() const BOOST_NOEXCEPT { return m_event.get(); }
67 
set()68     void set()
69     {
70         if (BOOST_UNLIKELY(!boost::winapi::SetEvent(m_event.get())))
71         {
72             const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
73             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to set an interprocess event object", (err));
74         }
75     }
76 
set_noexcept()77     void set_noexcept() BOOST_NOEXCEPT
78     {
79         BOOST_VERIFY(!!boost::winapi::SetEvent(m_event.get()));
80     }
81 
reset()82     void reset()
83     {
84         if (BOOST_UNLIKELY(!boost::winapi::ResetEvent(m_event.get())))
85         {
86             const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
87             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to reset an interprocess event object", (err));
88         }
89     }
90 
wait()91     void wait()
92     {
93         const boost::winapi::DWORD_ retval = boost::winapi::WaitForSingleObject(m_event.get(), boost::winapi::infinite);
94         if (BOOST_UNLIKELY(retval != boost::winapi::wait_object_0))
95         {
96             const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
97             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to block on an interprocess event object", (err));
98         }
99     }
100 
wait(boost::winapi::HANDLE_ abort_handle)101     bool wait(boost::winapi::HANDLE_ abort_handle)
102     {
103         boost::winapi::HANDLE_ handles[2u] = { m_event.get(), abort_handle };
104         const boost::winapi::DWORD_ retval = boost::winapi::WaitForMultipleObjects(2u, handles, false, boost::winapi::infinite);
105         if (retval == (boost::winapi::wait_object_0 + 1u))
106         {
107             // Wait was interrupted
108             return false;
109         }
110         else if (BOOST_UNLIKELY(retval != boost::winapi::wait_object_0))
111         {
112             const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
113             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to block on an interprocess event object", (err));
114         }
115 
116         return true;
117     }
118 
swap(interprocess_event & that)119     void swap(interprocess_event& that) BOOST_NOEXCEPT
120     {
121         m_event.swap(that.m_event);
122     }
123 };
124 
125 //! Interprocess semaphore object
126 class interprocess_semaphore
127 {
128 private:
129     typedef boost::winapi::DWORD_ NTSTATUS_;
130     struct semaphore_basic_information
131     {
132         boost::winapi::ULONG_ current_count; // current semaphore count
133         boost::winapi::ULONG_ maximum_count; // max semaphore count
134     };
135     typedef NTSTATUS_ (__stdcall *nt_query_semaphore_t)(boost::winapi::HANDLE_ h, unsigned int info_class, semaphore_basic_information* pinfo, boost::winapi::ULONG_ info_size, boost::winapi::ULONG_* ret_len);
136     typedef bool (*is_semaphore_zero_count_t)(boost::winapi::HANDLE_ h);
137 
138 private:
139     auto_handle m_sem;
140 
141     static boost::atomic< is_semaphore_zero_count_t > is_semaphore_zero_count;
142     static nt_query_semaphore_t nt_query_semaphore;
143 
144 public:
145     void create_or_open(const wchar_t* name, permissions const& perms = permissions());
146     void open(const wchar_t* name);
147 
get_handle() const148     boost::winapi::HANDLE_ get_handle() const BOOST_NOEXCEPT { return m_sem.get(); }
149 
post(uint32_t count)150     void post(uint32_t count)
151     {
152         BOOST_ASSERT(count <= static_cast< uint32_t >((std::numeric_limits< boost::winapi::LONG_ >::max)()));
153 
154         if (BOOST_UNLIKELY(!boost::winapi::ReleaseSemaphore(m_sem.get(), static_cast< boost::winapi::LONG_ >(count), NULL)))
155         {
156             const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
157             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to post on an interprocess semaphore object", (err));
158         }
159     }
160 
is_zero_count() const161     bool is_zero_count() const
162     {
163         return is_semaphore_zero_count.load(boost::memory_order_acquire)(m_sem.get());
164     }
165 
wait()166     void wait()
167     {
168         const boost::winapi::DWORD_ retval = boost::winapi::WaitForSingleObject(m_sem.get(), boost::winapi::infinite);
169         if (BOOST_UNLIKELY(retval != boost::winapi::wait_object_0))
170         {
171             const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
172             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to block on an interprocess semaphore object", (err));
173         }
174     }
175 
wait(boost::winapi::HANDLE_ abort_handle)176     bool wait(boost::winapi::HANDLE_ abort_handle)
177     {
178         boost::winapi::HANDLE_ handles[2u] = { m_sem.get(), abort_handle };
179         const boost::winapi::DWORD_ retval = boost::winapi::WaitForMultipleObjects(2u, handles, false, boost::winapi::infinite);
180         if (retval == (boost::winapi::wait_object_0 + 1u))
181         {
182             // Wait was interrupted
183             return false;
184         }
185         else if (BOOST_UNLIKELY(retval != boost::winapi::wait_object_0))
186         {
187             const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
188             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to block on an interprocess semaphore object", (err));
189         }
190 
191         return true;
192     }
193 
swap(interprocess_semaphore & that)194     void swap(interprocess_semaphore& that) BOOST_NOEXCEPT
195     {
196         m_sem.swap(that.m_sem);
197     }
198 
199 private:
200     static bool is_semaphore_zero_count_init(boost::winapi::HANDLE_ h);
201     static bool is_semaphore_zero_count_nt_query_semaphore(boost::winapi::HANDLE_ h);
202     static bool is_semaphore_zero_count_emulated(boost::winapi::HANDLE_ h);
203 };
204 
205 //! Interprocess mutex. Implementation adopted from Boost.Sync.
206 class interprocess_mutex
207 {
208 public:
209     //! Shared state that should be visible to all processes using the mutex
210     struct shared_state
211     {
212         boost::atomic< uint32_t > m_lock_state;
213 
shared_stateboost::ipc::aux::interprocess_mutex::shared_state214         shared_state() BOOST_NOEXCEPT : m_lock_state(0u)
215         {
216         }
217     };
218 
219     struct auto_unlock
220     {
auto_unlockboost::ipc::aux::interprocess_mutex::auto_unlock221         explicit auto_unlock(interprocess_mutex& mutex) BOOST_NOEXCEPT : m_mutex(mutex) {}
~auto_unlockboost::ipc::aux::interprocess_mutex::auto_unlock222         ~auto_unlock() { m_mutex.unlock(); }
223 
224         BOOST_DELETED_FUNCTION(auto_unlock(auto_unlock const&))
225         BOOST_DELETED_FUNCTION(auto_unlock& operator=(auto_unlock const&))
226 
227     private:
228         interprocess_mutex& m_mutex;
229     };
230 
231     struct optional_unlock
232     {
optional_unlockboost::ipc::aux::interprocess_mutex::optional_unlock233         optional_unlock() BOOST_NOEXCEPT : m_mutex(NULL) {}
optional_unlockboost::ipc::aux::interprocess_mutex::optional_unlock234         explicit optional_unlock(interprocess_mutex& mutex) BOOST_NOEXCEPT : m_mutex(&mutex) {}
~optional_unlockboost::ipc::aux::interprocess_mutex::optional_unlock235         ~optional_unlock() { if (m_mutex) m_mutex->unlock(); }
236 
disengageboost::ipc::aux::interprocess_mutex::optional_unlock237         interprocess_mutex* disengage() BOOST_NOEXCEPT
238         {
239             interprocess_mutex* p = m_mutex;
240             m_mutex = NULL;
241             return p;
242         }
243 
engageboost::ipc::aux::interprocess_mutex::optional_unlock244         void engage(interprocess_mutex& mutex) BOOST_NOEXCEPT
245         {
246             BOOST_ASSERT(!m_mutex);
247             m_mutex = &mutex;
248         }
249 
250         BOOST_DELETED_FUNCTION(optional_unlock(optional_unlock const&))
251         BOOST_DELETED_FUNCTION(optional_unlock& operator=(optional_unlock const&))
252 
253     private:
254         interprocess_mutex* m_mutex;
255     };
256 
257 private:
258     interprocess_event m_event;
259     shared_state* m_shared_state;
260 
261 #if !defined(BOOST_MSVC) || _MSC_VER >= 1800
262     static BOOST_CONSTEXPR_OR_CONST uint32_t lock_flag_bit = 31u;
263     static BOOST_CONSTEXPR_OR_CONST uint32_t event_set_flag_bit = 30u;
264     static BOOST_CONSTEXPR_OR_CONST uint32_t lock_flag_value = 1u << lock_flag_bit;
265     static BOOST_CONSTEXPR_OR_CONST uint32_t event_set_flag_value = 1u << event_set_flag_bit;
266     static BOOST_CONSTEXPR_OR_CONST uint32_t waiter_count_mask = event_set_flag_value - 1u;
267 #else
268     // MSVC 8-11, inclusively, fail to link if these constants are declared as static constants instead of an enum
269     enum
270     {
271         lock_flag_bit = 31u,
272         event_set_flag_bit = 30u,
273         lock_flag_value = 1u << lock_flag_bit,
274         event_set_flag_value = 1u << event_set_flag_bit,
275         waiter_count_mask = event_set_flag_value - 1u
276     };
277 #endif
278 
279 public:
interprocess_mutex()280     interprocess_mutex() BOOST_NOEXCEPT : m_shared_state(NULL)
281     {
282     }
283 
create(const wchar_t * name,shared_state * shared,permissions const & perms=permissions ())284     void create(const wchar_t* name, shared_state* shared, permissions const& perms = permissions())
285     {
286         m_event.create(name, false, perms);
287         m_shared_state = shared;
288     }
289 
open(const wchar_t * name,shared_state * shared)290     void open(const wchar_t* name, shared_state* shared)
291     {
292         m_event.open(name);
293         m_shared_state = shared;
294     }
295 
try_lock()296     bool try_lock()
297     {
298         return !m_shared_state->m_lock_state.bit_test_and_set(lock_flag_bit, boost::memory_order_acquire);
299     }
300 
lock()301     void lock()
302     {
303         if (BOOST_UNLIKELY(!try_lock()))
304             lock_slow();
305     }
306 
lock(boost::winapi::HANDLE_ abort_handle)307     bool lock(boost::winapi::HANDLE_ abort_handle)
308     {
309         if (BOOST_LIKELY(try_lock()))
310             return true;
311         return lock_slow(abort_handle);
312     }
313 
unlock()314     void unlock() BOOST_NOEXCEPT
315     {
316         const uint32_t old_count = m_shared_state->m_lock_state.fetch_add(lock_flag_value, boost::memory_order_release);
317         if ((old_count & event_set_flag_value) == 0u && (old_count > lock_flag_value))
318         {
319             if (!m_shared_state->m_lock_state.bit_test_and_set(event_set_flag_bit, boost::memory_order_relaxed))
320             {
321                 m_event.set_noexcept();
322             }
323         }
324     }
325 
326     BOOST_DELETED_FUNCTION(interprocess_mutex(interprocess_mutex const&))
327     BOOST_DELETED_FUNCTION(interprocess_mutex& operator=(interprocess_mutex const&))
328 
329 private:
330     void lock_slow();
331     bool lock_slow(boost::winapi::HANDLE_ abort_handle);
332     void mark_waiting_and_try_lock(uint32_t& old_state);
333     void clear_waiting_and_try_lock(uint32_t& old_state);
334 };
335 
336 //! A simple clock that corresponds to GetTickCount/GetTickCount64 timeline
337 struct tick_count_clock
338 {
339 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
340     typedef boost::winapi::ULONGLONG_ time_point;
341 #else
342     typedef boost::winapi::DWORD_ time_point;
343 #endif
344 
nowboost::ipc::aux::tick_count_clock345     static time_point now() BOOST_NOEXCEPT
346     {
347 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
348         return boost::winapi::GetTickCount64();
349 #else
350         return boost::winapi::GetTickCount();
351 #endif
352     }
353 };
354 
355 //! Interprocess condition variable
356 class interprocess_condition_variable
357 {
358 private:
359     typedef boost::intrusive::list_base_hook<
360         boost::intrusive::tag< struct for_sem_order_by_usage >,
361         boost::intrusive::link_mode< boost::intrusive::safe_link >
362     > semaphore_info_list_hook_t;
363 
364     typedef boost::intrusive::set_base_hook<
365         boost::intrusive::tag< struct for_sem_lookup_by_id >,
366         boost::intrusive::link_mode< boost::intrusive::safe_link >,
367         boost::intrusive::optimize_size< true >
368     > semaphore_info_set_hook_t;
369 
370     //! Information about a semaphore object
371     struct semaphore_info :
372         public semaphore_info_list_hook_t,
373         public semaphore_info_set_hook_t
374     {
375         struct order_by_id
376         {
377             typedef bool result_type;
378 
operator ()boost::ipc::aux::interprocess_condition_variable::semaphore_info::order_by_id379             result_type operator() (semaphore_info const& left, semaphore_info const& right) const BOOST_NOEXCEPT
380             {
381                 return left.m_id < right.m_id;
382             }
operator ()boost::ipc::aux::interprocess_condition_variable::semaphore_info::order_by_id383             result_type operator() (semaphore_info const& left, uint32_t right) const BOOST_NOEXCEPT
384             {
385                 return left.m_id < right;
386             }
operator ()boost::ipc::aux::interprocess_condition_variable::semaphore_info::order_by_id387             result_type operator() (uint32_t left, semaphore_info const& right) const BOOST_NOEXCEPT
388             {
389                 return left < right.m_id;
390             }
391         };
392 
393         //! The semaphore
394         interprocess_semaphore m_semaphore;
395         //! Timestamp of the moment when the semaphore was checked for zero count and it was not zero. In milliseconds since epoch.
396         tick_count_clock::time_point m_last_check_for_zero;
397         //! The flag indicates that the semaphore has been checked for zero count and it was not zero
398         bool m_checked_for_zero;
399         //! The semaphore id
400         const uint32_t m_id;
401 
semaphore_infoboost::ipc::aux::interprocess_condition_variable::semaphore_info402         explicit semaphore_info(uint32_t id) BOOST_NOEXCEPT : m_last_check_for_zero(0u), m_id(id)
403         {
404         }
405 
406         //! Checks if the semaphore is in 'non-zero' state for too long
check_non_zero_timeoutboost::ipc::aux::interprocess_condition_variable::semaphore_info407         bool check_non_zero_timeout(tick_count_clock::time_point now) BOOST_NOEXCEPT
408         {
409             if (!m_checked_for_zero)
410             {
411                 m_last_check_for_zero = now;
412                 m_checked_for_zero = true;
413                 return false;
414             }
415 
416             return (now - m_last_check_for_zero) >= 2000u;
417         }
418 
419         BOOST_DELETED_FUNCTION(semaphore_info(semaphore_info const&))
420         BOOST_DELETED_FUNCTION(semaphore_info& operator=(semaphore_info const&))
421     };
422 
423     typedef boost::intrusive::list<
424         semaphore_info,
425         boost::intrusive::base_hook< semaphore_info_list_hook_t >,
426         boost::intrusive::constant_time_size< false >
427     > semaphore_info_list;
428 
429     typedef boost::intrusive::set<
430         semaphore_info,
431         boost::intrusive::base_hook< semaphore_info_set_hook_t >,
432         boost::intrusive::compare< semaphore_info::order_by_id >,
433         boost::intrusive::constant_time_size< false >
434     > semaphore_info_set;
435 
436 public:
437     struct shared_state
438     {
439         //! Number of waiters blocked on the semaphore if >0, 0 if no threads are blocked, <0 when the blocked threads were signalled
440         int32_t m_waiters;
441         //! The semaphore generation
442         uint32_t m_generation;
443         //! Id of the semaphore which is used to block threads on
444         uint32_t m_semaphore_id;
445 
shared_stateboost::ipc::aux::interprocess_condition_variable::shared_state446         shared_state() BOOST_NOEXCEPT :
447             m_waiters(0),
448             m_generation(0u),
449             m_semaphore_id(0u)
450         {
451         }
452     };
453 
454 private:
455     //! The list of semaphores used for blocking. The list is in the order at which the semaphores are considered to be picked for being used.
456     semaphore_info_list m_semaphore_info_list;
457     //! The list of semaphores used for blocking. Used for searching for a particular semaphore by id.
458     semaphore_info_set m_semaphore_info_set;
459     //! The semaphore that is currently being used for blocking
460     semaphore_info* m_current_semaphore;
461     //! A string storage for formatting a semaphore name
462     std::wstring m_semaphore_name;
463     //! Permissions used to create new semaphores
464     permissions m_perms;
465     //! Process-shared state
466     shared_state* m_shared_state;
467     //! The next id for creating a new semaphore
468     uint32_t m_next_semaphore_id;
469 
470 public:
interprocess_condition_variable()471     interprocess_condition_variable() BOOST_NOEXCEPT :
472         m_current_semaphore(NULL),
473         m_shared_state(NULL),
474         m_next_semaphore_id(0u)
475     {
476     }
477 
~interprocess_condition_variable()478     ~interprocess_condition_variable()
479     {
480         m_semaphore_info_set.clear();
481         m_semaphore_info_list.clear_and_dispose(boost::checked_deleter< semaphore_info >());
482     }
483 
init(const wchar_t * name,shared_state * shared,permissions const & perms=permissions ())484     void init(const wchar_t* name, shared_state* shared, permissions const& perms = permissions())
485     {
486         m_perms = perms;
487         m_shared_state = shared;
488 
489         m_semaphore_name = name;
490         // Reserve space for generate_semaphore_name()
491         m_semaphore_name.append(L".sem00000000");
492 
493         m_current_semaphore = get_semaphore(m_shared_state->m_semaphore_id);
494     }
495 
notify_all()496     void notify_all()
497     {
498         const int32_t waiters = m_shared_state->m_waiters;
499         if (waiters > 0)
500         {
501             const uint32_t id = m_shared_state->m_semaphore_id;
502             if (m_current_semaphore->m_id != id)
503                 m_current_semaphore = get_semaphore(id);
504 
505             m_current_semaphore->m_semaphore.post(waiters);
506             m_shared_state->m_waiters = -waiters;
507         }
508     }
509 
510     bool wait(interprocess_mutex::optional_unlock& lock, boost::winapi::HANDLE_ abort_handle);
511 
512     BOOST_DELETED_FUNCTION(interprocess_condition_variable(interprocess_condition_variable const&))
513     BOOST_DELETED_FUNCTION(interprocess_condition_variable& operator=(interprocess_condition_variable const&))
514 
515 private:
516     //! Finds or opens a semaphore with the specified id
517     semaphore_info* get_semaphore(uint32_t id);
518     //! Finds or creates a semaphore with zero counter
519     semaphore_info* get_unused_semaphore();
520 
521     //! Marks the semaphore info as unused and moves to the end of list
522     void mark_unused(semaphore_info& info) BOOST_NOEXCEPT;
523 
524     //! Generates semaphore name according to id
525     void generate_semaphore_name(uint32_t id) BOOST_NOEXCEPT;
526 
527     //! Returns \c true if \a left is less than \a right considering possible integer overflow
is_overflow_less(uint32_t left,uint32_t right)528     static bool is_overflow_less(uint32_t left, uint32_t right) BOOST_NOEXCEPT
529     {
530         return ((left - right) & 0x80000000u) != 0u;
531     }
532 };
533 
534 } // namespace aux
535 
536 } // namespace ipc
537 
538 BOOST_LOG_CLOSE_NAMESPACE // namespace log
539 
540 } // namespace boost
541 
542 #include <boost/log/detail/footer.hpp>
543 
544 #endif // BOOST_LOG_WINDOWS_IPC_SYNC_WRAPPERS_HPP_INCLUDED_
545