1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "absl/synchronization/internal/waiter.h"
16
17 #include "absl/base/config.h"
18
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50
51 namespace absl {
52 ABSL_NAMESPACE_BEGIN
53 namespace synchronization_internal {
54
MaybeBecomeIdle()55 static void MaybeBecomeIdle() {
56 base_internal::ThreadIdentity *identity =
57 base_internal::CurrentThreadIdentityIfPresent();
58 assert(identity != nullptr);
59 const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
60 const int ticker = identity->ticker.load(std::memory_order_relaxed);
61 const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
62 if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
63 identity->is_idle.store(true, std::memory_order_relaxed);
64 }
65 }
66
67 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
68
69 // Some Android headers are missing these definitions even though they
70 // support these futex operations.
71 #ifdef __BIONIC__
72 #ifndef SYS_futex
73 #define SYS_futex __NR_futex
74 #endif
75 #ifndef FUTEX_WAIT_BITSET
76 #define FUTEX_WAIT_BITSET 9
77 #endif
78 #ifndef FUTEX_PRIVATE_FLAG
79 #define FUTEX_PRIVATE_FLAG 128
80 #endif
81 #ifndef FUTEX_CLOCK_REALTIME
82 #define FUTEX_CLOCK_REALTIME 256
83 #endif
84 #ifndef FUTEX_BITSET_MATCH_ANY
85 #define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
86 #endif
87 #endif
88
89 #if defined(__NR_futex_time64) && !defined(SYS_futex_time64)
90 #define SYS_futex_time64 __NR_futex_time64
91 #endif
92
93 #if defined(SYS_futex_time64) && !defined(SYS_futex)
94 #define SYS_futex SYS_futex_time64
95 #endif
96
97 class Futex {
98 public:
WaitUntil(std::atomic<int32_t> * v,int32_t val,KernelTimeout t)99 static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
100 KernelTimeout t) {
101 int err = 0;
102 if (t.has_timeout()) {
103 // https://locklessinc.com/articles/futex_cheat_sheet/
104 // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
105 struct timespec abs_timeout = t.MakeAbsTimespec();
106 // Atomically check that the futex value is still 0, and if it
107 // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
108 err = syscall(
109 SYS_futex, reinterpret_cast<int32_t *>(v),
110 FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
111 &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
112 } else {
113 // Atomically check that the futex value is still 0, and if it
114 // is, sleep until woken by FUTEX_WAKE.
115 err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
116 FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
117 }
118 if (err != 0) {
119 err = -errno;
120 }
121 return err;
122 }
123
Wake(std::atomic<int32_t> * v,int32_t count)124 static int Wake(std::atomic<int32_t> *v, int32_t count) {
125 int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
126 FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
127 if (ABSL_PREDICT_FALSE(err < 0)) {
128 err = -errno;
129 }
130 return err;
131 }
132 };
133
Waiter()134 Waiter::Waiter() {
135 futex_.store(0, std::memory_order_relaxed);
136 }
137
138 Waiter::~Waiter() = default;
139
Wait(KernelTimeout t)140 bool Waiter::Wait(KernelTimeout t) {
141 // Loop until we can atomically decrement futex from a positive
142 // value, waiting on a futex while we believe it is zero.
143 // Note that, since the thread ticker is just reset, we don't need to check
144 // whether the thread is idle on the very first pass of the loop.
145 bool first_pass = true;
146 while (true) {
147 int32_t x = futex_.load(std::memory_order_relaxed);
148 while (x != 0) {
149 if (!futex_.compare_exchange_weak(x, x - 1,
150 std::memory_order_acquire,
151 std::memory_order_relaxed)) {
152 continue; // Raced with someone, retry.
153 }
154 return true; // Consumed a wakeup, we are done.
155 }
156
157
158 if (!first_pass) MaybeBecomeIdle();
159 const int err = Futex::WaitUntil(&futex_, 0, t);
160 if (err != 0) {
161 if (err == -EINTR || err == -EWOULDBLOCK) {
162 // Do nothing, the loop will retry.
163 } else if (err == -ETIMEDOUT) {
164 return false;
165 } else {
166 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
167 }
168 }
169 first_pass = false;
170 }
171 }
172
Post()173 void Waiter::Post() {
174 if (futex_.fetch_add(1, std::memory_order_release) == 0) {
175 // We incremented from 0, need to wake a potential waiter.
176 Poke();
177 }
178 }
179
Poke()180 void Waiter::Poke() {
181 // Wake one thread waiting on the futex.
182 const int err = Futex::Wake(&futex_, 1);
183 if (ABSL_PREDICT_FALSE(err < 0)) {
184 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
185 }
186 }
187
188 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
189
190 class PthreadMutexHolder {
191 public:
PthreadMutexHolder(pthread_mutex_t * mu)192 explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
193 const int err = pthread_mutex_lock(mu_);
194 if (err != 0) {
195 ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
196 }
197 }
198
199 PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
200 PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
201
~PthreadMutexHolder()202 ~PthreadMutexHolder() {
203 const int err = pthread_mutex_unlock(mu_);
204 if (err != 0) {
205 ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
206 }
207 }
208
209 private:
210 pthread_mutex_t *mu_;
211 };
212
Waiter()213 Waiter::Waiter() {
214 const int err = pthread_mutex_init(&mu_, 0);
215 if (err != 0) {
216 ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
217 }
218
219 const int err2 = pthread_cond_init(&cv_, 0);
220 if (err2 != 0) {
221 ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
222 }
223
224 waiter_count_ = 0;
225 wakeup_count_ = 0;
226 }
227
~Waiter()228 Waiter::~Waiter() {
229 const int err = pthread_mutex_destroy(&mu_);
230 if (err != 0) {
231 ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
232 }
233
234 const int err2 = pthread_cond_destroy(&cv_);
235 if (err2 != 0) {
236 ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
237 }
238 }
239
Wait(KernelTimeout t)240 bool Waiter::Wait(KernelTimeout t) {
241 struct timespec abs_timeout;
242 if (t.has_timeout()) {
243 abs_timeout = t.MakeAbsTimespec();
244 }
245
246 PthreadMutexHolder h(&mu_);
247 ++waiter_count_;
248 // Loop until we find a wakeup to consume or timeout.
249 // Note that, since the thread ticker is just reset, we don't need to check
250 // whether the thread is idle on the very first pass of the loop.
251 bool first_pass = true;
252 while (wakeup_count_ == 0) {
253 if (!first_pass) MaybeBecomeIdle();
254 // No wakeups available, time to wait.
255 if (!t.has_timeout()) {
256 const int err = pthread_cond_wait(&cv_, &mu_);
257 if (err != 0) {
258 ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
259 }
260 } else {
261 const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
262 if (err == ETIMEDOUT) {
263 --waiter_count_;
264 return false;
265 }
266 if (err != 0) {
267 ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
268 }
269 }
270 first_pass = false;
271 }
272 // Consume a wakeup and we're done.
273 --wakeup_count_;
274 --waiter_count_;
275 return true;
276 }
277
Post()278 void Waiter::Post() {
279 PthreadMutexHolder h(&mu_);
280 ++wakeup_count_;
281 InternalCondVarPoke();
282 }
283
Poke()284 void Waiter::Poke() {
285 PthreadMutexHolder h(&mu_);
286 InternalCondVarPoke();
287 }
288
InternalCondVarPoke()289 void Waiter::InternalCondVarPoke() {
290 if (waiter_count_ != 0) {
291 const int err = pthread_cond_signal(&cv_);
292 if (ABSL_PREDICT_FALSE(err != 0)) {
293 ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
294 }
295 }
296 }
297
298 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
299
Waiter()300 Waiter::Waiter() {
301 if (sem_init(&sem_, 0, 0) != 0) {
302 ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
303 }
304 wakeups_.store(0, std::memory_order_relaxed);
305 }
306
~Waiter()307 Waiter::~Waiter() {
308 if (sem_destroy(&sem_) != 0) {
309 ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
310 }
311 }
312
Wait(KernelTimeout t)313 bool Waiter::Wait(KernelTimeout t) {
314 struct timespec abs_timeout;
315 if (t.has_timeout()) {
316 abs_timeout = t.MakeAbsTimespec();
317 }
318
319 // Loop until we timeout or consume a wakeup.
320 // Note that, since the thread ticker is just reset, we don't need to check
321 // whether the thread is idle on the very first pass of the loop.
322 bool first_pass = true;
323 while (true) {
324 int x = wakeups_.load(std::memory_order_relaxed);
325 while (x != 0) {
326 if (!wakeups_.compare_exchange_weak(x, x - 1,
327 std::memory_order_acquire,
328 std::memory_order_relaxed)) {
329 continue; // Raced with someone, retry.
330 }
331 // Successfully consumed a wakeup, we're done.
332 return true;
333 }
334
335 if (!first_pass) MaybeBecomeIdle();
336 // Nothing to consume, wait (looping on EINTR).
337 while (true) {
338 if (!t.has_timeout()) {
339 if (sem_wait(&sem_) == 0) break;
340 if (errno == EINTR) continue;
341 ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
342 } else {
343 if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
344 if (errno == EINTR) continue;
345 if (errno == ETIMEDOUT) return false;
346 ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
347 }
348 }
349 first_pass = false;
350 }
351 }
352
Post()353 void Waiter::Post() {
354 // Post a wakeup.
355 if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
356 // We incremented from 0, need to wake a potential waiter.
357 Poke();
358 }
359 }
360
Poke()361 void Waiter::Poke() {
362 if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
363 ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
364 }
365 }
366
367 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
368
369 class Waiter::WinHelper {
370 public:
GetLock(Waiter * w)371 static SRWLOCK *GetLock(Waiter *w) {
372 return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
373 }
374
GetCond(Waiter * w)375 static CONDITION_VARIABLE *GetCond(Waiter *w) {
376 return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
377 }
378
379 static_assert(sizeof(SRWLOCK) == sizeof(void *),
380 "`mu_storage_` does not have the same size as SRWLOCK");
381 static_assert(alignof(SRWLOCK) == alignof(void *),
382 "`mu_storage_` does not have the same alignment as SRWLOCK");
383
384 static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
385 "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
386 "as `CONDITION_VARIABLE`");
387 static_assert(
388 alignof(CONDITION_VARIABLE) == alignof(void *),
389 "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
390
391 // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
392 // and destructible because we never call their constructors or destructors.
393 static_assert(std::is_trivially_constructible<SRWLOCK>::value,
394 "The `SRWLOCK` type must be trivially constructible");
395 static_assert(
396 std::is_trivially_constructible<CONDITION_VARIABLE>::value,
397 "The `CONDITION_VARIABLE` type must be trivially constructible");
398 static_assert(std::is_trivially_destructible<SRWLOCK>::value,
399 "The `SRWLOCK` type must be trivially destructible");
400 static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
401 "The `CONDITION_VARIABLE` type must be trivially destructible");
402 };
403
404 class LockHolder {
405 public:
LockHolder(SRWLOCK * mu)406 explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
407 AcquireSRWLockExclusive(mu_);
408 }
409
410 LockHolder(const LockHolder&) = delete;
411 LockHolder& operator=(const LockHolder&) = delete;
412
~LockHolder()413 ~LockHolder() {
414 ReleaseSRWLockExclusive(mu_);
415 }
416
417 private:
418 SRWLOCK* mu_;
419 };
420
Waiter()421 Waiter::Waiter() {
422 auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
423 auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
424 InitializeSRWLock(mu);
425 InitializeConditionVariable(cv);
426 waiter_count_ = 0;
427 wakeup_count_ = 0;
428 }
429
430 // SRW locks and condition variables do not need to be explicitly destroyed.
431 // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
432 // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
433 Waiter::~Waiter() = default;
434
Wait(KernelTimeout t)435 bool Waiter::Wait(KernelTimeout t) {
436 SRWLOCK *mu = WinHelper::GetLock(this);
437 CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
438
439 LockHolder h(mu);
440 ++waiter_count_;
441
442 // Loop until we find a wakeup to consume or timeout.
443 // Note that, since the thread ticker is just reset, we don't need to check
444 // whether the thread is idle on the very first pass of the loop.
445 bool first_pass = true;
446 while (wakeup_count_ == 0) {
447 if (!first_pass) MaybeBecomeIdle();
448 // No wakeups available, time to wait.
449 if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
450 // GetLastError() returns a Win32 DWORD, but we assign to
451 // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
452 // initialization guarantees this is not a narrowing conversion.
453 const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
454 if (err == ERROR_TIMEOUT) {
455 --waiter_count_;
456 return false;
457 } else {
458 ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
459 }
460 }
461 first_pass = false;
462 }
463 // Consume a wakeup and we're done.
464 --wakeup_count_;
465 --waiter_count_;
466 return true;
467 }
468
Post()469 void Waiter::Post() {
470 LockHolder h(WinHelper::GetLock(this));
471 ++wakeup_count_;
472 InternalCondVarPoke();
473 }
474
Poke()475 void Waiter::Poke() {
476 LockHolder h(WinHelper::GetLock(this));
477 InternalCondVarPoke();
478 }
479
InternalCondVarPoke()480 void Waiter::InternalCondVarPoke() {
481 if (waiter_count_ != 0) {
482 WakeConditionVariable(WinHelper::GetCond(this));
483 }
484 }
485
486 #else
487 #error Unknown ABSL_WAITER_MODE
488 #endif
489
490 } // namespace synchronization_internal
491 ABSL_NAMESPACE_END
492 } // namespace absl
493