1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "absl/synchronization/internal/waiter.h"
16
17 #include "absl/base/config.h"
18
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50
51 namespace absl {
52 ABSL_NAMESPACE_BEGIN
53 namespace synchronization_internal {
54
MaybeBecomeIdle()55 static void MaybeBecomeIdle() {
56 base_internal::ThreadIdentity *identity =
57 base_internal::CurrentThreadIdentityIfPresent();
58 assert(identity != nullptr);
59 const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
60 const int ticker = identity->ticker.load(std::memory_order_relaxed);
61 const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
62 if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
63 identity->is_idle.store(true, std::memory_order_relaxed);
64 }
65 }
66
67 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
68
69 // Some Android headers are missing these definitions even though they
70 // support these futex operations.
71 #ifdef __BIONIC__
72 #ifndef SYS_futex
73 #define SYS_futex __NR_futex
74 #endif
75 #ifndef FUTEX_WAIT_BITSET
76 #define FUTEX_WAIT_BITSET 9
77 #endif
78 #ifndef FUTEX_PRIVATE_FLAG
79 #define FUTEX_PRIVATE_FLAG 128
80 #endif
81 #ifndef FUTEX_CLOCK_REALTIME
82 #define FUTEX_CLOCK_REALTIME 256
83 #endif
84 #ifndef FUTEX_BITSET_MATCH_ANY
85 #define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
86 #endif
87 #endif
88
89 class Futex {
90 public:
WaitUntil(std::atomic<int32_t> * v,int32_t val,KernelTimeout t)91 static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
92 KernelTimeout t) {
93 int err = 0;
94 if (t.has_timeout()) {
95 // https://locklessinc.com/articles/futex_cheat_sheet/
96 // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
97 struct timespec abs_timeout = t.MakeAbsTimespec();
98 // Atomically check that the futex value is still 0, and if it
99 // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
100 err = syscall(
101 SYS_futex, reinterpret_cast<int32_t *>(v),
102 FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
103 &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
104 } else {
105 // Atomically check that the futex value is still 0, and if it
106 // is, sleep until woken by FUTEX_WAKE.
107 err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
108 FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
109 }
110 if (err != 0) {
111 err = -errno;
112 }
113 return err;
114 }
115
Wake(std::atomic<int32_t> * v,int32_t count)116 static int Wake(std::atomic<int32_t> *v, int32_t count) {
117 int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
118 FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
119 if (ABSL_PREDICT_FALSE(err < 0)) {
120 err = -errno;
121 }
122 return err;
123 }
124 };
125
Waiter()126 Waiter::Waiter() {
127 futex_.store(0, std::memory_order_relaxed);
128 }
129
130 Waiter::~Waiter() = default;
131
Wait(KernelTimeout t)132 bool Waiter::Wait(KernelTimeout t) {
133 // Loop until we can atomically decrement futex from a positive
134 // value, waiting on a futex while we believe it is zero.
135 // Note that, since the thread ticker is just reset, we don't need to check
136 // whether the thread is idle on the very first pass of the loop.
137 bool first_pass = true;
138 while (true) {
139 int32_t x = futex_.load(std::memory_order_relaxed);
140 while (x != 0) {
141 if (!futex_.compare_exchange_weak(x, x - 1,
142 std::memory_order_acquire,
143 std::memory_order_relaxed)) {
144 continue; // Raced with someone, retry.
145 }
146 return true; // Consumed a wakeup, we are done.
147 }
148
149
150 if (!first_pass) MaybeBecomeIdle();
151 const int err = Futex::WaitUntil(&futex_, 0, t);
152 if (err != 0) {
153 if (err == -EINTR || err == -EWOULDBLOCK) {
154 // Do nothing, the loop will retry.
155 } else if (err == -ETIMEDOUT) {
156 return false;
157 } else {
158 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
159 }
160 }
161 first_pass = false;
162 }
163 }
164
Post()165 void Waiter::Post() {
166 if (futex_.fetch_add(1, std::memory_order_release) == 0) {
167 // We incremented from 0, need to wake a potential waiter.
168 Poke();
169 }
170 }
171
Poke()172 void Waiter::Poke() {
173 // Wake one thread waiting on the futex.
174 const int err = Futex::Wake(&futex_, 1);
175 if (ABSL_PREDICT_FALSE(err < 0)) {
176 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
177 }
178 }
179
180 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
181
182 class PthreadMutexHolder {
183 public:
PthreadMutexHolder(pthread_mutex_t * mu)184 explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
185 const int err = pthread_mutex_lock(mu_);
186 if (err != 0) {
187 ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
188 }
189 }
190
191 PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
192 PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
193
~PthreadMutexHolder()194 ~PthreadMutexHolder() {
195 const int err = pthread_mutex_unlock(mu_);
196 if (err != 0) {
197 ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
198 }
199 }
200
201 private:
202 pthread_mutex_t *mu_;
203 };
204
Waiter()205 Waiter::Waiter() {
206 const int err = pthread_mutex_init(&mu_, 0);
207 if (err != 0) {
208 ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
209 }
210
211 const int err2 = pthread_cond_init(&cv_, 0);
212 if (err2 != 0) {
213 ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
214 }
215
216 waiter_count_ = 0;
217 wakeup_count_ = 0;
218 }
219
~Waiter()220 Waiter::~Waiter() {
221 const int err = pthread_mutex_destroy(&mu_);
222 if (err != 0) {
223 ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
224 }
225
226 const int err2 = pthread_cond_destroy(&cv_);
227 if (err2 != 0) {
228 ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
229 }
230 }
231
Wait(KernelTimeout t)232 bool Waiter::Wait(KernelTimeout t) {
233 struct timespec abs_timeout;
234 if (t.has_timeout()) {
235 abs_timeout = t.MakeAbsTimespec();
236 }
237
238 PthreadMutexHolder h(&mu_);
239 ++waiter_count_;
240 // Loop until we find a wakeup to consume or timeout.
241 // Note that, since the thread ticker is just reset, we don't need to check
242 // whether the thread is idle on the very first pass of the loop.
243 bool first_pass = true;
244 while (wakeup_count_ == 0) {
245 if (!first_pass) MaybeBecomeIdle();
246 // No wakeups available, time to wait.
247 if (!t.has_timeout()) {
248 const int err = pthread_cond_wait(&cv_, &mu_);
249 if (err != 0) {
250 ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
251 }
252 } else {
253 const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
254 if (err == ETIMEDOUT) {
255 --waiter_count_;
256 return false;
257 }
258 if (err != 0) {
259 ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
260 }
261 }
262 first_pass = false;
263 }
264 // Consume a wakeup and we're done.
265 --wakeup_count_;
266 --waiter_count_;
267 return true;
268 }
269
Post()270 void Waiter::Post() {
271 PthreadMutexHolder h(&mu_);
272 ++wakeup_count_;
273 InternalCondVarPoke();
274 }
275
Poke()276 void Waiter::Poke() {
277 PthreadMutexHolder h(&mu_);
278 InternalCondVarPoke();
279 }
280
InternalCondVarPoke()281 void Waiter::InternalCondVarPoke() {
282 if (waiter_count_ != 0) {
283 const int err = pthread_cond_signal(&cv_);
284 if (ABSL_PREDICT_FALSE(err != 0)) {
285 ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
286 }
287 }
288 }
289
290 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
291
Waiter()292 Waiter::Waiter() {
293 if (sem_init(&sem_, 0, 0) != 0) {
294 ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
295 }
296 wakeups_.store(0, std::memory_order_relaxed);
297 }
298
~Waiter()299 Waiter::~Waiter() {
300 if (sem_destroy(&sem_) != 0) {
301 ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
302 }
303 }
304
Wait(KernelTimeout t)305 bool Waiter::Wait(KernelTimeout t) {
306 struct timespec abs_timeout;
307 if (t.has_timeout()) {
308 abs_timeout = t.MakeAbsTimespec();
309 }
310
311 // Loop until we timeout or consume a wakeup.
312 // Note that, since the thread ticker is just reset, we don't need to check
313 // whether the thread is idle on the very first pass of the loop.
314 bool first_pass = true;
315 while (true) {
316 int x = wakeups_.load(std::memory_order_relaxed);
317 while (x != 0) {
318 if (!wakeups_.compare_exchange_weak(x, x - 1,
319 std::memory_order_acquire,
320 std::memory_order_relaxed)) {
321 continue; // Raced with someone, retry.
322 }
323 // Successfully consumed a wakeup, we're done.
324 return true;
325 }
326
327 if (!first_pass) MaybeBecomeIdle();
328 // Nothing to consume, wait (looping on EINTR).
329 while (true) {
330 if (!t.has_timeout()) {
331 if (sem_wait(&sem_) == 0) break;
332 if (errno == EINTR) continue;
333 ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
334 } else {
335 if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
336 if (errno == EINTR) continue;
337 if (errno == ETIMEDOUT) return false;
338 ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
339 }
340 }
341 first_pass = false;
342 }
343 }
344
Post()345 void Waiter::Post() {
346 // Post a wakeup.
347 if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
348 // We incremented from 0, need to wake a potential waiter.
349 Poke();
350 }
351 }
352
Poke()353 void Waiter::Poke() {
354 if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
355 ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
356 }
357 }
358
359 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
360
361 class Waiter::WinHelper {
362 public:
GetLock(Waiter * w)363 static SRWLOCK *GetLock(Waiter *w) {
364 return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
365 }
366
GetCond(Waiter * w)367 static CONDITION_VARIABLE *GetCond(Waiter *w) {
368 return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
369 }
370
371 static_assert(sizeof(SRWLOCK) == sizeof(void *),
372 "`mu_storage_` does not have the same size as SRWLOCK");
373 static_assert(alignof(SRWLOCK) == alignof(void *),
374 "`mu_storage_` does not have the same alignment as SRWLOCK");
375
376 static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
377 "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
378 "as `CONDITION_VARIABLE`");
379 static_assert(
380 alignof(CONDITION_VARIABLE) == alignof(void *),
381 "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
382
383 // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
384 // and destructible because we never call their constructors or destructors.
385 static_assert(std::is_trivially_constructible<SRWLOCK>::value,
386 "The `SRWLOCK` type must be trivially constructible");
387 static_assert(
388 std::is_trivially_constructible<CONDITION_VARIABLE>::value,
389 "The `CONDITION_VARIABLE` type must be trivially constructible");
390 static_assert(std::is_trivially_destructible<SRWLOCK>::value,
391 "The `SRWLOCK` type must be trivially destructible");
392 static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
393 "The `CONDITION_VARIABLE` type must be trivially destructible");
394 };
395
396 class LockHolder {
397 public:
LockHolder(SRWLOCK * mu)398 explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
399 AcquireSRWLockExclusive(mu_);
400 }
401
402 LockHolder(const LockHolder&) = delete;
403 LockHolder& operator=(const LockHolder&) = delete;
404
~LockHolder()405 ~LockHolder() {
406 ReleaseSRWLockExclusive(mu_);
407 }
408
409 private:
410 SRWLOCK* mu_;
411 };
412
Waiter()413 Waiter::Waiter() {
414 auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
415 auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
416 InitializeSRWLock(mu);
417 InitializeConditionVariable(cv);
418 waiter_count_ = 0;
419 wakeup_count_ = 0;
420 }
421
422 // SRW locks and condition variables do not need to be explicitly destroyed.
423 // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
424 // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
425 Waiter::~Waiter() = default;
426
Wait(KernelTimeout t)427 bool Waiter::Wait(KernelTimeout t) {
428 SRWLOCK *mu = WinHelper::GetLock(this);
429 CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
430
431 LockHolder h(mu);
432 ++waiter_count_;
433
434 // Loop until we find a wakeup to consume or timeout.
435 // Note that, since the thread ticker is just reset, we don't need to check
436 // whether the thread is idle on the very first pass of the loop.
437 bool first_pass = true;
438 while (wakeup_count_ == 0) {
439 if (!first_pass) MaybeBecomeIdle();
440 // No wakeups available, time to wait.
441 if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
442 // GetLastError() returns a Win32 DWORD, but we assign to
443 // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
444 // initialization guarantees this is not a narrowing conversion.
445 const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
446 if (err == ERROR_TIMEOUT) {
447 --waiter_count_;
448 return false;
449 } else {
450 ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
451 }
452 }
453 first_pass = false;
454 }
455 // Consume a wakeup and we're done.
456 --wakeup_count_;
457 --waiter_count_;
458 return true;
459 }
460
Post()461 void Waiter::Post() {
462 LockHolder h(WinHelper::GetLock(this));
463 ++wakeup_count_;
464 InternalCondVarPoke();
465 }
466
Poke()467 void Waiter::Poke() {
468 LockHolder h(WinHelper::GetLock(this));
469 InternalCondVarPoke();
470 }
471
InternalCondVarPoke()472 void Waiter::InternalCondVarPoke() {
473 if (waiter_count_ != 0) {
474 WakeConditionVariable(WinHelper::GetCond(this));
475 }
476 }
477
478 #else
479 #error Unknown ABSL_WAITER_MODE
480 #endif
481
482 } // namespace synchronization_internal
483 ABSL_NAMESPACE_END
484 } // namespace absl
485