• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *              Copyright Andrey Semashev 2016.
3  * Distributed under the Boost Software License, Version 1.0.
4  *    (See accompanying file LICENSE_1_0.txt or copy at
5  *          http://www.boost.org/LICENSE_1_0.txt)
6  */
7 /*!
8  * \file   windows/ipc_sync_wrappers.cpp
9  * \author Andrey Semashev
10  * \date   23.01.2016
11  *
12  * \brief  This header is the Boost.Log library implementation, see the library documentation
13  *         at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
14  */
15 
16 #include <boost/log/detail/config.hpp>
17 #include <boost/winapi/access_rights.hpp>
18 #include <boost/winapi/handles.hpp>
19 #include <boost/winapi/event.hpp>
20 #include <boost/winapi/semaphore.hpp>
21 #include <boost/winapi/wait.hpp>
22 #include <boost/winapi/dll.hpp>
23 #include <boost/winapi/time.hpp>
24 #include <boost/winapi/get_last_error.hpp>
25 #include <boost/winapi/character_code_conversion.hpp>
26 #include <windows.h> // for error codes
27 #include <cstddef>
28 #include <limits>
29 #include <string>
30 #include <utility>
31 #include <boost/assert.hpp>
32 #include <boost/throw_exception.hpp>
33 #include <boost/checked_delete.hpp>
34 #include <boost/memory_order.hpp>
35 #include <boost/atomic/atomic.hpp>
36 #include <boost/log/detail/snprintf.hpp>
37 #include "unique_ptr.hpp"
38 #include "windows/ipc_sync_wrappers.hpp"
39 #include <boost/log/detail/header.hpp>
40 
41 namespace boost {
42 
43 BOOST_LOG_OPEN_NAMESPACE
44 
45 namespace aux {
46 
47 //! Hex character table, defined in dump.cpp
48 extern const char g_hex_char_table[2][16];
49 
50 } // namespace aux
51 
52 namespace ipc {
53 
54 namespace aux {
55 
create(const wchar_t * name,bool manual_reset,permissions const & perms)56 void interprocess_event::create(const wchar_t* name, bool manual_reset, permissions const& perms)
57 {
58 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
59     boost::winapi::HANDLE_ h = boost::winapi::CreateEventExW
60     (
61         reinterpret_cast< boost::winapi::SECURITY_ATTRIBUTES_* >(perms.get_native()),
62         name,
63         boost::winapi::CREATE_EVENT_MANUAL_RESET_ * manual_reset,
64         boost::winapi::SYNCHRONIZE_ | boost::winapi::EVENT_MODIFY_STATE_
65     );
66 #else
67     boost::winapi::HANDLE_ h = boost::winapi::CreateEventW
68     (
69         reinterpret_cast< boost::winapi::SECURITY_ATTRIBUTES_* >(perms.get_native()),
70         manual_reset,
71         false,
72         name
73     );
74 #endif
75     if (BOOST_UNLIKELY(h == NULL))
76     {
77         boost::winapi::DWORD_ err = boost::winapi::GetLastError();
78         BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to create an interprocess event object", (err));
79     }
80 
81     m_event.init(h);
82 }
83 
create_or_open(const wchar_t * name,bool manual_reset,permissions const & perms)84 void interprocess_event::create_or_open(const wchar_t* name, bool manual_reset, permissions const& perms)
85 {
86 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
87     boost::winapi::HANDLE_ h = boost::winapi::CreateEventExW
88     (
89         reinterpret_cast< boost::winapi::SECURITY_ATTRIBUTES_* >(perms.get_native()),
90         name,
91         boost::winapi::CREATE_EVENT_MANUAL_RESET_ * manual_reset,
92         boost::winapi::SYNCHRONIZE_ | boost::winapi::EVENT_MODIFY_STATE_
93     );
94 #else
95     boost::winapi::HANDLE_ h = boost::winapi::CreateEventW
96     (
97         reinterpret_cast< boost::winapi::SECURITY_ATTRIBUTES_* >(perms.get_native()),
98         manual_reset,
99         false,
100         name
101     );
102 #endif
103     if (h == NULL)
104     {
105         const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
106         if (BOOST_LIKELY(err == ERROR_ALREADY_EXISTS))
107         {
108             open(name);
109             return;
110         }
111         else
112         {
113             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to create an interprocess event object", (err));
114         }
115     }
116 
117     m_event.init(h);
118 }
119 
open(const wchar_t * name)120 void interprocess_event::open(const wchar_t* name)
121 {
122     boost::winapi::HANDLE_ h = boost::winapi::OpenEventW(boost::winapi::SYNCHRONIZE_ | boost::winapi::EVENT_MODIFY_STATE_, false, name);
123     if (BOOST_UNLIKELY(h == NULL))
124     {
125         const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
126         BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to open an interprocess event object", (err));
127     }
128 
129     m_event.init(h);
130 }
131 
132 boost::atomic< interprocess_semaphore::is_semaphore_zero_count_t > interprocess_semaphore::is_semaphore_zero_count(&interprocess_semaphore::is_semaphore_zero_count_init);
133 interprocess_semaphore::nt_query_semaphore_t interprocess_semaphore::nt_query_semaphore = NULL;
134 
create_or_open(const wchar_t * name,permissions const & perms)135 void interprocess_semaphore::create_or_open(const wchar_t* name, permissions const& perms)
136 {
137 #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6
138     boost::winapi::HANDLE_ h = boost::winapi::CreateSemaphoreExW
139     (
140         reinterpret_cast< boost::winapi::SECURITY_ATTRIBUTES_* >(perms.get_native()),
141         0, // initial count
142         (std::numeric_limits< boost::winapi::LONG_ >::max)(), // max count
143         name,
144         0u, // flags
145         boost::winapi::SYNCHRONIZE_ | boost::winapi::SEMAPHORE_MODIFY_STATE_ | boost::winapi::SEMAPHORE_QUERY_STATE_
146     );
147 #else
148     boost::winapi::HANDLE_ h = boost::winapi::CreateSemaphoreW
149     (
150         reinterpret_cast< boost::winapi::SECURITY_ATTRIBUTES_* >(perms.get_native()),
151         0, // initial count
152         (std::numeric_limits< boost::winapi::LONG_ >::max)(), // max count
153         name
154     );
155 #endif
156     if (h == NULL)
157     {
158         boost::winapi::DWORD_ err = boost::winapi::GetLastError();
159         if (BOOST_LIKELY(err == ERROR_ALREADY_EXISTS))
160         {
161             open(name);
162             return;
163         }
164         else
165         {
166             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to create an interprocess semaphore object", (err));
167         }
168     }
169 
170     m_sem.init(h);
171 }
172 
open(const wchar_t * name)173 void interprocess_semaphore::open(const wchar_t* name)
174 {
175     boost::winapi::HANDLE_ h = boost::winapi::OpenSemaphoreW(boost::winapi::SYNCHRONIZE_ | boost::winapi::SEMAPHORE_MODIFY_STATE_ | boost::winapi::SEMAPHORE_QUERY_STATE_, false, name);
176     if (BOOST_UNLIKELY(h == NULL))
177     {
178         const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
179         BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to open an interprocess semaphore object", (err));
180     }
181 
182     m_sem.init(h);
183 }
184 
is_semaphore_zero_count_init(boost::winapi::HANDLE_ h)185 bool interprocess_semaphore::is_semaphore_zero_count_init(boost::winapi::HANDLE_ h)
186 {
187     is_semaphore_zero_count_t impl = &interprocess_semaphore::is_semaphore_zero_count_emulated;
188 
189     // Check if ntdll.dll provides NtQuerySemaphore, see: http://undocumented.ntinternals.net/index.html?page=UserMode%2FUndocumented%20Functions%2FNT%20Objects%2FSemaphore%2FNtQuerySemaphore.html
190     boost::winapi::HMODULE_ ntdll = boost::winapi::GetModuleHandleW(L"ntdll.dll");
191     if (ntdll)
192     {
193         nt_query_semaphore_t ntqs = (nt_query_semaphore_t)boost::winapi::get_proc_address(ntdll, "NtQuerySemaphore");
194         if (ntqs)
195         {
196             nt_query_semaphore = ntqs;
197             impl = &interprocess_semaphore::is_semaphore_zero_count_nt_query_semaphore;
198         }
199     }
200 
201     is_semaphore_zero_count.store(impl, boost::memory_order_release);
202 
203     return impl(h);
204 }
205 
is_semaphore_zero_count_nt_query_semaphore(boost::winapi::HANDLE_ h)206 bool interprocess_semaphore::is_semaphore_zero_count_nt_query_semaphore(boost::winapi::HANDLE_ h)
207 {
208     semaphore_basic_information info = {};
209     NTSTATUS_ err = nt_query_semaphore
210     (
211         h,
212         0u, // SemaphoreBasicInformation
213         &info,
214         sizeof(info),
215         NULL
216     );
217     if (BOOST_UNLIKELY(err != 0u))
218     {
219         char buf[sizeof(unsigned int) * 2u + 4u];
220         boost::log::aux::snprintf(buf, sizeof(buf), "0x%08x", static_cast< unsigned int >(err));
221         BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, std::string("Failed to test an interprocess semaphore object for zero count, NT status: ") + buf, (ERROR_INVALID_HANDLE));
222     }
223 
224     return info.current_count == 0u;
225 }
226 
is_semaphore_zero_count_emulated(boost::winapi::HANDLE_ h)227 bool interprocess_semaphore::is_semaphore_zero_count_emulated(boost::winapi::HANDLE_ h)
228 {
229     const boost::winapi::DWORD_ retval = boost::winapi::WaitForSingleObject(h, 0u);
230     if (retval == boost::winapi::wait_timeout)
231     {
232         return true;
233     }
234     else if (BOOST_UNLIKELY(retval != boost::winapi::wait_object_0))
235     {
236         const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
237         BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to test an interprocess semaphore object for zero count", (err));
238     }
239 
240     // Restore the decremented counter
241     BOOST_VERIFY(!!boost::winapi::ReleaseSemaphore(h, 1, NULL));
242 
243     return false;
244 }
245 
246 #if !defined(BOOST_MSVC) || _MSC_VER >= 1800
247 BOOST_CONSTEXPR_OR_CONST uint32_t interprocess_mutex::lock_flag_bit;
248 BOOST_CONSTEXPR_OR_CONST uint32_t interprocess_mutex::event_set_flag_bit;
249 BOOST_CONSTEXPR_OR_CONST uint32_t interprocess_mutex::lock_flag_value;
250 BOOST_CONSTEXPR_OR_CONST uint32_t interprocess_mutex::event_set_flag_value;
251 BOOST_CONSTEXPR_OR_CONST uint32_t interprocess_mutex::waiter_count_mask;
252 #endif
253 
lock_slow()254 void interprocess_mutex::lock_slow()
255 {
256     uint32_t old_state = m_shared_state->m_lock_state.load(boost::memory_order_relaxed);
257     mark_waiting_and_try_lock(old_state);
258 
259     if ((old_state & lock_flag_value) != 0u) try
260     {
261         do
262         {
263             m_event.wait();
264             clear_waiting_and_try_lock(old_state);
265         }
266         while ((old_state & lock_flag_value) != 0u);
267     }
268     catch (...)
269     {
270         m_shared_state->m_lock_state.fetch_sub(1u, boost::memory_order_acq_rel);
271         throw;
272     }
273 }
274 
lock_slow(boost::winapi::HANDLE_ abort_handle)275 bool interprocess_mutex::lock_slow(boost::winapi::HANDLE_ abort_handle)
276 {
277     uint32_t old_state = m_shared_state->m_lock_state.load(boost::memory_order_relaxed);
278     mark_waiting_and_try_lock(old_state);
279 
280     if ((old_state & lock_flag_value) != 0u) try
281     {
282         do
283         {
284             if (!m_event.wait(abort_handle))
285             {
286                 // Wait was interrupted
287                 m_shared_state->m_lock_state.fetch_sub(1u, boost::memory_order_acq_rel);
288                 return false;
289             }
290 
291             clear_waiting_and_try_lock(old_state);
292         }
293         while ((old_state & lock_flag_value) != 0u);
294     }
295     catch (...)
296     {
297         m_shared_state->m_lock_state.fetch_sub(1u, boost::memory_order_acq_rel);
298         throw;
299     }
300 
301     return true;
302 }
303 
mark_waiting_and_try_lock(uint32_t & old_state)304 inline void interprocess_mutex::mark_waiting_and_try_lock(uint32_t& old_state)
305 {
306     uint32_t new_state;
307     do
308     {
309         uint32_t was_locked = (old_state & lock_flag_value);
310         if (was_locked)
311         {
312             // Avoid integer overflows
313             if (BOOST_UNLIKELY((old_state & waiter_count_mask) == waiter_count_mask))
314                 BOOST_LOG_THROW_DESCR(limitation_error, "Too many waiters on an interprocess mutex");
315 
316             new_state = old_state + 1u;
317         }
318         else
319         {
320             new_state = old_state | lock_flag_value;
321         }
322     }
323     while (!m_shared_state->m_lock_state.compare_exchange_weak(old_state, new_state, boost::memory_order_acq_rel, boost::memory_order_relaxed));
324 }
325 
clear_waiting_and_try_lock(uint32_t & old_state)326 inline void interprocess_mutex::clear_waiting_and_try_lock(uint32_t& old_state)
327 {
328     old_state &= ~lock_flag_value;
329     old_state |= event_set_flag_value;
330     uint32_t new_state;
331     do
332     {
333         new_state = ((old_state & lock_flag_value) ? old_state : ((old_state - 1u) | lock_flag_value)) & ~event_set_flag_value;
334     }
335     while (!m_shared_state->m_lock_state.compare_exchange_weak(old_state, new_state, boost::memory_order_acq_rel, boost::memory_order_relaxed));
336 }
337 
338 
wait(interprocess_mutex::optional_unlock & lock,boost::winapi::HANDLE_ abort_handle)339 bool interprocess_condition_variable::wait(interprocess_mutex::optional_unlock& lock, boost::winapi::HANDLE_ abort_handle)
340 {
341     int32_t waiters = m_shared_state->m_waiters;
342     if (waiters < 0)
343     {
344         // We need to select a new semaphore to block on
345         m_current_semaphore = get_unused_semaphore();
346         ++m_shared_state->m_generation;
347         m_shared_state->m_semaphore_id = m_current_semaphore->m_id;
348         waiters = 0;
349     }
350     else
351     {
352         // Avoid integer overflow
353         if (BOOST_UNLIKELY(waiters >= ((std::numeric_limits< int32_t >::max)() - 1)))
354             BOOST_LOG_THROW_DESCR(limitation_error, "Too many waiters on an interprocess condition variable");
355 
356         // Make sure we use the right semaphore to block on
357         const uint32_t id = m_shared_state->m_semaphore_id;
358         if (m_current_semaphore->m_id != id)
359             m_current_semaphore = get_semaphore(id);
360     }
361 
362     m_shared_state->m_waiters = waiters + 1;
363     const uint32_t generation = m_shared_state->m_generation;
364 
365     boost::winapi::HANDLE_ handles[2u] = { m_current_semaphore->m_semaphore.get_handle(), abort_handle };
366 
367     interprocess_mutex* const mutex = lock.disengage();
368     mutex->unlock();
369 
370     boost::winapi::DWORD_ retval = boost::winapi::WaitForMultipleObjects(2u, handles, false, boost::winapi::INFINITE_);
371 
372     if (BOOST_UNLIKELY(retval == boost::winapi::WAIT_FAILED_))
373     {
374         const boost::winapi::DWORD_ err = boost::winapi::GetLastError();
375 
376         // Although highly unrealistic, it is possible that it took so long for the current thread to enter WaitForMultipleObjects that
377         // another thread has managed to destroy the semaphore. This can happen if the semaphore remains in a non-zero state
378         // for too long, which means that another process died while being blocked on the semaphore, and the semaphore was signalled,
379         // and the non-zero state timeout has passed. In this case the most logical behavior for the wait function is to return as
380         // if because of a wakeup.
381         if (err == ERROR_INVALID_HANDLE)
382             retval = boost::winapi::WAIT_OBJECT_0_;
383         else
384             BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to block on an interprocess semaphore object", (err));
385     }
386 
387     // Have to unconditionally lock the mutex here
388     mutex->lock();
389     lock.engage(*mutex);
390 
391     if (generation == m_shared_state->m_generation && m_shared_state->m_waiters > 0)
392         --m_shared_state->m_waiters;
393 
394     return retval == boost::winapi::WAIT_OBJECT_0_;
395 }
396 
397 //! Finds or opens a semaphore with the specified id
get_semaphore(uint32_t id)398 interprocess_condition_variable::semaphore_info* interprocess_condition_variable::get_semaphore(uint32_t id)
399 {
400     semaphore_info_set::insert_commit_data insert_state;
401     std::pair< semaphore_info_set::iterator, bool > res = m_semaphore_info_set.insert_check(id, semaphore_info::order_by_id(), insert_state);
402     if (res.second)
403     {
404         // We need to open the semaphore. It is possible that the semaphore does not exist because all processes that had it opened terminated.
405         // Because of this we also attempt to create it.
406         boost::log::aux::unique_ptr< semaphore_info > p(new semaphore_info(id));
407         generate_semaphore_name(id);
408         p->m_semaphore.create_or_open(m_semaphore_name.c_str(), m_perms);
409 
410         res.first = m_semaphore_info_set.insert_commit(*p, insert_state);
411         m_semaphore_info_list.push_back(*p);
412 
413         return p.release();
414     }
415     else
416     {
417         // Move the semaphore to the end of the list so that the next time we are less likely to use it
418         semaphore_info& info = *res.first;
419         m_semaphore_info_list.erase(m_semaphore_info_list.iterator_to(info));
420         m_semaphore_info_list.push_back(info);
421 
422         return &info;
423     }
424 }
425 
426 //! Finds or creates a semaphore with zero counter
get_unused_semaphore()427 interprocess_condition_variable::semaphore_info* interprocess_condition_variable::get_unused_semaphore()
428 {
429     // Be optimistic, check the current semaphore first
430     if (m_current_semaphore && m_current_semaphore->m_semaphore.is_zero_count())
431     {
432         mark_unused(*m_current_semaphore);
433         return m_current_semaphore;
434     }
435 
436     const tick_count_clock::time_point now = tick_count_clock::now();
437 
438     semaphore_info_list::iterator it = m_semaphore_info_list.begin(), end = m_semaphore_info_list.end();
439     while (it != end)
440     {
441         if (is_overflow_less(m_next_semaphore_id, it->m_id) || m_next_semaphore_id == it->m_id)
442             m_next_semaphore_id = it->m_id + 1u;
443 
444         if (it->m_semaphore.is_zero_count())
445         {
446             semaphore_info& info = *it;
447             mark_unused(info);
448             return &info;
449         }
450         else if (it->check_non_zero_timeout(now))
451         {
452             // The semaphore is non-zero for too long. A blocked process must have crashed. Close it.
453             m_semaphore_info_set.erase(m_semaphore_info_set.iterator_to(*it));
454             m_semaphore_info_list.erase_and_dispose(it++, boost::checked_deleter< semaphore_info >());
455         }
456         else
457         {
458             ++it;
459         }
460     }
461 
462     // No semaphore found, create a new one
463     for (uint32_t semaphore_id = m_next_semaphore_id, semaphore_id_end = semaphore_id - 1u; semaphore_id != semaphore_id_end; ++semaphore_id)
464     {
465         interprocess_semaphore sem;
466         try
467         {
468             generate_semaphore_name(semaphore_id);
469             sem.create_or_open(m_semaphore_name.c_str(), m_perms);
470             if (!sem.is_zero_count())
471                 continue;
472         }
473         catch (...)
474         {
475             // Ignore errors, try the next one
476             continue;
477         }
478 
479         semaphore_info* p = NULL;
480         semaphore_info_set::insert_commit_data insert_state;
481         std::pair< semaphore_info_set::iterator, bool > res = m_semaphore_info_set.insert_check(semaphore_id, semaphore_info::order_by_id(), insert_state);
482         if (res.second)
483         {
484             p = new semaphore_info(semaphore_id);
485             p->m_semaphore.swap(sem);
486 
487             res.first = m_semaphore_info_set.insert_commit(*p, insert_state);
488             m_semaphore_info_list.push_back(*p);
489         }
490         else
491         {
492             // Some of our currently open semaphores must have been released by another thread
493             p = &*res.first;
494             mark_unused(*p);
495         }
496 
497         m_next_semaphore_id = semaphore_id + 1u;
498 
499         return p;
500     }
501 
502     BOOST_LOG_THROW_DESCR(limitation_error, "Too many semaphores are actively used for an interprocess condition variable");
503     BOOST_LOG_UNREACHABLE_RETURN(NULL);
504 }
505 
506 //! Marks the semaphore info as unused and moves to the end of list
mark_unused(semaphore_info & info)507 inline void interprocess_condition_variable::mark_unused(semaphore_info& info) BOOST_NOEXCEPT
508 {
509     // Restart the timeout for non-zero state next time we search for an unused semaphore
510     info.m_checked_for_zero = false;
511     // Move to the end of the list so that we consider this semaphore last
512     m_semaphore_info_list.erase(m_semaphore_info_list.iterator_to(info));
513     m_semaphore_info_list.push_back(info);
514 }
515 
516 //! Generates semaphore name according to id
generate_semaphore_name(uint32_t id)517 inline void interprocess_condition_variable::generate_semaphore_name(uint32_t id) BOOST_NOEXCEPT
518 {
519     // Note: avoid anything that involves locale to make semaphore names as stable as possible
520     BOOST_ASSERT(m_semaphore_name.size() >= 8u);
521 
522     wchar_t* p = &m_semaphore_name[m_semaphore_name.size() - 8u];
523     *p++ = boost::log::aux::g_hex_char_table[0][id >> 28];
524     *p++ = boost::log::aux::g_hex_char_table[0][(id >> 24) & 0x0000000Fu];
525 
526     *p++ = boost::log::aux::g_hex_char_table[0][(id >> 20) & 0x0000000Fu];
527     *p++ = boost::log::aux::g_hex_char_table[0][(id >> 16) & 0x0000000Fu];
528 
529     *p++ = boost::log::aux::g_hex_char_table[0][(id >> 12) & 0x0000000Fu];
530     *p++ = boost::log::aux::g_hex_char_table[0][(id >> 8) & 0x0000000Fu];
531 
532     *p++ = boost::log::aux::g_hex_char_table[0][(id >> 4) & 0x0000000Fu];
533     *p = boost::log::aux::g_hex_char_table[0][id & 0x0000000Fu];
534 }
535 
536 } // namespace aux
537 
538 } // namespace ipc
539 
540 BOOST_LOG_CLOSE_NAMESPACE // namespace log
541 
542 } // namespace boost
543 
544 #include <boost/log/detail/footer.hpp>
545