1 // Copyright 2017 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "absl/synchronization/internal/waiter.h"
16
17 #include "absl/base/config.h"
18
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <pthread.h>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #endif
26
27 #ifdef __linux__
28 #include <linux/futex.h>
29 #include <sys/syscall.h>
30 #endif
31
32 #ifdef ABSL_HAVE_SEMAPHORE_H
33 #include <semaphore.h>
34 #endif
35
36 #include <errno.h>
37 #include <stdio.h>
38 #include <time.h>
39
40 #include <atomic>
41 #include <cassert>
42 #include <cstdint>
43 #include <new>
44 #include <type_traits>
45
46 #include "absl/base/internal/raw_logging.h"
47 #include "absl/base/internal/thread_identity.h"
48 #include "absl/base/optimization.h"
49 #include "absl/synchronization/internal/kernel_timeout.h"
50
51
52 namespace absl {
53 ABSL_NAMESPACE_BEGIN
54 namespace synchronization_internal {
55
MaybeBecomeIdle()56 static void MaybeBecomeIdle() {
57 base_internal::ThreadIdentity *identity =
58 base_internal::CurrentThreadIdentityIfPresent();
59 assert(identity != nullptr);
60 const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
61 const int ticker = identity->ticker.load(std::memory_order_relaxed);
62 const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
63 if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
64 identity->is_idle.store(true, std::memory_order_relaxed);
65 }
66 }
67
68 #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
69
Waiter()70 Waiter::Waiter() {
71 futex_.store(0, std::memory_order_relaxed);
72 }
73
74 Waiter::~Waiter() = default;
75
Wait(KernelTimeout t)76 bool Waiter::Wait(KernelTimeout t) {
77 // Loop until we can atomically decrement futex from a positive
78 // value, waiting on a futex while we believe it is zero.
79 // Note that, since the thread ticker is just reset, we don't need to check
80 // whether the thread is idle on the very first pass of the loop.
81 bool first_pass = true;
82
83 while (true) {
84 int32_t x = futex_.load(std::memory_order_relaxed);
85 while (x != 0) {
86 if (!futex_.compare_exchange_weak(x, x - 1,
87 std::memory_order_acquire,
88 std::memory_order_relaxed)) {
89 continue; // Raced with someone, retry.
90 }
91 return true; // Consumed a wakeup, we are done.
92 }
93
94 if (!first_pass) MaybeBecomeIdle();
95 const int err = Futex::WaitUntil(&futex_, 0, t);
96 if (err != 0) {
97 if (err == -EINTR || err == -EWOULDBLOCK) {
98 // Do nothing, the loop will retry.
99 } else if (err == -ETIMEDOUT) {
100 return false;
101 } else {
102 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
103 }
104 }
105 first_pass = false;
106 }
107 }
108
Post()109 void Waiter::Post() {
110 if (futex_.fetch_add(1, std::memory_order_release) == 0) {
111 // We incremented from 0, need to wake a potential waiter.
112 Poke();
113 }
114 }
115
Poke()116 void Waiter::Poke() {
117 // Wake one thread waiting on the futex.
118 const int err = Futex::Wake(&futex_, 1);
119 if (ABSL_PREDICT_FALSE(err < 0)) {
120 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
121 }
122 }
123
124 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
125
126 class PthreadMutexHolder {
127 public:
PthreadMutexHolder(pthread_mutex_t * mu)128 explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
129 const int err = pthread_mutex_lock(mu_);
130 if (err != 0) {
131 ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
132 }
133 }
134
135 PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
136 PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
137
~PthreadMutexHolder()138 ~PthreadMutexHolder() {
139 const int err = pthread_mutex_unlock(mu_);
140 if (err != 0) {
141 ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
142 }
143 }
144
145 private:
146 pthread_mutex_t *mu_;
147 };
148
Waiter()149 Waiter::Waiter() {
150 const int err = pthread_mutex_init(&mu_, 0);
151 if (err != 0) {
152 ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
153 }
154
155 const int err2 = pthread_cond_init(&cv_, 0);
156 if (err2 != 0) {
157 ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
158 }
159
160 waiter_count_ = 0;
161 wakeup_count_ = 0;
162 }
163
~Waiter()164 Waiter::~Waiter() {
165 const int err = pthread_mutex_destroy(&mu_);
166 if (err != 0) {
167 ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
168 }
169
170 const int err2 = pthread_cond_destroy(&cv_);
171 if (err2 != 0) {
172 ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
173 }
174 }
175
Wait(KernelTimeout t)176 bool Waiter::Wait(KernelTimeout t) {
177 struct timespec abs_timeout;
178 if (t.has_timeout()) {
179 abs_timeout = t.MakeAbsTimespec();
180 }
181
182 PthreadMutexHolder h(&mu_);
183 ++waiter_count_;
184 // Loop until we find a wakeup to consume or timeout.
185 // Note that, since the thread ticker is just reset, we don't need to check
186 // whether the thread is idle on the very first pass of the loop.
187 bool first_pass = true;
188 while (wakeup_count_ == 0) {
189 if (!first_pass) MaybeBecomeIdle();
190 // No wakeups available, time to wait.
191 if (!t.has_timeout()) {
192 const int err = pthread_cond_wait(&cv_, &mu_);
193 if (err != 0) {
194 ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
195 }
196 } else {
197 const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
198 if (err == ETIMEDOUT) {
199 --waiter_count_;
200 return false;
201 }
202 if (err != 0) {
203 ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
204 }
205 }
206 first_pass = false;
207 }
208 // Consume a wakeup and we're done.
209 --wakeup_count_;
210 --waiter_count_;
211 return true;
212 }
213
Post()214 void Waiter::Post() {
215 PthreadMutexHolder h(&mu_);
216 ++wakeup_count_;
217 InternalCondVarPoke();
218 }
219
Poke()220 void Waiter::Poke() {
221 PthreadMutexHolder h(&mu_);
222 InternalCondVarPoke();
223 }
224
InternalCondVarPoke()225 void Waiter::InternalCondVarPoke() {
226 if (waiter_count_ != 0) {
227 const int err = pthread_cond_signal(&cv_);
228 if (ABSL_PREDICT_FALSE(err != 0)) {
229 ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
230 }
231 }
232 }
233
234 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
235
Waiter()236 Waiter::Waiter() {
237 if (sem_init(&sem_, 0, 0) != 0) {
238 ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
239 }
240 wakeups_.store(0, std::memory_order_relaxed);
241 }
242
~Waiter()243 Waiter::~Waiter() {
244 if (sem_destroy(&sem_) != 0) {
245 ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
246 }
247 }
248
Wait(KernelTimeout t)249 bool Waiter::Wait(KernelTimeout t) {
250 struct timespec abs_timeout;
251 if (t.has_timeout()) {
252 abs_timeout = t.MakeAbsTimespec();
253 }
254
255 // Loop until we timeout or consume a wakeup.
256 // Note that, since the thread ticker is just reset, we don't need to check
257 // whether the thread is idle on the very first pass of the loop.
258 bool first_pass = true;
259 while (true) {
260 int x = wakeups_.load(std::memory_order_relaxed);
261 while (x != 0) {
262 if (!wakeups_.compare_exchange_weak(x, x - 1,
263 std::memory_order_acquire,
264 std::memory_order_relaxed)) {
265 continue; // Raced with someone, retry.
266 }
267 // Successfully consumed a wakeup, we're done.
268 return true;
269 }
270
271 if (!first_pass) MaybeBecomeIdle();
272 // Nothing to consume, wait (looping on EINTR).
273 while (true) {
274 if (!t.has_timeout()) {
275 if (sem_wait(&sem_) == 0) break;
276 if (errno == EINTR) continue;
277 ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
278 } else {
279 if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
280 if (errno == EINTR) continue;
281 if (errno == ETIMEDOUT) return false;
282 ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
283 }
284 }
285 first_pass = false;
286 }
287 }
288
Post()289 void Waiter::Post() {
290 // Post a wakeup.
291 if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
292 // We incremented from 0, need to wake a potential waiter.
293 Poke();
294 }
295 }
296
Poke()297 void Waiter::Poke() {
298 if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
299 ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
300 }
301 }
302
303 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
304
305 class Waiter::WinHelper {
306 public:
GetLock(Waiter * w)307 static SRWLOCK *GetLock(Waiter *w) {
308 return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
309 }
310
GetCond(Waiter * w)311 static CONDITION_VARIABLE *GetCond(Waiter *w) {
312 return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
313 }
314
315 static_assert(sizeof(SRWLOCK) == sizeof(void *),
316 "`mu_storage_` does not have the same size as SRWLOCK");
317 static_assert(alignof(SRWLOCK) == alignof(void *),
318 "`mu_storage_` does not have the same alignment as SRWLOCK");
319
320 static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
321 "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
322 "as `CONDITION_VARIABLE`");
323 static_assert(
324 alignof(CONDITION_VARIABLE) == alignof(void *),
325 "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
326
327 // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
328 // and destructible because we never call their constructors or destructors.
329 static_assert(std::is_trivially_constructible<SRWLOCK>::value,
330 "The `SRWLOCK` type must be trivially constructible");
331 static_assert(
332 std::is_trivially_constructible<CONDITION_VARIABLE>::value,
333 "The `CONDITION_VARIABLE` type must be trivially constructible");
334 static_assert(std::is_trivially_destructible<SRWLOCK>::value,
335 "The `SRWLOCK` type must be trivially destructible");
336 static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
337 "The `CONDITION_VARIABLE` type must be trivially destructible");
338 };
339
340 class LockHolder {
341 public:
LockHolder(SRWLOCK * mu)342 explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
343 AcquireSRWLockExclusive(mu_);
344 }
345
346 LockHolder(const LockHolder&) = delete;
347 LockHolder& operator=(const LockHolder&) = delete;
348
~LockHolder()349 ~LockHolder() {
350 ReleaseSRWLockExclusive(mu_);
351 }
352
353 private:
354 SRWLOCK* mu_;
355 };
356
Waiter()357 Waiter::Waiter() {
358 auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
359 auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
360 InitializeSRWLock(mu);
361 InitializeConditionVariable(cv);
362 waiter_count_ = 0;
363 wakeup_count_ = 0;
364 }
365
366 // SRW locks and condition variables do not need to be explicitly destroyed.
367 // https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
368 // https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
369 Waiter::~Waiter() = default;
370
Wait(KernelTimeout t)371 bool Waiter::Wait(KernelTimeout t) {
372 SRWLOCK *mu = WinHelper::GetLock(this);
373 CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
374
375 LockHolder h(mu);
376 ++waiter_count_;
377
378 // Loop until we find a wakeup to consume or timeout.
379 // Note that, since the thread ticker is just reset, we don't need to check
380 // whether the thread is idle on the very first pass of the loop.
381 bool first_pass = true;
382 while (wakeup_count_ == 0) {
383 if (!first_pass) MaybeBecomeIdle();
384 // No wakeups available, time to wait.
385 if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
386 // GetLastError() returns a Win32 DWORD, but we assign to
387 // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
388 // initialization guarantees this is not a narrowing conversion.
389 const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
390 if (err == ERROR_TIMEOUT) {
391 --waiter_count_;
392 return false;
393 } else {
394 ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
395 }
396 }
397 first_pass = false;
398 }
399 // Consume a wakeup and we're done.
400 --wakeup_count_;
401 --waiter_count_;
402 return true;
403 }
404
Post()405 void Waiter::Post() {
406 LockHolder h(WinHelper::GetLock(this));
407 ++wakeup_count_;
408 InternalCondVarPoke();
409 }
410
Poke()411 void Waiter::Poke() {
412 LockHolder h(WinHelper::GetLock(this));
413 InternalCondVarPoke();
414 }
415
InternalCondVarPoke()416 void Waiter::InternalCondVarPoke() {
417 if (waiter_count_ != 0) {
418 WakeConditionVariable(WinHelper::GetCond(this));
419 }
420 }
421
422 #else
423 #error Unknown ABSL_WAITER_MODE
424 #endif
425
426 } // namespace synchronization_internal
427 ABSL_NAMESPACE_END
428 } // namespace absl
429