• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "Python.h"
2 
3 #include "pycore_llist.h"
4 #include "pycore_lock.h"          // _PyRawMutex
5 #include "pycore_parking_lot.h"
6 #include "pycore_pyerrors.h"      // _Py_FatalErrorFormat
7 #include "pycore_pystate.h"       // _PyThreadState_GET
8 #include "pycore_semaphore.h"     // _PySemaphore
9 #include "pycore_time.h"          // _PyTime_Add()
10 
11 #include <stdbool.h>
12 
13 
// One slot of the waiter hash table. Parked threads are grouped into
// buckets by hashing the address they wait on (see NUM_BUCKETS below);
// each bucket has its own lock to reduce contention.
typedef struct {
    // The mutex protects the waiter queue and the num_waiters counter.
    _PyRawMutex mutex;

    // Linked list of `struct wait_entry` waiters in this bucket.
    struct llist_node root;
    // Number of entries currently linked into `root`.
    size_t num_waiters;
} Bucket;
22 
// Per-thread parking record. Lives on the parked thread's stack for the
// duration of _PyParkingLot_Park(), linked into a Bucket's queue.
struct wait_entry {
    void *park_arg;         // opaque argument passed through to the unpark callback
    uintptr_t addr;         // address this thread is waiting on
    _PySemaphore sema;      // semaphore the thread blocks on
    struct llist_node node; // linkage in the bucket's waiter queue
    bool is_unparking;      // set (under the bucket lock) once an unparker claims us
};
30 
// Prime number to avoid correlations with memory addresses.
// We want this to be roughly proportional to the number of CPU cores
// to minimize contention on the bucket locks, but not too big to avoid
// wasting memory. The exact choice does not matter much.
#define NUM_BUCKETS 257

// Static initializers for the bucket table: each bucket's list root must
// point at itself (LLIST_INIT), which cannot be expressed with `= {0}`.
// The doubling macros below expand to 2^k designated initializers so the
// table of 257 buckets can be initialized as 256 + 1 entries.
#define BUCKET_INIT(b, i) [i] = { .root = LLIST_INIT(b[i].root) }
#define BUCKET_INIT_2(b, i)   BUCKET_INIT(b, i),     BUCKET_INIT(b, i+1)
#define BUCKET_INIT_4(b, i)   BUCKET_INIT_2(b, i),   BUCKET_INIT_2(b, i+2)
#define BUCKET_INIT_8(b, i)   BUCKET_INIT_4(b, i),   BUCKET_INIT_4(b, i+4)
#define BUCKET_INIT_16(b, i)  BUCKET_INIT_8(b, i),   BUCKET_INIT_8(b, i+8)
#define BUCKET_INIT_32(b, i)  BUCKET_INIT_16(b, i),  BUCKET_INIT_16(b, i+16)
#define BUCKET_INIT_64(b, i)  BUCKET_INIT_32(b, i),  BUCKET_INIT_32(b, i+32)
#define BUCKET_INIT_128(b, i) BUCKET_INIT_64(b, i),  BUCKET_INIT_64(b, i+64)
#define BUCKET_INIT_256(b, i) BUCKET_INIT_128(b, i), BUCKET_INIT_128(b, i+128)
46 
// Table of waiters (hashed by address)
static Bucket buckets[NUM_BUCKETS] = {
    BUCKET_INIT_256(buckets, 0),
    BUCKET_INIT(buckets, 256),
};
52 
// Initialize a one-shot counting semaphore with an initial count of zero.
// Uses the best available platform primitive: a Win32 semaphore, a POSIX
// semaphore, or a mutex + condition variable fallback. Initialization
// failure is fatal: this runs in contexts that cannot report errors.
void
_PySemaphore_Init(_PySemaphore *sema)
{
#if defined(MS_WINDOWS)
    sema->platform_sem = CreateSemaphore(
        NULL,   //  attributes
        0,      //  initial count
        10,     //  maximum count
        NULL    //  unnamed
    );
    if (!sema->platform_sem) {
        Py_FatalError("parking_lot: CreateSemaphore failed");
    }
#elif defined(_Py_USE_SEMAPHORES)
    if (sem_init(&sema->platform_sem, /*pshared=*/0, /*value=*/0) < 0) {
        Py_FatalError("parking_lot: sem_init failed");
    }
#else
    if (pthread_mutex_init(&sema->mutex, NULL) != 0) {
        Py_FatalError("parking_lot: pthread_mutex_init failed");
    }
    if (pthread_cond_init(&sema->cond, NULL)) {
        Py_FatalError("parking_lot: pthread_cond_init failed");
    }
    sema->counter = 0;
#endif
}
80 
// Release the platform resources acquired by _PySemaphore_Init().
// Must not be called while any thread is still waiting on the semaphore.
void
_PySemaphore_Destroy(_PySemaphore *sema)
{
#if defined(MS_WINDOWS)
    CloseHandle(sema->platform_sem);
#elif defined(_Py_USE_SEMAPHORES)
    sem_destroy(&sema->platform_sem);
#else
    pthread_mutex_destroy(&sema->mutex);
    pthread_cond_destroy(&sema->cond);
#endif
}
93 
// Block on the semaphore using the platform primitive.
//
// timeout: relative timeout in nanoseconds; negative means wait forever.
// Returns Py_PARK_OK if the semaphore was acquired, Py_PARK_TIMEOUT if the
// timeout expired, or Py_PARK_INTR if the wait was interrupted (e.g. by a
// signal). Unexpected platform errors are fatal.
static int
_PySemaphore_PlatformWait(_PySemaphore *sema, PyTime_t timeout)
{
    int res;
#if defined(MS_WINDOWS)
    DWORD wait;
    DWORD millis = 0;
    if (timeout < 0) {
        millis = INFINITE;
    }
    else {
        PyTime_t div = _PyTime_AsMilliseconds(timeout, _PyTime_ROUND_TIMEOUT);
        // Prevent overflow with clamping the result
        if ((PyTime_t)PY_DWORD_MAX < div) {
            millis = PY_DWORD_MAX;
        }
        else {
            millis = (DWORD) div;
        }
    }
    wait = WaitForSingleObjectEx(sema->platform_sem, millis, FALSE);
    if (wait == WAIT_OBJECT_0) {
        res = Py_PARK_OK;
    }
    else if (wait == WAIT_TIMEOUT) {
        res = Py_PARK_TIMEOUT;
    }
    else {
        // Any other status (including WAIT_FAILED) is reported as an
        // interruption so the caller can retry.
        res = Py_PARK_INTR;
    }
#elif defined(_Py_USE_SEMAPHORES)
    int err;
    if (timeout >= 0) {
        struct timespec ts;

#if defined(CLOCK_MONOTONIC) && defined(HAVE_SEM_CLOCKWAIT) && !defined(_Py_THREAD_SANITIZER)
        // Prefer sem_clockwait() with the monotonic clock: immune to
        // wall-clock adjustments during the wait.
        PyTime_t now;
        // silently ignore error: cannot report error to the caller
        (void)PyTime_MonotonicRaw(&now);
        PyTime_t deadline = _PyTime_Add(now, timeout);
        _PyTime_AsTimespec_clamp(deadline, &ts);

        err = sem_clockwait(&sema->platform_sem, CLOCK_MONOTONIC, &ts);
#else
        // sem_timedwait() takes an absolute CLOCK_REALTIME deadline.
        PyTime_t now;
        // silently ignore error: cannot report error to the caller
        (void)PyTime_TimeRaw(&now);
        PyTime_t deadline = _PyTime_Add(now, timeout);

        _PyTime_AsTimespec_clamp(deadline, &ts);

        err = sem_timedwait(&sema->platform_sem, &ts);
#endif
    }
    else {
        err = sem_wait(&sema->platform_sem);
    }
    if (err == -1) {
        err = errno;
        if (err == EINTR) {
            res = Py_PARK_INTR;
        }
        else if (err == ETIMEDOUT) {
            res = Py_PARK_TIMEOUT;
        }
        else {
            _Py_FatalErrorFormat(__func__,
                "unexpected error from semaphore: %d",
                err);
        }
    }
    else {
        res = Py_PARK_OK;
    }
#else
    // Fallback: mutex + condition variable emulating a counting semaphore.
    pthread_mutex_lock(&sema->mutex);
    int err = 0;
    if (sema->counter == 0) {
        if (timeout >= 0) {
            struct timespec ts;
#if defined(HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP)
            // macOS extension taking a relative timeout directly.
            _PyTime_AsTimespec_clamp(timeout, &ts);
            err = pthread_cond_timedwait_relative_np(&sema->cond, &sema->mutex, &ts);
#else
            PyTime_t now;
            (void)PyTime_TimeRaw(&now);
            PyTime_t deadline = _PyTime_Add(now, timeout);
            _PyTime_AsTimespec_clamp(deadline, &ts);

            err = pthread_cond_timedwait(&sema->cond, &sema->mutex, &ts);
#endif // HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE_NP
        }
        else {
            err = pthread_cond_wait(&sema->cond, &sema->mutex);
        }
    }
    // Re-check the counter: a wakeup may have raced with the timeout, and
    // pthread_cond_wait() can also wake spuriously (counter still zero,
    // err == 0), which is reported as Py_PARK_INTR.
    if (sema->counter > 0) {
        sema->counter--;
        res = Py_PARK_OK;
    }
    else if (err) {
        res = Py_PARK_TIMEOUT;
    }
    else {
        res = Py_PARK_INTR;
    }
    pthread_mutex_unlock(&sema->mutex);
#endif
    return res;
}
204 
205 int
_PySemaphore_Wait(_PySemaphore * sema,PyTime_t timeout,int detach)206 _PySemaphore_Wait(_PySemaphore *sema, PyTime_t timeout, int detach)
207 {
208     PyThreadState *tstate = NULL;
209     if (detach) {
210         tstate = _PyThreadState_GET();
211         if (tstate && _Py_atomic_load_int_relaxed(&tstate->state) ==
212                           _Py_THREAD_ATTACHED) {
213             // Only detach if we are attached
214             PyEval_ReleaseThread(tstate);
215         }
216         else {
217             tstate = NULL;
218         }
219     }
220     int res = _PySemaphore_PlatformWait(sema, timeout);
221     if (tstate) {
222         PyEval_AcquireThread(tstate);
223     }
224     return res;
225 }
226 
// Release the semaphore once, waking (at most) one waiting thread.
// Failure of the platform primitive is fatal.
void
_PySemaphore_Wakeup(_PySemaphore *sema)
{
#if defined(MS_WINDOWS)
    if (!ReleaseSemaphore(sema->platform_sem, 1, NULL)) {
        Py_FatalError("parking_lot: ReleaseSemaphore failed");
    }
#elif defined(_Py_USE_SEMAPHORES)
    int err = sem_post(&sema->platform_sem);
    if (err != 0) {
        Py_FatalError("parking_lot: sem_post failed");
    }
#else
    pthread_mutex_lock(&sema->mutex);
    sema->counter++;
    pthread_cond_signal(&sema->cond);
    pthread_mutex_unlock(&sema->mutex);
#endif
}
246 
247 static void
enqueue(Bucket * bucket,const void * address,struct wait_entry * wait)248 enqueue(Bucket *bucket, const void *address, struct wait_entry *wait)
249 {
250     llist_insert_tail(&bucket->root, &wait->node);
251     ++bucket->num_waiters;
252 }
253 
254 static struct wait_entry *
dequeue(Bucket * bucket,const void * address)255 dequeue(Bucket *bucket, const void *address)
256 {
257     // find the first waiter that is waiting on `address`
258     struct llist_node *root = &bucket->root;
259     struct llist_node *node;
260     llist_for_each(node, root) {
261         struct wait_entry *wait = llist_data(node, struct wait_entry, node);
262         if (wait->addr == (uintptr_t)address) {
263             llist_remove(node);
264             --bucket->num_waiters;
265             wait->is_unparking = true;
266             return wait;
267         }
268     }
269     return NULL;
270 }
271 
272 static void
dequeue_all(Bucket * bucket,const void * address,struct llist_node * dst)273 dequeue_all(Bucket *bucket, const void *address, struct llist_node *dst)
274 {
275     // remove and append all matching waiters to dst
276     struct llist_node *root = &bucket->root;
277     struct llist_node *node;
278     llist_for_each_safe(node, root) {
279         struct wait_entry *wait = llist_data(node, struct wait_entry, node);
280         if (wait->addr == (uintptr_t)address) {
281             llist_remove(node);
282             llist_insert_tail(dst, node);
283             --bucket->num_waiters;
284             wait->is_unparking = true;
285         }
286     }
287 }
288 
// Atomically load `addr_size` bytes from `addr` and compare with
// `*expected`. Returns non-zero on equality. Only sizes 1, 2, 4, and 8
// are supported; any other size is a fatal logic error.
static int
atomic_memcmp(const void *addr, const void *expected, size_t addr_size)
{
    if (addr_size == 1) {
        return _Py_atomic_load_uint8(addr) == *(const uint8_t *)expected;
    }
    if (addr_size == 2) {
        return _Py_atomic_load_uint16(addr) == *(const uint16_t *)expected;
    }
    if (addr_size == 4) {
        return _Py_atomic_load_uint32(addr) == *(const uint32_t *)expected;
    }
    if (addr_size == 8) {
        return _Py_atomic_load_uint64(addr) == *(const uint64_t *)expected;
    }
    Py_UNREACHABLE();
}
301 
// Park the calling thread on `addr` until unparked, a timeout, or an
// interruption.
//
// addr/expected/size: the thread parks only if the `size` bytes at `addr`
//   still equal `*expected` (checked atomically under the bucket lock);
//   otherwise Py_PARK_AGAIN is returned and the caller should re-evaluate.
// timeout_ns: relative timeout; negative means wait indefinitely.
// park_arg: opaque pointer handed to the unpark callback.
// detach: if non-zero, detach from the interpreter while blocked.
//
// Returns Py_PARK_OK, Py_PARK_AGAIN, Py_PARK_TIMEOUT, or Py_PARK_INTR.
int
_PyParkingLot_Park(const void *addr, const void *expected, size_t size,
                   PyTime_t timeout_ns, void *park_arg, int detach)
{
    // The wait entry lives on this thread's stack; it is only reachable by
    // other threads while linked into the bucket queue.
    struct wait_entry wait = {
        .park_arg = park_arg,
        .addr = (uintptr_t)addr,
        .is_unparking = false,
    };

    Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];

    _PyRawMutex_Lock(&bucket->mutex);
    // The value re-check under the lock is what makes park/unpark race-free:
    // an unparker changes the value before calling _PyParkingLot_Unpark().
    if (!atomic_memcmp(addr, expected, size)) {
        _PyRawMutex_Unlock(&bucket->mutex);
        return Py_PARK_AGAIN;
    }
    _PySemaphore_Init(&wait.sema);
    enqueue(bucket, addr, &wait);
    _PyRawMutex_Unlock(&bucket->mutex);

    int res = _PySemaphore_Wait(&wait.sema, timeout_ns, detach);
    if (res == Py_PARK_OK) {
        goto done;
    }

    // timeout or interrupt
    _PyRawMutex_Lock(&bucket->mutex);
    if (wait.is_unparking) {
        _PyRawMutex_Unlock(&bucket->mutex);
        // Another thread has started to unpark us. Wait until we process the
        // wakeup signal.
        do {
            res = _PySemaphore_Wait(&wait.sema, -1, detach);
        } while (res != Py_PARK_OK);
        goto done;
    }
    else {
        // Not claimed by any unparker: remove ourselves from the queue so
        // the stack-allocated entry cannot be touched after we return.
        llist_remove(&wait.node);
        --bucket->num_waiters;
    }
    _PyRawMutex_Unlock(&bucket->mutex);

done:
    _PySemaphore_Destroy(&wait.sema);
    return res;

}
350 
351 void
_PyParkingLot_Unpark(const void * addr,_Py_unpark_fn_t * fn,void * arg)352 _PyParkingLot_Unpark(const void *addr, _Py_unpark_fn_t *fn, void *arg)
353 {
354     Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
355 
356     // Find the first waiter that is waiting on `addr`
357     _PyRawMutex_Lock(&bucket->mutex);
358     struct wait_entry *waiter = dequeue(bucket, addr);
359     if (waiter) {
360         int has_more_waiters = (bucket->num_waiters > 0);
361         fn(arg, waiter->park_arg, has_more_waiters);
362     }
363     else {
364         fn(arg, NULL, 0);
365     }
366     _PyRawMutex_Unlock(&bucket->mutex);
367 
368     if (waiter) {
369         // Wakeup the waiter outside of the bucket lock
370         _PySemaphore_Wakeup(&waiter->sema);
371     }
372 }
373 
374 void
_PyParkingLot_UnparkAll(const void * addr)375 _PyParkingLot_UnparkAll(const void *addr)
376 {
377     struct llist_node head = LLIST_INIT(head);
378     Bucket *bucket = &buckets[((uintptr_t)addr) % NUM_BUCKETS];
379 
380     _PyRawMutex_Lock(&bucket->mutex);
381     dequeue_all(bucket, addr, &head);
382     _PyRawMutex_Unlock(&bucket->mutex);
383 
384     struct llist_node *node;
385     llist_for_each_safe(node, &head) {
386         struct wait_entry *waiter = llist_data(node, struct wait_entry, node);
387         llist_remove(node);
388         _PySemaphore_Wakeup(&waiter->sema);
389     }
390 }
391 
392 void
_PyParkingLot_AfterFork(void)393 _PyParkingLot_AfterFork(void)
394 {
395     // After a fork only one thread remains. That thread cannot be blocked
396     // so all entries in the parking lot are for dead threads.
397     memset(buckets, 0, sizeof(buckets));
398     for (Py_ssize_t i = 0; i < NUM_BUCKETS; i++) {
399         llist_init(&buckets[i].root);
400     }
401 }
402