1 #include "Python.h"
2
3 #include "pycore_llist.h"
4 #include "pycore_lock.h" // _PyRawMutex
5 #include "pycore_parking_lot.h"
6 #include "pycore_pyerrors.h" // _Py_FatalErrorFormat
7 #include "pycore_pystate.h" // _PyThreadState_GET
8 #include "pycore_semaphore.h" // _PySemaphore
9 #include "pycore_time.h" // _PyTime_Add()
10
11 #include <stdbool.h>
12
13
// One slot of the waiter hash table: a queue of threads parked on
// addresses that hash to this bucket.
typedef struct {
    // The mutex protects the waiter queue and the num_waiters counter.
    _PyRawMutex mutex;

    // Linked list of `struct wait_entry` waiters in this bucket.
    struct llist_node root;
    size_t num_waiters;
} Bucket;
22
// Per-thread entry in a bucket's waiter queue. It lives on the parked
// thread's stack (see _PyParkingLot_Park), so it is always dequeued
// before that stack frame returns.
struct wait_entry {
    void *park_arg;          // opaque value passed through to the unpark callback
    uintptr_t addr;          // address this thread is parked on
    _PySemaphore sema;       // signaled by the unparking thread to wake us
    struct llist_node node;  // link in the bucket queue (guarded by the bucket mutex)
    bool is_unparking;       // set under the bucket mutex once an unparker dequeued us
};
30
// Prime number to avoid correlations with memory addresses.
// We want this to be roughly proportional to the number of CPU cores
// to minimize contention on the bucket locks, but not too big to avoid
// wasting memory. The exact choice does not matter much.
#define NUM_BUCKETS 257

// Statically initialize each bucket's list root to an empty llist.
// The *_2 .. *_256 helpers just expand BUCKET_INIT over a power-of-two
// range of indices so the whole 257-entry table can be initialized below.
#define BUCKET_INIT(b, i) [i] = { .root = LLIST_INIT(b[i].root) }
#define BUCKET_INIT_2(b, i) BUCKET_INIT(b, i), BUCKET_INIT(b, i+1)
#define BUCKET_INIT_4(b, i) BUCKET_INIT_2(b, i), BUCKET_INIT_2(b, i+2)
#define BUCKET_INIT_8(b, i) BUCKET_INIT_4(b, i), BUCKET_INIT_4(b, i+4)
#define BUCKET_INIT_16(b, i) BUCKET_INIT_8(b, i), BUCKET_INIT_8(b, i+8)
#define BUCKET_INIT_32(b, i) BUCKET_INIT_16(b, i), BUCKET_INIT_16(b, i+16)
#define BUCKET_INIT_64(b, i) BUCKET_INIT_32(b, i), BUCKET_INIT_32(b, i+32)
#define BUCKET_INIT_128(b, i) BUCKET_INIT_64(b, i), BUCKET_INIT_64(b, i+64)
#define BUCKET_INIT_256(b, i) BUCKET_INIT_128(b, i), BUCKET_INIT_128(b, i+128)

// Table of waiters (hashed by address)
static Bucket buckets[NUM_BUCKETS] = {
    BUCKET_INIT_256(buckets, 0),
    BUCKET_INIT(buckets, 256),
};
52
// Initialize a wakeup primitive for a single parked thread, using the best
// available platform facility: a Win32 semaphore, a POSIX semaphore, or a
// mutex + condition variable + counter fallback. Any failure is fatal.
void
_PySemaphore_Init(_PySemaphore *sema)
{
#if defined(MS_WINDOWS)
    sema->platform_sem = CreateSemaphore(
        NULL,   //  attributes
        0,      //  initial count
        10,     //  maximum count
                //  NOTE(review): wakeups are posted one at a time here, so
                //  presumably any maximum >= 1 would do — confirm
        NULL    //  unnamed
    );
    if (!sema->platform_sem) {
        Py_FatalError("parking_lot: CreateSemaphore failed");
    }
#elif defined(_Py_USE_SEMAPHORES)
    // Process-private semaphore, initial value zero (first wait blocks).
    if (sem_init(&sema->platform_sem, /*pshared=*/0, /*value=*/0) < 0) {
        Py_FatalError("parking_lot: sem_init failed");
    }
#else
    // Generic fallback: counting semaphore built from pthreads primitives.
    if (pthread_mutex_init(&sema->mutex, NULL) != 0) {
        Py_FatalError("parking_lot: pthread_mutex_init failed");
    }
    if (pthread_cond_init(&sema->cond, NULL)) {
        Py_FatalError("parking_lot: pthread_cond_init failed");
    }
    sema->counter = 0;
#endif
}
80
// Release the platform resources acquired by _PySemaphore_Init().
// Must not be called while any thread is still waiting on `sema`.
void
_PySemaphore_Destroy(_PySemaphore *sema)
{
#if defined(MS_WINDOWS)
    CloseHandle(sema->platform_sem);
#elif defined(_Py_USE_SEMAPHORES)
    sem_destroy(&sema->platform_sem);
#else
    pthread_mutex_destroy(&sema->mutex);
    pthread_cond_destroy(&sema->cond);
#endif
}
93
// Block on the semaphore until it is signaled, the timeout expires, or the
// wait is interrupted.
//
// `timeout` is relative, in nanoseconds; a negative value waits forever.
// Returns Py_PARK_OK (signaled), Py_PARK_TIMEOUT, or Py_PARK_INTR
// (interrupted before being signaled).
static int
_PySemaphore_PlatformWait(_PySemaphore *sema, PyTime_t timeout)
{
    int res;
#if defined(MS_WINDOWS)
    DWORD wait;
    DWORD millis = 0;
    if (timeout < 0) {
        millis = INFINITE;
    }
    else {
        PyTime_t div = _PyTime_AsMilliseconds(timeout, _PyTime_ROUND_TIMEOUT);
        // Prevent overflow with clamping the result
        if ((PyTime_t)PY_DWORD_MAX < div) {
            millis = PY_DWORD_MAX;
        }
        else {
            millis = (DWORD) div;
        }
    }
    wait = WaitForSingleObjectEx(sema->platform_sem, millis, FALSE);
    if (wait == WAIT_OBJECT_0) {
        res = Py_PARK_OK;
    }
    else if (wait == WAIT_TIMEOUT) {
        res = Py_PARK_TIMEOUT;
    }
    else {
        // Any other status (e.g. WAIT_FAILED) is reported as "interrupted".
        res = Py_PARK_INTR;
    }
#elif defined(_Py_USE_SEMAPHORES)
    int err;
    if (timeout >= 0) {
        struct timespec ts;

#if defined(CLOCK_MONOTONIC) && defined(HAVE_SEM_CLOCKWAIT) && !defined(_Py_THREAD_SANITIZER)
        // Preferred path: wait against the monotonic clock, which is immune
        // to wall-clock adjustments. (Disabled under TSAN, which does not
        // intercept sem_clockwait.)
        PyTime_t now;
        // silently ignore error: cannot report error to the caller
        (void)PyTime_MonotonicRaw(&now);
        PyTime_t deadline = _PyTime_Add(now, timeout);
        _PyTime_AsTimespec_clamp(deadline, &ts);

        err = sem_clockwait(&sema->platform_sem, CLOCK_MONOTONIC, &ts);
#else
        // Fallback: sem_timedwait measures against the realtime clock, so a
        // wall-clock step can lengthen or shorten the effective wait.
        PyTime_t now;
        // silently ignore error: cannot report error to the caller
        (void)PyTime_TimeRaw(&now);
        PyTime_t deadline = _PyTime_Add(now, timeout);

        _PyTime_AsTimespec_clamp(deadline, &ts);

        err = sem_timedwait(&sema->platform_sem, &ts);
#endif
    }
    else {
        err = sem_wait(&sema->platform_sem);
    }
    if (err == -1) {
        err = errno;
        if (err == EINTR) {
            res = Py_PARK_INTR;
        }
        else if (err == ETIMEDOUT) {
            res = Py_PARK_TIMEOUT;
        }
        else {
            // Any other errno indicates a programming error; abort loudly.
            _Py_FatalErrorFormat(__func__,
                "unexpected error from semaphore: %d",
                err);
        }
    }
    else {
        res = Py_PARK_OK;
    }
#else
    // Generic fallback: counting semaphore built from a mutex, a condition
    // variable, and a counter (see _PySemaphore_Init/_PySemaphore_Wakeup).
    pthread_mutex_lock(&sema->mutex);
    int err = 0;
    if (sema->counter == 0) {
        if (timeout >= 0) {
            struct timespec ts;
#if defined(HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP)
            // Relative-timeout variant, where available: avoids depending on
            // the wall clock entirely.
            _PyTime_AsTimespec_clamp(timeout, &ts);
            err = pthread_cond_timedwait_relative_np(&sema->cond, &sema->mutex, &ts);
#else
            PyTime_t now;
            (void)PyTime_TimeRaw(&now);
            PyTime_t deadline = _PyTime_Add(now, timeout);
            _PyTime_AsTimespec_clamp(deadline, &ts);

            err = pthread_cond_timedwait(&sema->cond, &sema->mutex, &ts);
#endif // HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP
        }
        else {
            err = pthread_cond_wait(&sema->cond, &sema->mutex);
        }
    }
    // Re-check the counter after waiting: a wakeup that raced with a timeout
    // still counts as Py_PARK_OK, and a spurious cond-var wakeup with a zero
    // counter is reported as Py_PARK_INTR.
    if (sema->counter > 0) {
        sema->counter--;
        res = Py_PARK_OK;
    }
    else if (err) {
        res = Py_PARK_TIMEOUT;
    }
    else {
        res = Py_PARK_INTR;
    }
    pthread_mutex_unlock(&sema->mutex);
#endif
    return res;
}
204
205 int
_PySemaphore_Wait(_PySemaphore * sema,PyTime_t timeout,int detach)206 _PySemaphore_Wait(_PySemaphore *sema, PyTime_t timeout, int detach)
207 {
208 PyThreadState *tstate = NULL;
209 if (detach) {
210 tstate = _PyThreadState_GET();
211 if (tstate && _Py_atomic_load_int_relaxed(&tstate->state) ==
212 _Py_THREAD_ATTACHED) {
213 // Only detach if we are attached
214 PyEval_ReleaseThread(tstate);
215 }
216 else {
217 tstate = NULL;
218 }
219 }
220 int res = _PySemaphore_PlatformWait(sema, timeout);
221 if (tstate) {
222 PyEval_AcquireThread(tstate);
223 }
224 return res;
225 }
226
// Wake one thread blocked in _PySemaphore_PlatformWait() on `sema`
// (or allow the next waiter to proceed immediately). Failure of the
// platform primitive is fatal.
void
_PySemaphore_Wakeup(_PySemaphore *sema)
{
#if defined(MS_WINDOWS)
    if (!ReleaseSemaphore(sema->platform_sem, 1, NULL)) {
        Py_FatalError("parking_lot: ReleaseSemaphore failed");
    }
#elif defined(_Py_USE_SEMAPHORES)
    int err = sem_post(&sema->platform_sem);
    if (err != 0) {
        Py_FatalError("parking_lot: sem_post failed");
    }
#else
    // Fallback: bump the counter and signal one waiter under the mutex.
    pthread_mutex_lock(&sema->mutex);
    sema->counter++;
    pthread_cond_signal(&sema->cond);
    pthread_mutex_unlock(&sema->mutex);
#endif
}
246
247 static void
enqueue(Bucket * bucket,const void * address,struct wait_entry * wait)248 enqueue(Bucket *bucket, const void *address, struct wait_entry *wait)
249 {
250 llist_insert_tail(&bucket->root, &wait->node);
251 ++bucket->num_waiters;
252 }
253
254 static struct wait_entry *
dequeue(Bucket * bucket,const void * address)255 dequeue(Bucket *bucket, const void *address)
256 {
257 // find the first waiter that is waiting on `address`
258 struct llist_node *root = &bucket->root;
259 struct llist_node *node;
260 llist_for_each(node, root) {
261 struct wait_entry *wait = llist_data(node, struct wait_entry, node);
262 if (wait->addr == (uintptr_t)address) {
263 llist_remove(node);
264 --bucket->num_waiters;
265 wait->is_unparking = true;
266 return wait;
267 }
268 }
269 return NULL;
270 }
271
272 static void
dequeue_all(Bucket * bucket,const void * address,struct llist_node * dst)273 dequeue_all(Bucket *bucket, const void *address, struct llist_node *dst)
274 {
275 // remove and append all matching waiters to dst
276 struct llist_node *root = &bucket->root;
277 struct llist_node *node;
278 llist_for_each_safe(node, root) {
279 struct wait_entry *wait = llist_data(node, struct wait_entry, node);
280 if (wait->addr == (uintptr_t)address) {
281 llist_remove(node);
282 llist_insert_tail(dst, node);
283 --bucket->num_waiters;
284 wait->is_unparking = true;
285 }
286 }
287 }
288
289 // Checks that `*addr == *expected` (only works for 1, 2, 4, or 8 bytes)
290 static int
atomic_memcmp(const void * addr,const void * expected,size_t addr_size)291 atomic_memcmp(const void *addr, const void *expected, size_t addr_size)
292 {
293 switch (addr_size) {
294 case 1: return _Py_atomic_load_uint8(addr) == *(const uint8_t *)expected;
295 case 2: return _Py_atomic_load_uint16(addr) == *(const uint16_t *)expected;
296 case 4: return _Py_atomic_load_uint32(addr) == *(const uint32_t *)expected;
297 case 8: return _Py_atomic_load_uint64(addr) == *(const uint64_t *)expected;
298 default: Py_UNREACHABLE();
299 }
300 }
301
// Park the calling thread on `addr` until it is unparked or `timeout_ns`
// (nanoseconds; negative means wait forever) expires.
//
// Under the bucket lock, first verifies that the `size` bytes at `addr`
// still equal `*expected`; if not, returns Py_PARK_AGAIN without sleeping.
// `park_arg` is handed back to the unpark callback (see
// _PyParkingLot_Unpark). If `detach` is nonzero, the thread state is
// detached while blocked.
//
// Returns Py_PARK_OK, Py_PARK_AGAIN, Py_PARK_TIMEOUT, or Py_PARK_INTR.
int
_PyParkingLot_Park(const void *addr, const void *expected, size_t size,
                   PyTime_t timeout_ns, void *park_arg, int detach)
{
    // The wait entry lives on this stack frame; it is always off the queue
    // by the time this function returns.
    struct wait_entry wait = {
        .park_arg = park_arg,
        .addr = (uintptr_t)addr,
        .is_unparking = false,
    };

    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];

    _PyRawMutex_Lock(&bucket->mutex);
    if (!atomic_memcmp(addr, expected, size)) {
        // The value changed before we could queue; let the caller re-check
        // instead of sleeping on a stale condition.
        _PyRawMutex_Unlock(&bucket->mutex);
        return Py_PARK_AGAIN;
    }
    _PySemaphore_Init(&wait.sema);
    enqueue(bucket, addr, &wait);
    _PyRawMutex_Unlock(&bucket->mutex);

    int res = _PySemaphore_Wait(&wait.sema, timeout_ns, detach);
    if (res == Py_PARK_OK) {
        goto done;
    }

    // timeout or interrupt
    _PyRawMutex_Lock(&bucket->mutex);
    if (wait.is_unparking) {
        _PyRawMutex_Unlock(&bucket->mutex);
        // Another thread has started to unpark us. Wait until we process the
        // wakeup signal: we are already off the queue, so that signal is
        // targeted at this semaphore and must be consumed before the
        // semaphore is destroyed below.
        do {
            res = _PySemaphore_Wait(&wait.sema, -1, detach);
        } while (res != Py_PARK_OK);
        goto done;
    }
    else {
        // Still queued: remove ourselves so no unparker can find this
        // soon-to-be-invalid stack entry.
        llist_remove(&wait.node);
        --bucket->num_waiters;
    }
    _PyRawMutex_Unlock(&bucket->mutex);

done:
    _PySemaphore_Destroy(&wait.sema);
    return res;

}
350
351 void
_PyParkingLot_Unpark(const void * addr,_Py_unpark_fn_t * fn,void * arg)352 _PyParkingLot_Unpark(const void *addr, _Py_unpark_fn_t *fn, void *arg)
353 {
354 Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
355
356 // Find the first waiter that is waiting on `addr`
357 _PyRawMutex_Lock(&bucket->mutex);
358 struct wait_entry *waiter = dequeue(bucket, addr);
359 if (waiter) {
360 int has_more_waiters = (bucket->num_waiters > 0);
361 fn(arg, waiter->park_arg, has_more_waiters);
362 }
363 else {
364 fn(arg, NULL, 0);
365 }
366 _PyRawMutex_Unlock(&bucket->mutex);
367
368 if (waiter) {
369 // Wakeup the waiter outside of the bucket lock
370 _PySemaphore_Wakeup(&waiter->sema);
371 }
372 }
373
374 void
_PyParkingLot_UnparkAll(const void * addr)375 _PyParkingLot_UnparkAll(const void *addr)
376 {
377 struct llist_node head = LLIST_INIT(head);
378 Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
379
380 _PyRawMutex_Lock(&bucket->mutex);
381 dequeue_all(bucket, addr, &head);
382 _PyRawMutex_Unlock(&bucket->mutex);
383
384 struct llist_node *node;
385 llist_for_each_safe(node, &head) {
386 struct wait_entry *waiter = llist_data(node, struct wait_entry, node);
387 llist_remove(node);
388 _PySemaphore_Wakeup(&waiter->sema);
389 }
390 }
391
392 void
_PyParkingLot_AfterFork(void)393 _PyParkingLot_AfterFork(void)
394 {
395 // After a fork only one thread remains. That thread cannot be blocked
396 // so all entries in the parking lot are for dead threads.
397 memset(buckets, 0, sizeof(buckets));
398 for (Py_ssize_t i = 0; i < NUM_BUCKETS; i++) {
399 llist_init(&buckets[i].root);
400 }
401 }
402