• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "absl/synchronization/internal/waiter.h"
16 
17 #include "absl/base/config.h"
18 
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26 
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31 
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35 
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39 
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45 
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50 
51 namespace absl {
52 ABSL_NAMESPACE_BEGIN
53 namespace synchronization_internal {
54 
MaybeBecomeIdle()55 static void MaybeBecomeIdle() {
56   base_internal::ThreadIdentity *identity =
57       base_internal::CurrentThreadIdentityIfPresent();
58   assert(identity != nullptr);
59   const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
60   const int ticker = identity->ticker.load(std::memory_order_relaxed);
61   const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
62   if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
63     identity->is_idle.store(true, std::memory_order_relaxed);
64   }
65 }
66 
67 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
68 
69 // Some Android headers are missing these definitions even though they
70 // support these futex operations.
71 #ifdef __BIONIC__
72 #ifndef SYS_futex
73 #define SYS_futex __NR_futex
74 #endif
75 #ifndef FUTEX_WAIT_BITSET
76 #define FUTEX_WAIT_BITSET 9
77 #endif
78 #ifndef FUTEX_PRIVATE_FLAG
79 #define FUTEX_PRIVATE_FLAG 128
80 #endif
81 #ifndef FUTEX_CLOCK_REALTIME
82 #define FUTEX_CLOCK_REALTIME 256
83 #endif
84 #ifndef FUTEX_BITSET_MATCH_ANY
85 #define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
86 #endif
87 #endif
88 
89 #if defined(__NR_futex_time64) && !defined(SYS_futex_time64)
90 #define SYS_futex_time64 __NR_futex_time64
91 #endif
92 
93 #if defined(SYS_futex_time64) && !defined(SYS_futex)
94 #define SYS_futex SYS_futex_time64
95 #endif
96 
97 class Futex {
98  public:
WaitUntil(std::atomic<int32_t> * v,int32_t val,KernelTimeout t)99   static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
100                        KernelTimeout t) {
101     int err = 0;
102     if (t.has_timeout()) {
103       // https://locklessinc.com/articles/futex_cheat_sheet/
104       // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
105       struct timespec abs_timeout = t.MakeAbsTimespec();
106       // Atomically check that the futex value is still 0, and if it
107       // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
108       err = syscall(
109           SYS_futex, reinterpret_cast<int32_t *>(v),
110           FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
111           &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
112     } else {
113       // Atomically check that the futex value is still 0, and if it
114       // is, sleep until woken by FUTEX_WAKE.
115       err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
116                     FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
117     }
118     if (err != 0) {
119       err = -errno;
120     }
121     return err;
122   }
123 
Wake(std::atomic<int32_t> * v,int32_t count)124   static int Wake(std::atomic<int32_t> *v, int32_t count) {
125     int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
126                       FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
127     if (ABSL_PREDICT_FALSE(err < 0)) {
128       err = -errno;
129     }
130     return err;
131   }
132 };
133 
Waiter()134 Waiter::Waiter() {
135   futex_.store(0, std::memory_order_relaxed);
136 }
137 
138 Waiter::~Waiter() = default;
139 
Wait(KernelTimeout t)140 bool Waiter::Wait(KernelTimeout t) {
141   // Loop until we can atomically decrement futex from a positive
142   // value, waiting on a futex while we believe it is zero.
143   // Note that, since the thread ticker is just reset, we don't need to check
144   // whether the thread is idle on the very first pass of the loop.
145   bool first_pass = true;
146   while (true) {
147     int32_t x = futex_.load(std::memory_order_relaxed);
148     while (x != 0) {
149       if (!futex_.compare_exchange_weak(x, x - 1,
150                                         std::memory_order_acquire,
151                                         std::memory_order_relaxed)) {
152         continue;  // Raced with someone, retry.
153       }
154       return true;  // Consumed a wakeup, we are done.
155     }
156 
157 
158     if (!first_pass) MaybeBecomeIdle();
159     const int err = Futex::WaitUntil(&futex_, 0, t);
160     if (err != 0) {
161       if (err == -EINTR || err == -EWOULDBLOCK) {
162         // Do nothing, the loop will retry.
163       } else if (err == -ETIMEDOUT) {
164         return false;
165       } else {
166         ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
167       }
168     }
169     first_pass = false;
170   }
171 }
172 
Post()173 void Waiter::Post() {
174   if (futex_.fetch_add(1, std::memory_order_release) == 0) {
175     // We incremented from 0, need to wake a potential waiter.
176     Poke();
177   }
178 }
179 
Poke()180 void Waiter::Poke() {
181   // Wake one thread waiting on the futex.
182   const int err = Futex::Wake(&futex_, 1);
183   if (ABSL_PREDICT_FALSE(err < 0)) {
184     ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
185   }
186 }
187 
188 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
189 
190 class PthreadMutexHolder {
191  public:
PthreadMutexHolder(pthread_mutex_t * mu)192   explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
193     const int err = pthread_mutex_lock(mu_);
194     if (err != 0) {
195       ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
196     }
197   }
198 
199   PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
200   PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
201 
~PthreadMutexHolder()202   ~PthreadMutexHolder() {
203     const int err = pthread_mutex_unlock(mu_);
204     if (err != 0) {
205       ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
206     }
207   }
208 
209  private:
210   pthread_mutex_t *mu_;
211 };
212 
Waiter()213 Waiter::Waiter() {
214   const int err = pthread_mutex_init(&mu_, 0);
215   if (err != 0) {
216     ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
217   }
218 
219   const int err2 = pthread_cond_init(&cv_, 0);
220   if (err2 != 0) {
221     ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
222   }
223 
224   waiter_count_ = 0;
225   wakeup_count_ = 0;
226 }
227 
~Waiter()228 Waiter::~Waiter() {
229   const int err = pthread_mutex_destroy(&mu_);
230   if (err != 0) {
231     ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
232   }
233 
234   const int err2 = pthread_cond_destroy(&cv_);
235   if (err2 != 0) {
236     ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
237   }
238 }
239 
Wait(KernelTimeout t)240 bool Waiter::Wait(KernelTimeout t) {
241   struct timespec abs_timeout;
242   if (t.has_timeout()) {
243     abs_timeout = t.MakeAbsTimespec();
244   }
245 
246   PthreadMutexHolder h(&mu_);
247   ++waiter_count_;
248   // Loop until we find a wakeup to consume or timeout.
249   // Note that, since the thread ticker is just reset, we don't need to check
250   // whether the thread is idle on the very first pass of the loop.
251   bool first_pass = true;
252   while (wakeup_count_ == 0) {
253     if (!first_pass) MaybeBecomeIdle();
254     // No wakeups available, time to wait.
255     if (!t.has_timeout()) {
256       const int err = pthread_cond_wait(&cv_, &mu_);
257       if (err != 0) {
258         ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
259       }
260     } else {
261       const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
262       if (err == ETIMEDOUT) {
263         --waiter_count_;
264         return false;
265       }
266       if (err != 0) {
267         ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
268       }
269     }
270     first_pass = false;
271   }
272   // Consume a wakeup and we're done.
273   --wakeup_count_;
274   --waiter_count_;
275   return true;
276 }
277 
Post()278 void Waiter::Post() {
279   PthreadMutexHolder h(&mu_);
280   ++wakeup_count_;
281   InternalCondVarPoke();
282 }
283 
Poke()284 void Waiter::Poke() {
285   PthreadMutexHolder h(&mu_);
286   InternalCondVarPoke();
287 }
288 
InternalCondVarPoke()289 void Waiter::InternalCondVarPoke() {
290   if (waiter_count_ != 0) {
291     const int err = pthread_cond_signal(&cv_);
292     if (ABSL_PREDICT_FALSE(err != 0)) {
293       ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
294     }
295   }
296 }
297 
298 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
299 
Waiter()300 Waiter::Waiter() {
301   if (sem_init(&sem_, 0, 0) != 0) {
302     ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
303   }
304   wakeups_.store(0, std::memory_order_relaxed);
305 }
306 
~Waiter()307 Waiter::~Waiter() {
308   if (sem_destroy(&sem_) != 0) {
309     ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
310   }
311 }
312 
Wait(KernelTimeout t)313 bool Waiter::Wait(KernelTimeout t) {
314   struct timespec abs_timeout;
315   if (t.has_timeout()) {
316     abs_timeout = t.MakeAbsTimespec();
317   }
318 
319   // Loop until we timeout or consume a wakeup.
320   // Note that, since the thread ticker is just reset, we don't need to check
321   // whether the thread is idle on the very first pass of the loop.
322   bool first_pass = true;
323   while (true) {
324     int x = wakeups_.load(std::memory_order_relaxed);
325     while (x != 0) {
326       if (!wakeups_.compare_exchange_weak(x, x - 1,
327                                           std::memory_order_acquire,
328                                           std::memory_order_relaxed)) {
329         continue;  // Raced with someone, retry.
330       }
331       // Successfully consumed a wakeup, we're done.
332       return true;
333     }
334 
335     if (!first_pass) MaybeBecomeIdle();
336     // Nothing to consume, wait (looping on EINTR).
337     while (true) {
338       if (!t.has_timeout()) {
339         if (sem_wait(&sem_) == 0) break;
340         if (errno == EINTR) continue;
341         ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
342       } else {
343         if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
344         if (errno == EINTR) continue;
345         if (errno == ETIMEDOUT) return false;
346         ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
347       }
348     }
349     first_pass = false;
350   }
351 }
352 
Post()353 void Waiter::Post() {
354   // Post a wakeup.
355   if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
356     // We incremented from 0, need to wake a potential waiter.
357     Poke();
358   }
359 }
360 
Poke()361 void Waiter::Poke() {
362   if (sem_post(&sem_) != 0) {  // Wake any semaphore waiter.
363     ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
364   }
365 }
366 
367 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
368 
369 class Waiter::WinHelper {
370  public:
GetLock(Waiter * w)371   static SRWLOCK *GetLock(Waiter *w) {
372     return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
373   }
374 
GetCond(Waiter * w)375   static CONDITION_VARIABLE *GetCond(Waiter *w) {
376     return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
377   }
378 
379   static_assert(sizeof(SRWLOCK) == sizeof(void *),
380                 "`mu_storage_` does not have the same size as SRWLOCK");
381   static_assert(alignof(SRWLOCK) == alignof(void *),
382                 "`mu_storage_` does not have the same alignment as SRWLOCK");
383 
384   static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
385                 "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
386                 "as `CONDITION_VARIABLE`");
387   static_assert(
388       alignof(CONDITION_VARIABLE) == alignof(void *),
389       "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
390 
391   // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
392   // and destructible because we never call their constructors or destructors.
393   static_assert(std::is_trivially_constructible<SRWLOCK>::value,
394                 "The `SRWLOCK` type must be trivially constructible");
395   static_assert(
396       std::is_trivially_constructible<CONDITION_VARIABLE>::value,
397       "The `CONDITION_VARIABLE` type must be trivially constructible");
398   static_assert(std::is_trivially_destructible<SRWLOCK>::value,
399                 "The `SRWLOCK` type must be trivially destructible");
400   static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
401                 "The `CONDITION_VARIABLE` type must be trivially destructible");
402 };
403 
404 class LockHolder {
405  public:
LockHolder(SRWLOCK * mu)406   explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
407     AcquireSRWLockExclusive(mu_);
408   }
409 
410   LockHolder(const LockHolder&) = delete;
411   LockHolder& operator=(const LockHolder&) = delete;
412 
~LockHolder()413   ~LockHolder() {
414     ReleaseSRWLockExclusive(mu_);
415   }
416 
417  private:
418   SRWLOCK* mu_;
419 };
420 
Waiter()421 Waiter::Waiter() {
422   auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
423   auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
424   InitializeSRWLock(mu);
425   InitializeConditionVariable(cv);
426   waiter_count_ = 0;
427   wakeup_count_ = 0;
428 }
429 
430 // SRW locks and condition variables do not need to be explicitly destroyed.
431 // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
432 // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
433 Waiter::~Waiter() = default;
434 
Wait(KernelTimeout t)435 bool Waiter::Wait(KernelTimeout t) {
436   SRWLOCK *mu = WinHelper::GetLock(this);
437   CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
438 
439   LockHolder h(mu);
440   ++waiter_count_;
441 
442   // Loop until we find a wakeup to consume or timeout.
443   // Note that, since the thread ticker is just reset, we don't need to check
444   // whether the thread is idle on the very first pass of the loop.
445   bool first_pass = true;
446   while (wakeup_count_ == 0) {
447     if (!first_pass) MaybeBecomeIdle();
448     // No wakeups available, time to wait.
449     if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
450       // GetLastError() returns a Win32 DWORD, but we assign to
451       // unsigned long to simplify the ABSL_RAW_LOG case below.  The uniform
452       // initialization guarantees this is not a narrowing conversion.
453       const unsigned long err{GetLastError()};  // NOLINT(runtime/int)
454       if (err == ERROR_TIMEOUT) {
455         --waiter_count_;
456         return false;
457       } else {
458         ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
459       }
460     }
461     first_pass = false;
462   }
463   // Consume a wakeup and we're done.
464   --wakeup_count_;
465   --waiter_count_;
466   return true;
467 }
468 
Post()469 void Waiter::Post() {
470   LockHolder h(WinHelper::GetLock(this));
471   ++wakeup_count_;
472   InternalCondVarPoke();
473 }
474 
Poke()475 void Waiter::Poke() {
476   LockHolder h(WinHelper::GetLock(this));
477   InternalCondVarPoke();
478 }
479 
InternalCondVarPoke()480 void Waiter::InternalCondVarPoke() {
481   if (waiter_count_ != 0) {
482     WakeConditionVariable(WinHelper::GetCond(this));
483   }
484 }
485 
486 #else
487 #error Unknown ABSL_WAITER_MODE
488 #endif
489 
490 }  // namespace synchronization_internal
491 ABSL_NAMESPACE_END
492 }  // namespace absl
493