• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * A type which wraps a semaphore
3  *
4  * semaphore.c
5  *
6  * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7  */
8 
9 #include "multiprocessing.h"
10 
11 enum { RECURSIVE_MUTEX, SEMAPHORE };
12 
13 typedef struct {
14     PyObject_HEAD
15     SEM_HANDLE handle;
16     long last_tid;
17     int count;
18     int maxvalue;
19     int kind;
20 } SemLockObject;
21 
/* Non-zero when `o` (a SemLockObject *) has a positive acquisition count
   and the calling thread is the one recorded in its last_tid field.
   The argument is parenthesized so any pointer-valued expression may be
   passed; note that it is still evaluated twice. */
#define ISMINE(o) ((o)->count > 0 && \
                   PyThread_get_thread_ident() == (o)->last_tid)
23 
24 
25 #ifdef MS_WINDOWS
26 
27 /*
28  * Windows definitions
29  */
30 
/* Value of a handle that was never successfully created. */
#define SEM_FAILED NULL

/* Thin portability layer mapping the generic SEM_* operations onto the
   Win32 API.  The semaphore is anonymous on Windows, so `name` is ignored
   by SEM_CREATE and SEM_UNLINK is a no-op that always reports success. */
#define SEM_CLEAR_ERROR()           SetLastError(0)
#define SEM_GET_LAST_ERROR()        GetLastError()
#define SEM_CREATE(name, val, max)  CreateSemaphore(NULL, val, max, NULL)
#define SEM_CLOSE(sem)              (CloseHandle(sem) != 0 ? 0 : -1)
#define SEM_GETVALUE(sem, pval)     _GetSemaphoreValue(sem, pval)
#define SEM_UNLINK(name)            0
39 
40 static int
_GetSemaphoreValue(HANDLE handle,long * value)41 _GetSemaphoreValue(HANDLE handle, long *value)
42 {
43     long previous;
44 
45     switch (WaitForSingleObject(handle, 0)) {
46     case WAIT_OBJECT_0:
47         if (!ReleaseSemaphore(handle, 1, &previous))
48             return MP_STANDARD_ERROR;
49         *value = previous + 1;
50         return 0;
51     case WAIT_TIMEOUT:
52         *value = 0;
53         return 0;
54     default:
55         return MP_STANDARD_ERROR;
56     }
57 }
58 
59 static PyObject *
semlock_acquire(SemLockObject * self,PyObject * args,PyObject * kwds)60 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
61 {
62     int blocking = 1;
63     double timeout;
64     PyObject *timeout_obj = Py_None;
65     DWORD res, full_msecs, msecs, start, ticks;
66 
67     static char *kwlist[] = {"block", "timeout", NULL};
68 
69     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
70                                      &blocking, &timeout_obj))
71         return NULL;
72 
73     /* calculate timeout */
74     if (!blocking) {
75         full_msecs = 0;
76     } else if (timeout_obj == Py_None) {
77         full_msecs = INFINITE;
78     } else {
79         timeout = PyFloat_AsDouble(timeout_obj);
80         if (PyErr_Occurred())
81             return NULL;
82         timeout *= 1000.0;      /* convert to millisecs */
83         if (timeout < 0.0) {
84             timeout = 0.0;
85         } else if (timeout >= 0.5 * INFINITE) { /* 25 days */
86             PyErr_SetString(PyExc_OverflowError,
87                             "timeout is too large");
88             return NULL;
89         }
90         full_msecs = (DWORD)(timeout + 0.5);
91     }
92 
93     /* check whether we already own the lock */
94     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
95         ++self->count;
96         Py_RETURN_TRUE;
97     }
98 
99     /* check whether we can acquire without blocking */
100     if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) {
101         self->last_tid = GetCurrentThreadId();
102         ++self->count;
103         Py_RETURN_TRUE;
104     }
105 
106     msecs = full_msecs;
107     start = GetTickCount();
108 
109     for ( ; ; ) {
110         HANDLE handles[2] = {self->handle, sigint_event};
111 
112         /* do the wait */
113         Py_BEGIN_ALLOW_THREADS
114         ResetEvent(sigint_event);
115         res = WaitForMultipleObjects(2, handles, FALSE, msecs);
116         Py_END_ALLOW_THREADS
117 
118         /* handle result */
119         if (res != WAIT_OBJECT_0 + 1)
120             break;
121 
122         /* got SIGINT so give signal handler a chance to run */
123         Sleep(1);
124 
125         /* if this is main thread let KeyboardInterrupt be raised */
126         if (PyErr_CheckSignals())
127             return NULL;
128 
129         /* recalculate timeout */
130         if (msecs != INFINITE) {
131             ticks = GetTickCount();
132             if ((DWORD)(ticks - start) >= full_msecs)
133                 Py_RETURN_FALSE;
134             msecs = full_msecs - (ticks - start);
135         }
136     }
137 
138     /* handle result */
139     switch (res) {
140     case WAIT_TIMEOUT:
141         Py_RETURN_FALSE;
142     case WAIT_OBJECT_0:
143         self->last_tid = GetCurrentThreadId();
144         ++self->count;
145         Py_RETURN_TRUE;
146     case WAIT_FAILED:
147         return PyErr_SetFromWindowsErr(0);
148     default:
149         PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or "
150                      "WaitForMultipleObjects() gave unrecognized "
151                      "value %d", res);
152         return NULL;
153     }
154 }
155 
156 static PyObject *
semlock_release(SemLockObject * self,PyObject * args)157 semlock_release(SemLockObject *self, PyObject *args)
158 {
159     if (self->kind == RECURSIVE_MUTEX) {
160         if (!ISMINE(self)) {
161             PyErr_SetString(PyExc_AssertionError, "attempt to "
162                             "release recursive lock not owned "
163                             "by thread");
164             return NULL;
165         }
166         if (self->count > 1) {
167             --self->count;
168             Py_RETURN_NONE;
169         }
170         assert(self->count == 1);
171     }
172 
173     if (!ReleaseSemaphore(self->handle, 1, NULL)) {
174         if (GetLastError() == ERROR_TOO_MANY_POSTS) {
175             PyErr_SetString(PyExc_ValueError, "semaphore or lock "
176                             "released too many times");
177             return NULL;
178         } else {
179             return PyErr_SetFromWindowsErr(0);
180         }
181     }
182 
183     --self->count;
184     Py_RETURN_NONE;
185 }
186 
187 #else /* !MS_WINDOWS */
188 
189 /*
190  * Unix definitions
191  */
192 
/* Thin portability layer mapping the generic SEM_* operations onto POSIX
   named semaphores.  There is no separate "last error" to clear or query
   here, and `max` is not passed on to sem_open(). */
#define SEM_CLEAR_ERROR()
#define SEM_GET_LAST_ERROR()        0
#define SEM_CREATE(name, val, max)  sem_open(name, O_CREAT | O_EXCL, 0600, val)
#define SEM_CLOSE(sem)              sem_close(sem)
#define SEM_GETVALUE(sem, pval)     sem_getvalue(sem, pval)
#define SEM_UNLINK(name)            sem_unlink(name)

/* OS X 10.4 defines SEM_FAILED as -1 rather than (sem_t *)-1, which causes
   compiler warnings and (potentially) undefined behaviour, so redefine it
   with the proper pointer type. */
#ifdef __APPLE__
#  undef SEM_FAILED
#  define SEM_FAILED ((sem_t *)-1)
#endif

/* Without sem_unlink(), unlinking becomes a no-op that reports success. */
#ifndef HAVE_SEM_UNLINK
#  define sem_unlink(name) 0
#endif
210 
211 #ifndef HAVE_SEM_TIMEDWAIT
212 #  define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
213 
214 int
sem_timedwait_save(sem_t * sem,struct timespec * deadline,PyThreadState * _save)215 sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
216 {
217     int res;
218     unsigned long delay, difference;
219     struct timeval now, tvdeadline, tvdelay;
220 
221     errno = 0;
222     tvdeadline.tv_sec = deadline->tv_sec;
223     tvdeadline.tv_usec = deadline->tv_nsec / 1000;
224 
225     for (delay = 0 ; ; delay += 1000) {
226         /* poll */
227         if (sem_trywait(sem) == 0)
228             return 0;
229         else if (errno != EAGAIN)
230             return MP_STANDARD_ERROR;
231 
232         /* get current time */
233         if (gettimeofday(&now, NULL) < 0)
234             return MP_STANDARD_ERROR;
235 
236         /* check for timeout */
237         if (tvdeadline.tv_sec < now.tv_sec ||
238             (tvdeadline.tv_sec == now.tv_sec &&
239              tvdeadline.tv_usec <= now.tv_usec)) {
240             errno = ETIMEDOUT;
241             return MP_STANDARD_ERROR;
242         }
243 
244         /* calculate how much time is left */
245         difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 +
246             (tvdeadline.tv_usec - now.tv_usec);
247 
248         /* check delay not too long -- maximum is 20 msecs */
249         if (delay > 20000)
250             delay = 20000;
251         if (delay > difference)
252             delay = difference;
253 
254         /* sleep */
255         tvdelay.tv_sec = delay / 1000000;
256         tvdelay.tv_usec = delay % 1000000;
257         if (select(0, NULL, NULL, NULL, &tvdelay) < 0)
258             return MP_STANDARD_ERROR;
259 
260         /* check for signals */
261         Py_BLOCK_THREADS
262         res = PyErr_CheckSignals();
263         Py_UNBLOCK_THREADS
264 
265         if (res) {
266             errno = EINTR;
267             return MP_EXCEPTION_HAS_BEEN_SET;
268         }
269     }
270 }
271 
272 #endif /* !HAVE_SEM_TIMEDWAIT */
273 
274 static PyObject *
semlock_acquire(SemLockObject * self,PyObject * args,PyObject * kwds)275 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
276 {
277     int blocking = 1, res;
278     double timeout;
279     PyObject *timeout_obj = Py_None;
280     struct timespec deadline = {0};
281     struct timeval now;
282     long sec, nsec;
283 
284     static char *kwlist[] = {"block", "timeout", NULL};
285 
286     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
287                                      &blocking, &timeout_obj))
288         return NULL;
289 
290     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
291         ++self->count;
292         Py_RETURN_TRUE;
293     }
294 
295     if (timeout_obj != Py_None) {
296         timeout = PyFloat_AsDouble(timeout_obj);
297         if (PyErr_Occurred())
298             return NULL;
299         if (timeout < 0.0)
300             timeout = 0.0;
301 
302         if (gettimeofday(&now, NULL) < 0) {
303             PyErr_SetFromErrno(PyExc_OSError);
304             return NULL;
305         }
306         sec = (long) timeout;
307         nsec = (long) (1e9 * (timeout - sec) + 0.5);
308         deadline.tv_sec = now.tv_sec + sec;
309         deadline.tv_nsec = now.tv_usec * 1000 + nsec;
310         deadline.tv_sec += (deadline.tv_nsec / 1000000000);
311         deadline.tv_nsec %= 1000000000;
312     }
313 
314     do {
315         Py_BEGIN_ALLOW_THREADS
316         if (blocking && timeout_obj == Py_None)
317             res = sem_wait(self->handle);
318         else if (!blocking)
319             res = sem_trywait(self->handle);
320         else
321             res = sem_timedwait(self->handle, &deadline);
322         Py_END_ALLOW_THREADS
323         if (res == MP_EXCEPTION_HAS_BEEN_SET)
324             break;
325     } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
326 
327     if (res < 0) {
328         if (errno == EAGAIN || errno == ETIMEDOUT)
329             Py_RETURN_FALSE;
330         else if (errno == EINTR)
331             return NULL;
332         else
333             return PyErr_SetFromErrno(PyExc_OSError);
334     }
335 
336     ++self->count;
337     self->last_tid = PyThread_get_thread_ident();
338 
339     Py_RETURN_TRUE;
340 }
341 
342 static PyObject *
semlock_release(SemLockObject * self,PyObject * args)343 semlock_release(SemLockObject *self, PyObject *args)
344 {
345     if (self->kind == RECURSIVE_MUTEX) {
346         if (!ISMINE(self)) {
347             PyErr_SetString(PyExc_AssertionError, "attempt to "
348                             "release recursive lock not owned "
349                             "by thread");
350             return NULL;
351         }
352         if (self->count > 1) {
353             --self->count;
354             Py_RETURN_NONE;
355         }
356         assert(self->count == 1);
357     } else {
358 #ifdef HAVE_BROKEN_SEM_GETVALUE
359         /* We will only check properly the maxvalue == 1 case */
360         if (self->maxvalue == 1) {
361             /* make sure that already locked */
362             if (sem_trywait(self->handle) < 0) {
363                 if (errno != EAGAIN) {
364                     PyErr_SetFromErrno(PyExc_OSError);
365                     return NULL;
366                 }
367                 /* it is already locked as expected */
368             } else {
369                 /* it was not locked so undo wait and raise  */
370                 if (sem_post(self->handle) < 0) {
371                     PyErr_SetFromErrno(PyExc_OSError);
372                     return NULL;
373                 }
374                 PyErr_SetString(PyExc_ValueError, "semaphore "
375                                 "or lock released too many "
376                                 "times");
377                 return NULL;
378             }
379         }
380 #else
381         int sval;
382 
383         /* This check is not an absolute guarantee that the semaphore
384            does not rise above maxvalue. */
385         if (sem_getvalue(self->handle, &sval) < 0) {
386             return PyErr_SetFromErrno(PyExc_OSError);
387         } else if (sval >= self->maxvalue) {
388             PyErr_SetString(PyExc_ValueError, "semaphore or lock "
389                             "released too many times");
390             return NULL;
391         }
392 #endif
393     }
394 
395     if (sem_post(self->handle) < 0)
396         return PyErr_SetFromErrno(PyExc_OSError);
397 
398     --self->count;
399     Py_RETURN_NONE;
400 }
401 
402 #endif /* !MS_WINDOWS */
403 
404 /*
405  * All platforms
406  */
407 
408 static PyObject *
newsemlockobject(PyTypeObject * type,SEM_HANDLE handle,int kind,int maxvalue)409 newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
410 {
411     SemLockObject *self;
412 
413     self = PyObject_New(SemLockObject, type);
414     if (!self)
415         return NULL;
416     self->handle = handle;
417     self->kind = kind;
418     self->count = 0;
419     self->last_tid = 0;
420     self->maxvalue = maxvalue;
421     return (PyObject*)self;
422 }
423 
424 static PyObject *
semlock_new(PyTypeObject * type,PyObject * args,PyObject * kwds)425 semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
426 {
427     char buffer[256];
428     SEM_HANDLE handle = SEM_FAILED;
429     int kind, maxvalue, value;
430     PyObject *result;
431     static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
432     int try = 0;
433 
434     if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
435                                      &kind, &value, &maxvalue))
436         return NULL;
437 
438     if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
439         PyErr_SetString(PyExc_ValueError, "unrecognized kind");
440         return NULL;
441     }
442 
443     /* Create a semaphore with a unique name. The bytes returned by
444      * _PyOS_URandom() are treated as unsigned long to ensure that the filename
445      * is valid (no special characters). */
446     do {
447         unsigned long suffix;
448         _PyOS_URandom((char *)&suffix, sizeof(suffix));
449         PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%lu", (long)getpid(),
450                       suffix);
451         SEM_CLEAR_ERROR();
452         handle = SEM_CREATE(buffer, value, maxvalue);
453     } while ((handle == SEM_FAILED) && (errno == EEXIST) && (++try < 100));
454 
455     /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
456     if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
457         goto failure;
458 
459     if (SEM_UNLINK(buffer) < 0)
460         goto failure;
461 
462     result = newsemlockobject(type, handle, kind, maxvalue);
463     if (!result)
464         goto failure;
465 
466     return result;
467 
468   failure:
469     if (handle != SEM_FAILED)
470         SEM_CLOSE(handle);
471     mp_SetError(NULL, MP_STANDARD_ERROR);
472     return NULL;
473 }
474 
475 static PyObject *
semlock_rebuild(PyTypeObject * type,PyObject * args)476 semlock_rebuild(PyTypeObject *type, PyObject *args)
477 {
478     SEM_HANDLE handle;
479     int kind, maxvalue;
480 
481     if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
482                           &handle, &kind, &maxvalue))
483         return NULL;
484 
485     return newsemlockobject(type, handle, kind, maxvalue);
486 }
487 
488 static void
semlock_dealloc(SemLockObject * self)489 semlock_dealloc(SemLockObject* self)
490 {
491     if (self->handle != SEM_FAILED)
492         SEM_CLOSE(self->handle);
493     PyObject_Del(self);
494 }
495 
496 static PyObject *
semlock_count(SemLockObject * self)497 semlock_count(SemLockObject *self)
498 {
499     return PyInt_FromLong((long)self->count);
500 }
501 
502 static PyObject *
semlock_ismine(SemLockObject * self)503 semlock_ismine(SemLockObject *self)
504 {
505     /* only makes sense for a lock */
506     return PyBool_FromLong(ISMINE(self));
507 }
508 
509 static PyObject *
semlock_getvalue(SemLockObject * self)510 semlock_getvalue(SemLockObject *self)
511 {
512 #ifdef HAVE_BROKEN_SEM_GETVALUE
513     PyErr_SetNone(PyExc_NotImplementedError);
514     return NULL;
515 #else
516     int sval;
517     if (SEM_GETVALUE(self->handle, &sval) < 0)
518         return mp_SetError(NULL, MP_STANDARD_ERROR);
519     /* some posix implementations use negative numbers to indicate
520        the number of waiting threads */
521     if (sval < 0)
522         sval = 0;
523     return PyInt_FromLong((long)sval);
524 #endif
525 }
526 
527 static PyObject *
semlock_iszero(SemLockObject * self)528 semlock_iszero(SemLockObject *self)
529 {
530 #ifdef HAVE_BROKEN_SEM_GETVALUE
531     if (sem_trywait(self->handle) < 0) {
532         if (errno == EAGAIN)
533             Py_RETURN_TRUE;
534         return mp_SetError(NULL, MP_STANDARD_ERROR);
535     } else {
536         if (sem_post(self->handle) < 0)
537             return mp_SetError(NULL, MP_STANDARD_ERROR);
538         Py_RETURN_FALSE;
539     }
540 #else
541     int sval;
542     if (SEM_GETVALUE(self->handle, &sval) < 0)
543         return mp_SetError(NULL, MP_STANDARD_ERROR);
544     return PyBool_FromLong((long)sval == 0);
545 #endif
546 }
547 
548 static PyObject *
semlock_afterfork(SemLockObject * self)549 semlock_afterfork(SemLockObject *self)
550 {
551     self->count = 0;
552     Py_RETURN_NONE;
553 }
554 
555 /*
556  * Semaphore methods
557  */
558 
559 static PyMethodDef semlock_methods[] = {
560     {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
561      "acquire the semaphore/lock"},
562     {"release", (PyCFunction)semlock_release, METH_NOARGS,
563      "release the semaphore/lock"},
564     {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
565      "enter the semaphore/lock"},
566     {"__exit__", (PyCFunction)semlock_release, METH_VARARGS,
567      "exit the semaphore/lock"},
568     {"_count", (PyCFunction)semlock_count, METH_NOARGS,
569      "num of `acquire()`s minus num of `release()`s for this process"},
570     {"_is_mine", (PyCFunction)semlock_ismine, METH_NOARGS,
571      "whether the lock is owned by this thread"},
572     {"_get_value", (PyCFunction)semlock_getvalue, METH_NOARGS,
573      "get the value of the semaphore"},
574     {"_is_zero", (PyCFunction)semlock_iszero, METH_NOARGS,
575      "returns whether semaphore has value zero"},
576     {"_rebuild", (PyCFunction)semlock_rebuild, METH_VARARGS | METH_CLASS,
577      ""},
578     {"_after_fork", (PyCFunction)semlock_afterfork, METH_NOARGS,
579      "rezero the net acquisition count after fork()"},
580     {NULL}
581 };
582 
583 /*
584  * Member table
585  */
586 
587 static PyMemberDef semlock_members[] = {
588     {"handle", T_SEM_HANDLE, offsetof(SemLockObject, handle), READONLY,
589      ""},
590     {"kind", T_INT, offsetof(SemLockObject, kind), READONLY,
591      ""},
592     {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
593      ""},
594     {NULL}
595 };
596 
597 /*
598  * Semaphore type
599  */
600 
601 PyTypeObject SemLockType = {
602     PyVarObject_HEAD_INIT(NULL, 0)
603     /* tp_name           */ "_multiprocessing.SemLock",
604     /* tp_basicsize      */ sizeof(SemLockObject),
605     /* tp_itemsize       */ 0,
606     /* tp_dealloc        */ (destructor)semlock_dealloc,
607     /* tp_print          */ 0,
608     /* tp_getattr        */ 0,
609     /* tp_setattr        */ 0,
610     /* tp_compare        */ 0,
611     /* tp_repr           */ 0,
612     /* tp_as_number      */ 0,
613     /* tp_as_sequence    */ 0,
614     /* tp_as_mapping     */ 0,
615     /* tp_hash           */ 0,
616     /* tp_call           */ 0,
617     /* tp_str            */ 0,
618     /* tp_getattro       */ 0,
619     /* tp_setattro       */ 0,
620     /* tp_as_buffer      */ 0,
621     /* tp_flags          */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
622     /* tp_doc            */ "Semaphore/Mutex type",
623     /* tp_traverse       */ 0,
624     /* tp_clear          */ 0,
625     /* tp_richcompare    */ 0,
626     /* tp_weaklistoffset */ 0,
627     /* tp_iter           */ 0,
628     /* tp_iternext       */ 0,
629     /* tp_methods        */ semlock_methods,
630     /* tp_members        */ semlock_members,
631     /* tp_getset         */ 0,
632     /* tp_base           */ 0,
633     /* tp_dict           */ 0,
634     /* tp_descr_get      */ 0,
635     /* tp_descr_set      */ 0,
636     /* tp_dictoffset     */ 0,
637     /* tp_init           */ 0,
638     /* tp_alloc          */ 0,
639     /* tp_new            */ semlock_new,
640 };
641