• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "absl/synchronization/internal/waiter.h"
16 
17 #include "absl/base/config.h"
18 
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26 
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31 
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35 
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39 
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45 
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50 
51 namespace absl {
52 ABSL_NAMESPACE_BEGIN
53 namespace synchronization_internal {
54 
MaybeBecomeIdle()55 static void MaybeBecomeIdle() {
56   base_internal::ThreadIdentity *identity =
57       base_internal::CurrentThreadIdentityIfPresent();
58   assert(identity != nullptr);
59   const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
60   const int ticker = identity->ticker.load(std::memory_order_relaxed);
61   const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
62   if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
63     identity->is_idle.store(true, std::memory_order_relaxed);
64   }
65 }
66 
67 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
68 
69 // Some Android headers are missing these definitions even though they
70 // support these futex operations.
71 #ifdef __BIONIC__
72 #ifndef SYS_futex
73 #define SYS_futex __NR_futex
74 #endif
75 #ifndef FUTEX_WAIT_BITSET
76 #define FUTEX_WAIT_BITSET 9
77 #endif
78 #ifndef FUTEX_PRIVATE_FLAG
79 #define FUTEX_PRIVATE_FLAG 128
80 #endif
81 #ifndef FUTEX_CLOCK_REALTIME
82 #define FUTEX_CLOCK_REALTIME 256
83 #endif
84 #ifndef FUTEX_BITSET_MATCH_ANY
85 #define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
86 #endif
87 #endif
88 
89 class Futex {
90  public:
WaitUntil(std::atomic<int32_t> * v,int32_t val,KernelTimeout t)91   static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
92                        KernelTimeout t) {
93     int err = 0;
94     if (t.has_timeout()) {
95       // https://locklessinc.com/articles/futex_cheat_sheet/
96       // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
97       struct timespec abs_timeout = t.MakeAbsTimespec();
98       // Atomically check that the futex value is still 0, and if it
99       // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
100       err = syscall(
101           SYS_futex, reinterpret_cast<int32_t *>(v),
102           FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
103           &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
104     } else {
105       // Atomically check that the futex value is still 0, and if it
106       // is, sleep until woken by FUTEX_WAKE.
107       err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
108                     FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
109     }
110     if (err != 0) {
111       err = -errno;
112     }
113     return err;
114   }
115 
Wake(std::atomic<int32_t> * v,int32_t count)116   static int Wake(std::atomic<int32_t> *v, int32_t count) {
117     int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
118                       FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
119     if (ABSL_PREDICT_FALSE(err < 0)) {
120       err = -errno;
121     }
122     return err;
123   }
124 };
125 
Waiter()126 Waiter::Waiter() {
127   futex_.store(0, std::memory_order_relaxed);
128 }
129 
130 Waiter::~Waiter() = default;
131 
Wait(KernelTimeout t)132 bool Waiter::Wait(KernelTimeout t) {
133   // Loop until we can atomically decrement futex from a positive
134   // value, waiting on a futex while we believe it is zero.
135   // Note that, since the thread ticker is just reset, we don't need to check
136   // whether the thread is idle on the very first pass of the loop.
137   bool first_pass = true;
138   while (true) {
139     int32_t x = futex_.load(std::memory_order_relaxed);
140     while (x != 0) {
141       if (!futex_.compare_exchange_weak(x, x - 1,
142                                         std::memory_order_acquire,
143                                         std::memory_order_relaxed)) {
144         continue;  // Raced with someone, retry.
145       }
146       return true;  // Consumed a wakeup, we are done.
147     }
148 
149 
150     if (!first_pass) MaybeBecomeIdle();
151     const int err = Futex::WaitUntil(&futex_, 0, t);
152     if (err != 0) {
153       if (err == -EINTR || err == -EWOULDBLOCK) {
154         // Do nothing, the loop will retry.
155       } else if (err == -ETIMEDOUT) {
156         return false;
157       } else {
158         ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
159       }
160     }
161     first_pass = false;
162   }
163 }
164 
Post()165 void Waiter::Post() {
166   if (futex_.fetch_add(1, std::memory_order_release) == 0) {
167     // We incremented from 0, need to wake a potential waiter.
168     Poke();
169   }
170 }
171 
Poke()172 void Waiter::Poke() {
173   // Wake one thread waiting on the futex.
174   const int err = Futex::Wake(&futex_, 1);
175   if (ABSL_PREDICT_FALSE(err < 0)) {
176     ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
177   }
178 }
179 
180 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
181 
182 class PthreadMutexHolder {
183  public:
PthreadMutexHolder(pthread_mutex_t * mu)184   explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
185     const int err = pthread_mutex_lock(mu_);
186     if (err != 0) {
187       ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
188     }
189   }
190 
191   PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
192   PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
193 
~PthreadMutexHolder()194   ~PthreadMutexHolder() {
195     const int err = pthread_mutex_unlock(mu_);
196     if (err != 0) {
197       ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
198     }
199   }
200 
201  private:
202   pthread_mutex_t *mu_;
203 };
204 
Waiter()205 Waiter::Waiter() {
206   const int err = pthread_mutex_init(&mu_, 0);
207   if (err != 0) {
208     ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
209   }
210 
211   const int err2 = pthread_cond_init(&cv_, 0);
212   if (err2 != 0) {
213     ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
214   }
215 
216   waiter_count_ = 0;
217   wakeup_count_ = 0;
218 }
219 
~Waiter()220 Waiter::~Waiter() {
221   const int err = pthread_mutex_destroy(&mu_);
222   if (err != 0) {
223     ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
224   }
225 
226   const int err2 = pthread_cond_destroy(&cv_);
227   if (err2 != 0) {
228     ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
229   }
230 }
231 
Wait(KernelTimeout t)232 bool Waiter::Wait(KernelTimeout t) {
233   struct timespec abs_timeout;
234   if (t.has_timeout()) {
235     abs_timeout = t.MakeAbsTimespec();
236   }
237 
238   PthreadMutexHolder h(&mu_);
239   ++waiter_count_;
240   // Loop until we find a wakeup to consume or timeout.
241   // Note that, since the thread ticker is just reset, we don't need to check
242   // whether the thread is idle on the very first pass of the loop.
243   bool first_pass = true;
244   while (wakeup_count_ == 0) {
245     if (!first_pass) MaybeBecomeIdle();
246     // No wakeups available, time to wait.
247     if (!t.has_timeout()) {
248       const int err = pthread_cond_wait(&cv_, &mu_);
249       if (err != 0) {
250         ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
251       }
252     } else {
253       const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
254       if (err == ETIMEDOUT) {
255         --waiter_count_;
256         return false;
257       }
258       if (err != 0) {
259         ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
260       }
261     }
262     first_pass = false;
263   }
264   // Consume a wakeup and we're done.
265   --wakeup_count_;
266   --waiter_count_;
267   return true;
268 }
269 
Post()270 void Waiter::Post() {
271   PthreadMutexHolder h(&mu_);
272   ++wakeup_count_;
273   InternalCondVarPoke();
274 }
275 
Poke()276 void Waiter::Poke() {
277   PthreadMutexHolder h(&mu_);
278   InternalCondVarPoke();
279 }
280 
InternalCondVarPoke()281 void Waiter::InternalCondVarPoke() {
282   if (waiter_count_ != 0) {
283     const int err = pthread_cond_signal(&cv_);
284     if (ABSL_PREDICT_FALSE(err != 0)) {
285       ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
286     }
287   }
288 }
289 
290 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
291 
Waiter()292 Waiter::Waiter() {
293   if (sem_init(&sem_, 0, 0) != 0) {
294     ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
295   }
296   wakeups_.store(0, std::memory_order_relaxed);
297 }
298 
~Waiter()299 Waiter::~Waiter() {
300   if (sem_destroy(&sem_) != 0) {
301     ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
302   }
303 }
304 
Wait(KernelTimeout t)305 bool Waiter::Wait(KernelTimeout t) {
306   struct timespec abs_timeout;
307   if (t.has_timeout()) {
308     abs_timeout = t.MakeAbsTimespec();
309   }
310 
311   // Loop until we timeout or consume a wakeup.
312   // Note that, since the thread ticker is just reset, we don't need to check
313   // whether the thread is idle on the very first pass of the loop.
314   bool first_pass = true;
315   while (true) {
316     int x = wakeups_.load(std::memory_order_relaxed);
317     while (x != 0) {
318       if (!wakeups_.compare_exchange_weak(x, x - 1,
319                                           std::memory_order_acquire,
320                                           std::memory_order_relaxed)) {
321         continue;  // Raced with someone, retry.
322       }
323       // Successfully consumed a wakeup, we're done.
324       return true;
325     }
326 
327     if (!first_pass) MaybeBecomeIdle();
328     // Nothing to consume, wait (looping on EINTR).
329     while (true) {
330       if (!t.has_timeout()) {
331         if (sem_wait(&sem_) == 0) break;
332         if (errno == EINTR) continue;
333         ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
334       } else {
335         if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
336         if (errno == EINTR) continue;
337         if (errno == ETIMEDOUT) return false;
338         ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
339       }
340     }
341     first_pass = false;
342   }
343 }
344 
Post()345 void Waiter::Post() {
346   // Post a wakeup.
347   if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
348     // We incremented from 0, need to wake a potential waiter.
349     Poke();
350   }
351 }
352 
Poke()353 void Waiter::Poke() {
354   if (sem_post(&sem_) != 0) {  // Wake any semaphore waiter.
355     ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
356   }
357 }
358 
359 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
360 
361 class Waiter::WinHelper {
362  public:
GetLock(Waiter * w)363   static SRWLOCK *GetLock(Waiter *w) {
364     return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
365   }
366 
GetCond(Waiter * w)367   static CONDITION_VARIABLE *GetCond(Waiter *w) {
368     return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
369   }
370 
371   static_assert(sizeof(SRWLOCK) == sizeof(void *),
372                 "`mu_storage_` does not have the same size as SRWLOCK");
373   static_assert(alignof(SRWLOCK) == alignof(void *),
374                 "`mu_storage_` does not have the same alignment as SRWLOCK");
375 
376   static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
377                 "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
378                 "as `CONDITION_VARIABLE`");
379   static_assert(
380       alignof(CONDITION_VARIABLE) == alignof(void *),
381       "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
382 
383   // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
384   // and destructible because we never call their constructors or destructors.
385   static_assert(std::is_trivially_constructible<SRWLOCK>::value,
386                 "The `SRWLOCK` type must be trivially constructible");
387   static_assert(
388       std::is_trivially_constructible<CONDITION_VARIABLE>::value,
389       "The `CONDITION_VARIABLE` type must be trivially constructible");
390   static_assert(std::is_trivially_destructible<SRWLOCK>::value,
391                 "The `SRWLOCK` type must be trivially destructible");
392   static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
393                 "The `CONDITION_VARIABLE` type must be trivially destructible");
394 };
395 
396 class LockHolder {
397  public:
LockHolder(SRWLOCK * mu)398   explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
399     AcquireSRWLockExclusive(mu_);
400   }
401 
402   LockHolder(const LockHolder&) = delete;
403   LockHolder& operator=(const LockHolder&) = delete;
404 
~LockHolder()405   ~LockHolder() {
406     ReleaseSRWLockExclusive(mu_);
407   }
408 
409  private:
410   SRWLOCK* mu_;
411 };
412 
Waiter()413 Waiter::Waiter() {
414   auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
415   auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
416   InitializeSRWLock(mu);
417   InitializeConditionVariable(cv);
418   waiter_count_ = 0;
419   wakeup_count_ = 0;
420 }
421 
422 // SRW locks and condition variables do not need to be explicitly destroyed.
423 // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
424 // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
425 Waiter::~Waiter() = default;
426 
Wait(KernelTimeout t)427 bool Waiter::Wait(KernelTimeout t) {
428   SRWLOCK *mu = WinHelper::GetLock(this);
429   CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
430 
431   LockHolder h(mu);
432   ++waiter_count_;
433 
434   // Loop until we find a wakeup to consume or timeout.
435   // Note that, since the thread ticker is just reset, we don't need to check
436   // whether the thread is idle on the very first pass of the loop.
437   bool first_pass = true;
438   while (wakeup_count_ == 0) {
439     if (!first_pass) MaybeBecomeIdle();
440     // No wakeups available, time to wait.
441     if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
442       // GetLastError() returns a Win32 DWORD, but we assign to
443       // unsigned long to simplify the ABSL_RAW_LOG case below.  The uniform
444       // initialization guarantees this is not a narrowing conversion.
445       const unsigned long err{GetLastError()};  // NOLINT(runtime/int)
446       if (err == ERROR_TIMEOUT) {
447         --waiter_count_;
448         return false;
449       } else {
450         ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
451       }
452     }
453     first_pass = false;
454   }
455   // Consume a wakeup and we're done.
456   --wakeup_count_;
457   --waiter_count_;
458   return true;
459 }
460 
Post()461 void Waiter::Post() {
462   LockHolder h(WinHelper::GetLock(this));
463   ++wakeup_count_;
464   InternalCondVarPoke();
465 }
466 
Poke()467 void Waiter::Poke() {
468   LockHolder h(WinHelper::GetLock(this));
469   InternalCondVarPoke();
470 }
471 
InternalCondVarPoke()472 void Waiter::InternalCondVarPoke() {
473   if (waiter_count_ != 0) {
474     WakeConditionVariable(WinHelper::GetCond(this));
475   }
476 }
477 
478 #else
479 #error Unknown ABSL_WAITER_MODE
480 #endif
481 
482 }  // namespace synchronization_internal
483 ABSL_NAMESPACE_END
484 }  // namespace absl
485