• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "absl/synchronization/internal/waiter.h"
16 
17 #include "absl/base/config.h"
18 
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26 
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31 
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35 
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39 
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45 
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50 
51 
52 namespace absl {
53 ABSL_NAMESPACE_BEGIN
54 namespace synchronization_internal {
55 
MaybeBecomeIdle()56 static void MaybeBecomeIdle() {
57   base_internal::ThreadIdentity *identity =
58       base_internal::CurrentThreadIdentityIfPresent();
59   assert(identity != nullptr);
60   const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
61   const int ticker = identity->ticker.load(std::memory_order_relaxed);
62   const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
63   if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
64     identity->is_idle.store(true, std::memory_order_relaxed);
65   }
66 }
67 
68 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
69 
Waiter()70 Waiter::Waiter() {
71   futex_.store(0, std::memory_order_relaxed);
72 }
73 
74 Waiter::~Waiter() = default;
75 
Wait(KernelTimeout t)76 bool Waiter::Wait(KernelTimeout t) {
77   // Loop until we can atomically decrement futex from a positive
78   // value, waiting on a futex while we believe it is zero.
79   // Note that, since the thread ticker is just reset, we don't need to check
80   // whether the thread is idle on the very first pass of the loop.
81   bool first_pass = true;
82 
83   while (true) {
84     int32_t x = futex_.load(std::memory_order_relaxed);
85     while (x != 0) {
86       if (!futex_.compare_exchange_weak(x, x - 1,
87                                         std::memory_order_acquire,
88                                         std::memory_order_relaxed)) {
89         continue;  // Raced with someone, retry.
90       }
91       return true;  // Consumed a wakeup, we are done.
92     }
93 
94     if (!first_pass) MaybeBecomeIdle();
95     const int err = Futex::WaitUntil(&futex_, 0, t);
96     if (err != 0) {
97       if (err == -EINTR || err == -EWOULDBLOCK) {
98         // Do nothing, the loop will retry.
99       } else if (err == -ETIMEDOUT) {
100         return false;
101       } else {
102         ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
103       }
104     }
105     first_pass = false;
106   }
107 }
108 
Post()109 void Waiter::Post() {
110   if (futex_.fetch_add(1, std::memory_order_release) == 0) {
111     // We incremented from 0, need to wake a potential waiter.
112     Poke();
113   }
114 }
115 
Poke()116 void Waiter::Poke() {
117   // Wake one thread waiting on the futex.
118   const int err = Futex::Wake(&futex_, 1);
119   if (ABSL_PREDICT_FALSE(err < 0)) {
120     ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
121   }
122 }
123 
124 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
125 
126 class PthreadMutexHolder {
127  public:
PthreadMutexHolder(pthread_mutex_t * mu)128   explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
129     const int err = pthread_mutex_lock(mu_);
130     if (err != 0) {
131       ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
132     }
133   }
134 
135   PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
136   PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
137 
~PthreadMutexHolder()138   ~PthreadMutexHolder() {
139     const int err = pthread_mutex_unlock(mu_);
140     if (err != 0) {
141       ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
142     }
143   }
144 
145  private:
146   pthread_mutex_t *mu_;
147 };
148 
Waiter()149 Waiter::Waiter() {
150   const int err = pthread_mutex_init(&mu_, 0);
151   if (err != 0) {
152     ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
153   }
154 
155   const int err2 = pthread_cond_init(&cv_, 0);
156   if (err2 != 0) {
157     ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
158   }
159 
160   waiter_count_ = 0;
161   wakeup_count_ = 0;
162 }
163 
~Waiter()164 Waiter::~Waiter() {
165   const int err = pthread_mutex_destroy(&mu_);
166   if (err != 0) {
167     ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
168   }
169 
170   const int err2 = pthread_cond_destroy(&cv_);
171   if (err2 != 0) {
172     ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
173   }
174 }
175 
Wait(KernelTimeout t)176 bool Waiter::Wait(KernelTimeout t) {
177   struct timespec abs_timeout;
178   if (t.has_timeout()) {
179     abs_timeout = t.MakeAbsTimespec();
180   }
181 
182   PthreadMutexHolder h(&mu_);
183   ++waiter_count_;
184   // Loop until we find a wakeup to consume or timeout.
185   // Note that, since the thread ticker is just reset, we don't need to check
186   // whether the thread is idle on the very first pass of the loop.
187   bool first_pass = true;
188   while (wakeup_count_ == 0) {
189     if (!first_pass) MaybeBecomeIdle();
190     // No wakeups available, time to wait.
191     if (!t.has_timeout()) {
192       const int err = pthread_cond_wait(&cv_, &mu_);
193       if (err != 0) {
194         ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
195       }
196     } else {
197       const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
198       if (err == ETIMEDOUT) {
199         --waiter_count_;
200         return false;
201       }
202       if (err != 0) {
203         ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
204       }
205     }
206     first_pass = false;
207   }
208   // Consume a wakeup and we're done.
209   --wakeup_count_;
210   --waiter_count_;
211   return true;
212 }
213 
Post()214 void Waiter::Post() {
215   PthreadMutexHolder h(&mu_);
216   ++wakeup_count_;
217   InternalCondVarPoke();
218 }
219 
Poke()220 void Waiter::Poke() {
221   PthreadMutexHolder h(&mu_);
222   InternalCondVarPoke();
223 }
224 
InternalCondVarPoke()225 void Waiter::InternalCondVarPoke() {
226   if (waiter_count_ != 0) {
227     const int err = pthread_cond_signal(&cv_);
228     if (ABSL_PREDICT_FALSE(err != 0)) {
229       ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
230     }
231   }
232 }
233 
234 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
235 
Waiter()236 Waiter::Waiter() {
237   if (sem_init(&sem_, 0, 0) != 0) {
238     ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
239   }
240   wakeups_.store(0, std::memory_order_relaxed);
241 }
242 
~Waiter()243 Waiter::~Waiter() {
244   if (sem_destroy(&sem_) != 0) {
245     ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
246   }
247 }
248 
Wait(KernelTimeout t)249 bool Waiter::Wait(KernelTimeout t) {
250   struct timespec abs_timeout;
251   if (t.has_timeout()) {
252     abs_timeout = t.MakeAbsTimespec();
253   }
254 
255   // Loop until we timeout or consume a wakeup.
256   // Note that, since the thread ticker is just reset, we don't need to check
257   // whether the thread is idle on the very first pass of the loop.
258   bool first_pass = true;
259   while (true) {
260     int x = wakeups_.load(std::memory_order_relaxed);
261     while (x != 0) {
262       if (!wakeups_.compare_exchange_weak(x, x - 1,
263                                           std::memory_order_acquire,
264                                           std::memory_order_relaxed)) {
265         continue;  // Raced with someone, retry.
266       }
267       // Successfully consumed a wakeup, we're done.
268       return true;
269     }
270 
271     if (!first_pass) MaybeBecomeIdle();
272     // Nothing to consume, wait (looping on EINTR).
273     while (true) {
274       if (!t.has_timeout()) {
275         if (sem_wait(&sem_) == 0) break;
276         if (errno == EINTR) continue;
277         ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
278       } else {
279         if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
280         if (errno == EINTR) continue;
281         if (errno == ETIMEDOUT) return false;
282         ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
283       }
284     }
285     first_pass = false;
286   }
287 }
288 
Post()289 void Waiter::Post() {
290   // Post a wakeup.
291   if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
292     // We incremented from 0, need to wake a potential waiter.
293     Poke();
294   }
295 }
296 
Poke()297 void Waiter::Poke() {
298   if (sem_post(&sem_) != 0) {  // Wake any semaphore waiter.
299     ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
300   }
301 }
302 
303 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
304 
305 class Waiter::WinHelper {
306  public:
GetLock(Waiter * w)307   static SRWLOCK *GetLock(Waiter *w) {
308     return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
309   }
310 
GetCond(Waiter * w)311   static CONDITION_VARIABLE *GetCond(Waiter *w) {
312     return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
313   }
314 
315   static_assert(sizeof(SRWLOCK) == sizeof(void *),
316                 "`mu_storage_` does not have the same size as SRWLOCK");
317   static_assert(alignof(SRWLOCK) == alignof(void *),
318                 "`mu_storage_` does not have the same alignment as SRWLOCK");
319 
320   static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
321                 "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
322                 "as `CONDITION_VARIABLE`");
323   static_assert(
324       alignof(CONDITION_VARIABLE) == alignof(void *),
325       "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
326 
327   // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
328   // and destructible because we never call their constructors or destructors.
329   static_assert(std::is_trivially_constructible<SRWLOCK>::value,
330                 "The `SRWLOCK` type must be trivially constructible");
331   static_assert(
332       std::is_trivially_constructible<CONDITION_VARIABLE>::value,
333       "The `CONDITION_VARIABLE` type must be trivially constructible");
334   static_assert(std::is_trivially_destructible<SRWLOCK>::value,
335                 "The `SRWLOCK` type must be trivially destructible");
336   static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
337                 "The `CONDITION_VARIABLE` type must be trivially destructible");
338 };
339 
340 class LockHolder {
341  public:
LockHolder(SRWLOCK * mu)342   explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
343     AcquireSRWLockExclusive(mu_);
344   }
345 
346   LockHolder(const LockHolder&) = delete;
347   LockHolder& operator=(const LockHolder&) = delete;
348 
~LockHolder()349   ~LockHolder() {
350     ReleaseSRWLockExclusive(mu_);
351   }
352 
353  private:
354   SRWLOCK* mu_;
355 };
356 
Waiter()357 Waiter::Waiter() {
358   auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
359   auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
360   InitializeSRWLock(mu);
361   InitializeConditionVariable(cv);
362   waiter_count_ = 0;
363   wakeup_count_ = 0;
364 }
365 
366 // SRW locks and condition variables do not need to be explicitly destroyed.
367 // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
368 // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
369 Waiter::~Waiter() = default;
370 
Wait(KernelTimeout t)371 bool Waiter::Wait(KernelTimeout t) {
372   SRWLOCK *mu = WinHelper::GetLock(this);
373   CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
374 
375   LockHolder h(mu);
376   ++waiter_count_;
377 
378   // Loop until we find a wakeup to consume or timeout.
379   // Note that, since the thread ticker is just reset, we don't need to check
380   // whether the thread is idle on the very first pass of the loop.
381   bool first_pass = true;
382   while (wakeup_count_ == 0) {
383     if (!first_pass) MaybeBecomeIdle();
384     // No wakeups available, time to wait.
385     if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
386       // GetLastError() returns a Win32 DWORD, but we assign to
387       // unsigned long to simplify the ABSL_RAW_LOG case below.  The uniform
388       // initialization guarantees this is not a narrowing conversion.
389       const unsigned long err{GetLastError()};  // NOLINT(runtime/int)
390       if (err == ERROR_TIMEOUT) {
391         --waiter_count_;
392         return false;
393       } else {
394         ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
395       }
396     }
397     first_pass = false;
398   }
399   // Consume a wakeup and we're done.
400   --wakeup_count_;
401   --waiter_count_;
402   return true;
403 }
404 
Post()405 void Waiter::Post() {
406   LockHolder h(WinHelper::GetLock(this));
407   ++wakeup_count_;
408   InternalCondVarPoke();
409 }
410 
Poke()411 void Waiter::Poke() {
412   LockHolder h(WinHelper::GetLock(this));
413   InternalCondVarPoke();
414 }
415 
InternalCondVarPoke()416 void Waiter::InternalCondVarPoke() {
417   if (waiter_count_ != 0) {
418     WakeConditionVariable(WinHelper::GetCond(this));
419   }
420 }
421 
422 #else
423 #error Unknown ABSL_WAITER_MODE
424 #endif
425 
426 }  // namespace synchronization_internal
427 ABSL_NAMESPACE_END
428 }  // namespace absl
429