• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for lock.py"""
2
3import unittest
4from unittest import mock
5import re
6
7import asyncio
8
9STR_RGX_REPR = (
10    r'^<(?P<class>.*?) object at (?P<address>.*?)'
11    r'\[(?P<extras>'
12    r'(set|unset|locked|unlocked)(, value:\d)?(, waiters:\d+)?'
13    r')\]>\Z'
14)
15RGX_REPR = re.compile(STR_RGX_REPR)
16
17
18def tearDownModule():
19    asyncio.set_event_loop_policy(None)
20
21
22class LockTests(unittest.IsolatedAsyncioTestCase):
23
24    async def test_repr(self):
25        lock = asyncio.Lock()
26        self.assertTrue(repr(lock).endswith('[unlocked]>'))
27        self.assertTrue(RGX_REPR.match(repr(lock)))
28
29        await lock.acquire()
30        self.assertTrue(repr(lock).endswith('[locked]>'))
31        self.assertTrue(RGX_REPR.match(repr(lock)))
32
33    async def test_lock(self):
34        lock = asyncio.Lock()
35
36        with self.assertWarns(DeprecationWarning):
37            @asyncio.coroutine
38            def acquire_lock():
39                return (yield from lock)
40
41        with self.assertRaisesRegex(
42            TypeError,
43            "object is not iterable"
44        ):
45            await acquire_lock()
46
47        self.assertFalse(lock.locked())
48
49    async def test_lock_doesnt_accept_loop_parameter(self):
50        primitives_cls = [
51            asyncio.Lock,
52            asyncio.Condition,
53            asyncio.Event,
54            asyncio.Semaphore,
55            asyncio.BoundedSemaphore,
56        ]
57
58        loop = asyncio.get_running_loop()
59
60        for cls in primitives_cls:
61            with self.assertRaisesRegex(
62                TypeError,
63                rf'As of 3.10, the \*loop\* parameter was removed from '
64                rf'{cls.__name__}\(\) since it is no longer necessary'
65            ):
66                cls(loop=loop)
67
68    async def test_lock_by_with_statement(self):
69        primitives = [
70            asyncio.Lock(),
71            asyncio.Condition(),
72            asyncio.Semaphore(),
73            asyncio.BoundedSemaphore(),
74        ]
75
76        with self.assertWarns(DeprecationWarning):
77            @asyncio.coroutine
78            def test(lock):
79                yield from asyncio.sleep(0.01)
80                self.assertFalse(lock.locked())
81                with self.assertRaisesRegex(
82                    TypeError,
83                    "object is not iterable"
84                ):
85                    with (yield from lock):
86                        pass
87                self.assertFalse(lock.locked())
88
89        for lock in primitives:
90            await test(lock)
91            self.assertFalse(lock.locked())
92
93    async def test_acquire(self):
94        lock = asyncio.Lock()
95        result = []
96
97        self.assertTrue(await lock.acquire())
98
99        async def c1(result):
100            if await lock.acquire():
101                result.append(1)
102            return True
103
104        async def c2(result):
105            if await lock.acquire():
106                result.append(2)
107            return True
108
109        async def c3(result):
110            if await lock.acquire():
111                result.append(3)
112            return True
113
114        t1 = asyncio.create_task(c1(result))
115        t2 = asyncio.create_task(c2(result))
116
117        await asyncio.sleep(0)
118        self.assertEqual([], result)
119
120        lock.release()
121        await asyncio.sleep(0)
122        self.assertEqual([1], result)
123
124        await asyncio.sleep(0)
125        self.assertEqual([1], result)
126
127        t3 = asyncio.create_task(c3(result))
128
129        lock.release()
130        await asyncio.sleep(0)
131        self.assertEqual([1, 2], result)
132
133        lock.release()
134        await asyncio.sleep(0)
135        self.assertEqual([1, 2, 3], result)
136
137        self.assertTrue(t1.done())
138        self.assertTrue(t1.result())
139        self.assertTrue(t2.done())
140        self.assertTrue(t2.result())
141        self.assertTrue(t3.done())
142        self.assertTrue(t3.result())
143
144    async def test_acquire_cancel(self):
145        lock = asyncio.Lock()
146        self.assertTrue(await lock.acquire())
147
148        task = asyncio.create_task(lock.acquire())
149        asyncio.get_running_loop().call_soon(task.cancel)
150        with self.assertRaises(asyncio.CancelledError):
151            await task
152        self.assertFalse(lock._waiters)
153
154    async def test_cancel_race(self):
155        # Several tasks:
156        # - A acquires the lock
157        # - B is blocked in acquire()
158        # - C is blocked in acquire()
159        #
160        # Now, concurrently:
161        # - B is cancelled
162        # - A releases the lock
163        #
164        # If B's waiter is marked cancelled but not yet removed from
165        # _waiters, A's release() call will crash when trying to set
166        # B's waiter; instead, it should move on to C's waiter.
167
168        # Setup: A has the lock, b and c are waiting.
169        lock = asyncio.Lock()
170
171        async def lockit(name, blocker):
172            await lock.acquire()
173            try:
174                if blocker is not None:
175                    await blocker
176            finally:
177                lock.release()
178
179        fa = asyncio.get_running_loop().create_future()
180        ta = asyncio.create_task(lockit('A', fa))
181        await asyncio.sleep(0)
182        self.assertTrue(lock.locked())
183        tb = asyncio.create_task(lockit('B', None))
184        await asyncio.sleep(0)
185        self.assertEqual(len(lock._waiters), 1)
186        tc = asyncio.create_task(lockit('C', None))
187        await asyncio.sleep(0)
188        self.assertEqual(len(lock._waiters), 2)
189
190        # Create the race and check.
191        # Without the fix this failed at the last assert.
192        fa.set_result(None)
193        tb.cancel()
194        self.assertTrue(lock._waiters[0].cancelled())
195        await asyncio.sleep(0)
196        self.assertFalse(lock.locked())
197        self.assertTrue(ta.done())
198        self.assertTrue(tb.cancelled())
199        await tc
200
201    async def test_cancel_release_race(self):
202        # Issue 32734
203        # Acquire 4 locks, cancel second, release first
204        # and 2 locks are taken at once.
205        loop = asyncio.get_running_loop()
206        lock = asyncio.Lock()
207        lock_count = 0
208        call_count = 0
209
210        async def lockit():
211            nonlocal lock_count
212            nonlocal call_count
213            call_count += 1
214            await lock.acquire()
215            lock_count += 1
216
217        def trigger():
218            t1.cancel()
219            lock.release()
220
221        await lock.acquire()
222
223        t1 = asyncio.create_task(lockit())
224        t2 = asyncio.create_task(lockit())
225        t3 = asyncio.create_task(lockit())
226
227        # Start scheduled tasks
228        await asyncio.sleep(0)
229
230        loop.call_soon(trigger)
231        with self.assertRaises(asyncio.CancelledError):
232            # Wait for cancellation
233            await t1
234
235        # Make sure only one lock was taken
236        self.assertEqual(lock_count, 1)
237        # While 3 calls were made to lockit()
238        self.assertEqual(call_count, 3)
239        self.assertTrue(t1.cancelled() and t2.done())
240
241        # Cleanup the task that is stuck on acquire.
242        t3.cancel()
243        await asyncio.sleep(0)
244        self.assertTrue(t3.cancelled())
245
246    async def test_finished_waiter_cancelled(self):
247        lock = asyncio.Lock()
248
249        await lock.acquire()
250        self.assertTrue(lock.locked())
251
252        tb = asyncio.create_task(lock.acquire())
253        await asyncio.sleep(0)
254        self.assertEqual(len(lock._waiters), 1)
255
256        # Create a second waiter, wake up the first, and cancel it.
257        # Without the fix, the second was not woken up.
258        tc = asyncio.create_task(lock.acquire())
259        tb.cancel()
260        lock.release()
261        await asyncio.sleep(0)
262
263        self.assertTrue(lock.locked())
264        self.assertTrue(tb.cancelled())
265
266        # Cleanup
267        await tc
268
269    async def test_release_not_acquired(self):
270        lock = asyncio.Lock()
271
272        self.assertRaises(RuntimeError, lock.release)
273
274    async def test_release_no_waiters(self):
275        lock = asyncio.Lock()
276        await lock.acquire()
277        self.assertTrue(lock.locked())
278
279        lock.release()
280        self.assertFalse(lock.locked())
281
282    async def test_context_manager(self):
283        lock = asyncio.Lock()
284        self.assertFalse(lock.locked())
285
286        async with lock:
287            self.assertTrue(lock.locked())
288
289        self.assertFalse(lock.locked())
290
291
292class EventTests(unittest.IsolatedAsyncioTestCase):
293
294    def test_repr(self):
295        ev = asyncio.Event()
296        self.assertTrue(repr(ev).endswith('[unset]>'))
297        match = RGX_REPR.match(repr(ev))
298        self.assertEqual(match.group('extras'), 'unset')
299
300        ev.set()
301        self.assertTrue(repr(ev).endswith('[set]>'))
302        self.assertTrue(RGX_REPR.match(repr(ev)))
303
304        ev._waiters.append(mock.Mock())
305        self.assertTrue('waiters:1' in repr(ev))
306        self.assertTrue(RGX_REPR.match(repr(ev)))
307
308    async def test_wait(self):
309        ev = asyncio.Event()
310        self.assertFalse(ev.is_set())
311
312        result = []
313
314        async def c1(result):
315            if await ev.wait():
316                result.append(1)
317
318        async def c2(result):
319            if await ev.wait():
320                result.append(2)
321
322        async def c3(result):
323            if await ev.wait():
324                result.append(3)
325
326        t1 = asyncio.create_task(c1(result))
327        t2 = asyncio.create_task(c2(result))
328
329        await asyncio.sleep(0)
330        self.assertEqual([], result)
331
332        t3 = asyncio.create_task(c3(result))
333
334        ev.set()
335        await asyncio.sleep(0)
336        self.assertEqual([3, 1, 2], result)
337
338        self.assertTrue(t1.done())
339        self.assertIsNone(t1.result())
340        self.assertTrue(t2.done())
341        self.assertIsNone(t2.result())
342        self.assertTrue(t3.done())
343        self.assertIsNone(t3.result())
344
345    async def test_wait_on_set(self):
346        ev = asyncio.Event()
347        ev.set()
348
349        res = await ev.wait()
350        self.assertTrue(res)
351
352    async def test_wait_cancel(self):
353        ev = asyncio.Event()
354
355        wait = asyncio.create_task(ev.wait())
356        asyncio.get_running_loop().call_soon(wait.cancel)
357        with self.assertRaises(asyncio.CancelledError):
358            await wait
359        self.assertFalse(ev._waiters)
360
361    async def test_clear(self):
362        ev = asyncio.Event()
363        self.assertFalse(ev.is_set())
364
365        ev.set()
366        self.assertTrue(ev.is_set())
367
368        ev.clear()
369        self.assertFalse(ev.is_set())
370
371    async def test_clear_with_waiters(self):
372        ev = asyncio.Event()
373        result = []
374
375        async def c1(result):
376            if await ev.wait():
377                result.append(1)
378            return True
379
380        t = asyncio.create_task(c1(result))
381        await asyncio.sleep(0)
382        self.assertEqual([], result)
383
384        ev.set()
385        ev.clear()
386        self.assertFalse(ev.is_set())
387
388        ev.set()
389        ev.set()
390        self.assertEqual(1, len(ev._waiters))
391
392        await asyncio.sleep(0)
393        self.assertEqual([1], result)
394        self.assertEqual(0, len(ev._waiters))
395
396        self.assertTrue(t.done())
397        self.assertTrue(t.result())
398
399
400class ConditionTests(unittest.IsolatedAsyncioTestCase):
401
402    async def test_wait(self):
403        cond = asyncio.Condition()
404        result = []
405
406        async def c1(result):
407            await cond.acquire()
408            if await cond.wait():
409                result.append(1)
410            return True
411
412        async def c2(result):
413            await cond.acquire()
414            if await cond.wait():
415                result.append(2)
416            return True
417
418        async def c3(result):
419            await cond.acquire()
420            if await cond.wait():
421                result.append(3)
422            return True
423
424        t1 = asyncio.create_task(c1(result))
425        t2 = asyncio.create_task(c2(result))
426        t3 = asyncio.create_task(c3(result))
427
428        await asyncio.sleep(0)
429        self.assertEqual([], result)
430        self.assertFalse(cond.locked())
431
432        self.assertTrue(await cond.acquire())
433        cond.notify()
434        await asyncio.sleep(0)
435        self.assertEqual([], result)
436        self.assertTrue(cond.locked())
437
438        cond.release()
439        await asyncio.sleep(0)
440        self.assertEqual([1], result)
441        self.assertTrue(cond.locked())
442
443        cond.notify(2)
444        await asyncio.sleep(0)
445        self.assertEqual([1], result)
446        self.assertTrue(cond.locked())
447
448        cond.release()
449        await asyncio.sleep(0)
450        self.assertEqual([1, 2], result)
451        self.assertTrue(cond.locked())
452
453        cond.release()
454        await asyncio.sleep(0)
455        self.assertEqual([1, 2, 3], result)
456        self.assertTrue(cond.locked())
457
458        self.assertTrue(t1.done())
459        self.assertTrue(t1.result())
460        self.assertTrue(t2.done())
461        self.assertTrue(t2.result())
462        self.assertTrue(t3.done())
463        self.assertTrue(t3.result())
464
465    async def test_wait_cancel(self):
466        cond = asyncio.Condition()
467        await cond.acquire()
468
469        wait = asyncio.create_task(cond.wait())
470        asyncio.get_running_loop().call_soon(wait.cancel)
471        with self.assertRaises(asyncio.CancelledError):
472            await wait
473        self.assertFalse(cond._waiters)
474        self.assertTrue(cond.locked())
475
476    async def test_wait_cancel_contested(self):
477        cond = asyncio.Condition()
478
479        await cond.acquire()
480        self.assertTrue(cond.locked())
481
482        wait_task = asyncio.create_task(cond.wait())
483        await asyncio.sleep(0)
484        self.assertFalse(cond.locked())
485
486        # Notify, but contest the lock before cancelling
487        await cond.acquire()
488        self.assertTrue(cond.locked())
489        cond.notify()
490        asyncio.get_running_loop().call_soon(wait_task.cancel)
491        asyncio.get_running_loop().call_soon(cond.release)
492
493        try:
494            await wait_task
495        except asyncio.CancelledError:
496            # Should not happen, since no cancellation points
497            pass
498
499        self.assertTrue(cond.locked())
500
501    async def test_wait_cancel_after_notify(self):
502        # See bpo-32841
503        waited = False
504
505        cond = asyncio.Condition()
506
507        async def wait_on_cond():
508            nonlocal waited
509            async with cond:
510                waited = True  # Make sure this area was reached
511                await cond.wait()
512
513        waiter = asyncio.create_task(wait_on_cond())
514        await asyncio.sleep(0)  # Start waiting
515
516        await cond.acquire()
517        cond.notify()
518        await asyncio.sleep(0)  # Get to acquire()
519        waiter.cancel()
520        await asyncio.sleep(0)  # Activate cancellation
521        cond.release()
522        await asyncio.sleep(0)  # Cancellation should occur
523
524        self.assertTrue(waiter.cancelled())
525        self.assertTrue(waited)
526
527    async def test_wait_unacquired(self):
528        cond = asyncio.Condition()
529        with self.assertRaises(RuntimeError):
530            await cond.wait()
531
532    async def test_wait_for(self):
533        cond = asyncio.Condition()
534        presult = False
535
536        def predicate():
537            return presult
538
539        result = []
540
541        async def c1(result):
542            await cond.acquire()
543            if await cond.wait_for(predicate):
544                result.append(1)
545                cond.release()
546            return True
547
548        t = asyncio.create_task(c1(result))
549
550        await asyncio.sleep(0)
551        self.assertEqual([], result)
552
553        await cond.acquire()
554        cond.notify()
555        cond.release()
556        await asyncio.sleep(0)
557        self.assertEqual([], result)
558
559        presult = True
560        await cond.acquire()
561        cond.notify()
562        cond.release()
563        await asyncio.sleep(0)
564        self.assertEqual([1], result)
565
566        self.assertTrue(t.done())
567        self.assertTrue(t.result())
568
569    async def test_wait_for_unacquired(self):
570        cond = asyncio.Condition()
571
572        # predicate can return true immediately
573        res = await cond.wait_for(lambda: [1, 2, 3])
574        self.assertEqual([1, 2, 3], res)
575
576        with self.assertRaises(RuntimeError):
577            await cond.wait_for(lambda: False)
578
579    async def test_notify(self):
580        cond = asyncio.Condition()
581        result = []
582
583        async def c1(result):
584            await cond.acquire()
585            if await cond.wait():
586                result.append(1)
587                cond.release()
588            return True
589
590        async def c2(result):
591            await cond.acquire()
592            if await cond.wait():
593                result.append(2)
594                cond.release()
595            return True
596
597        async def c3(result):
598            await cond.acquire()
599            if await cond.wait():
600                result.append(3)
601                cond.release()
602            return True
603
604        t1 = asyncio.create_task(c1(result))
605        t2 = asyncio.create_task(c2(result))
606        t3 = asyncio.create_task(c3(result))
607
608        await asyncio.sleep(0)
609        self.assertEqual([], result)
610
611        await cond.acquire()
612        cond.notify(1)
613        cond.release()
614        await asyncio.sleep(0)
615        self.assertEqual([1], result)
616
617        await cond.acquire()
618        cond.notify(1)
619        cond.notify(2048)
620        cond.release()
621        await asyncio.sleep(0)
622        self.assertEqual([1, 2, 3], result)
623
624        self.assertTrue(t1.done())
625        self.assertTrue(t1.result())
626        self.assertTrue(t2.done())
627        self.assertTrue(t2.result())
628        self.assertTrue(t3.done())
629        self.assertTrue(t3.result())
630
631    async def test_notify_all(self):
632        cond = asyncio.Condition()
633
634        result = []
635
636        async def c1(result):
637            await cond.acquire()
638            if await cond.wait():
639                result.append(1)
640                cond.release()
641            return True
642
643        async def c2(result):
644            await cond.acquire()
645            if await cond.wait():
646                result.append(2)
647                cond.release()
648            return True
649
650        t1 = asyncio.create_task(c1(result))
651        t2 = asyncio.create_task(c2(result))
652
653        await asyncio.sleep(0)
654        self.assertEqual([], result)
655
656        await cond.acquire()
657        cond.notify_all()
658        cond.release()
659        await asyncio.sleep(0)
660        self.assertEqual([1, 2], result)
661
662        self.assertTrue(t1.done())
663        self.assertTrue(t1.result())
664        self.assertTrue(t2.done())
665        self.assertTrue(t2.result())
666
667    def test_notify_unacquired(self):
668        cond = asyncio.Condition()
669        self.assertRaises(RuntimeError, cond.notify)
670
671    def test_notify_all_unacquired(self):
672        cond = asyncio.Condition()
673        self.assertRaises(RuntimeError, cond.notify_all)
674
675    async def test_repr(self):
676        cond = asyncio.Condition()
677        self.assertTrue('unlocked' in repr(cond))
678        self.assertTrue(RGX_REPR.match(repr(cond)))
679
680        await cond.acquire()
681        self.assertTrue('locked' in repr(cond))
682
683        cond._waiters.append(mock.Mock())
684        self.assertTrue('waiters:1' in repr(cond))
685        self.assertTrue(RGX_REPR.match(repr(cond)))
686
687        cond._waiters.append(mock.Mock())
688        self.assertTrue('waiters:2' in repr(cond))
689        self.assertTrue(RGX_REPR.match(repr(cond)))
690
691    async def test_context_manager(self):
692        cond = asyncio.Condition()
693        self.assertFalse(cond.locked())
694        async with cond:
695            self.assertTrue(cond.locked())
696        self.assertFalse(cond.locked())
697
698    async def test_explicit_lock(self):
699        async def f(lock=None, cond=None):
700            if lock is None:
701                lock = asyncio.Lock()
702            if cond is None:
703                cond = asyncio.Condition(lock)
704            self.assertIs(cond._lock, lock)
705            self.assertFalse(lock.locked())
706            self.assertFalse(cond.locked())
707            async with cond:
708                self.assertTrue(lock.locked())
709                self.assertTrue(cond.locked())
710            self.assertFalse(lock.locked())
711            self.assertFalse(cond.locked())
712            async with lock:
713                self.assertTrue(lock.locked())
714                self.assertTrue(cond.locked())
715            self.assertFalse(lock.locked())
716            self.assertFalse(cond.locked())
717
718        # All should work in the same way.
719        await f()
720        await f(asyncio.Lock())
721        lock = asyncio.Lock()
722        await f(lock, asyncio.Condition(lock))
723
724    async def test_ambiguous_loops(self):
725        loop = asyncio.new_event_loop()
726        self.addCleanup(loop.close)
727
728        async def wrong_loop_in_lock():
729            with self.assertRaises(TypeError):
730                asyncio.Lock(loop=loop)  # actively disallowed since 3.10
731            lock = asyncio.Lock()
732            lock._loop = loop  # use private API for testing
733            async with lock:
734                # acquired immediately via the fast-path
735                # without interaction with any event loop.
736                cond = asyncio.Condition(lock)
737                # cond.acquire() will trigger waiting on the lock
738                # and it will discover the event loop mismatch.
739                with self.assertRaisesRegex(
740                    RuntimeError,
741                    "is bound to a different event loop",
742                ):
743                    await cond.acquire()
744
745        async def wrong_loop_in_cond():
746            # Same analogy here with the condition's loop.
747            lock = asyncio.Lock()
748            async with lock:
749                with self.assertRaises(TypeError):
750                    asyncio.Condition(lock, loop=loop)
751                cond = asyncio.Condition(lock)
752                cond._loop = loop
753                with self.assertRaisesRegex(
754                    RuntimeError,
755                    "is bound to a different event loop",
756                ):
757                    await cond.wait()
758
759        await wrong_loop_in_lock()
760        await wrong_loop_in_cond()
761
762    async def test_timeout_in_block(self):
763        condition = asyncio.Condition()
764        async with condition:
765            with self.assertRaises(asyncio.TimeoutError):
766                await asyncio.wait_for(condition.wait(), timeout=0.5)
767
768
769class SemaphoreTests(unittest.IsolatedAsyncioTestCase):
770
771    def test_initial_value_zero(self):
772        sem = asyncio.Semaphore(0)
773        self.assertTrue(sem.locked())
774
775    async def test_repr(self):
776        sem = asyncio.Semaphore()
777        self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
778        self.assertTrue(RGX_REPR.match(repr(sem)))
779
780        await sem.acquire()
781        self.assertTrue(repr(sem).endswith('[locked]>'))
782        self.assertTrue('waiters' not in repr(sem))
783        self.assertTrue(RGX_REPR.match(repr(sem)))
784
785        sem._waiters.append(mock.Mock())
786        self.assertTrue('waiters:1' in repr(sem))
787        self.assertTrue(RGX_REPR.match(repr(sem)))
788
789        sem._waiters.append(mock.Mock())
790        self.assertTrue('waiters:2' in repr(sem))
791        self.assertTrue(RGX_REPR.match(repr(sem)))
792
793    async def test_semaphore(self):
794        sem = asyncio.Semaphore()
795        self.assertEqual(1, sem._value)
796
797        with self.assertWarns(DeprecationWarning):
798            @asyncio.coroutine
799            def acquire_lock():
800                return (yield from sem)
801
802        with self.assertRaisesRegex(
803            TypeError,
804            "'Semaphore' object is not iterable",
805        ):
806            await acquire_lock()
807
808        self.assertFalse(sem.locked())
809        self.assertEqual(1, sem._value)
810
811    def test_semaphore_value(self):
812        self.assertRaises(ValueError, asyncio.Semaphore, -1)
813
814    async def test_acquire(self):
815        sem = asyncio.Semaphore(3)
816        result = []
817
818        self.assertTrue(await sem.acquire())
819        self.assertTrue(await sem.acquire())
820        self.assertFalse(sem.locked())
821
822        async def c1(result):
823            await sem.acquire()
824            result.append(1)
825            return True
826
827        async def c2(result):
828            await sem.acquire()
829            result.append(2)
830            return True
831
832        async def c3(result):
833            await sem.acquire()
834            result.append(3)
835            return True
836
837        async def c4(result):
838            await sem.acquire()
839            result.append(4)
840            return True
841
842        t1 = asyncio.create_task(c1(result))
843        t2 = asyncio.create_task(c2(result))
844        t3 = asyncio.create_task(c3(result))
845
846        await asyncio.sleep(0)
847        self.assertEqual([1], result)
848        self.assertTrue(sem.locked())
849        self.assertEqual(2, len(sem._waiters))
850        self.assertEqual(0, sem._value)
851
852        t4 = asyncio.create_task(c4(result))
853
854        sem.release()
855        sem.release()
856        self.assertEqual(2, sem._value)
857
858        await asyncio.sleep(0)
859        self.assertEqual(0, sem._value)
860        self.assertEqual(3, len(result))
861        self.assertTrue(sem.locked())
862        self.assertEqual(1, len(sem._waiters))
863        self.assertEqual(0, sem._value)
864
865        self.assertTrue(t1.done())
866        self.assertTrue(t1.result())
867        race_tasks = [t2, t3, t4]
868        done_tasks = [t for t in race_tasks if t.done() and t.result()]
869        self.assertTrue(2, len(done_tasks))
870
871        # cleanup locked semaphore
872        sem.release()
873        await asyncio.gather(*race_tasks)
874
875    async def test_acquire_cancel(self):
876        sem = asyncio.Semaphore()
877        await sem.acquire()
878
879        acquire = asyncio.create_task(sem.acquire())
880        asyncio.get_running_loop().call_soon(acquire.cancel)
881        with self.assertRaises(asyncio.CancelledError):
882            await acquire
883        self.assertTrue((not sem._waiters) or
884                        all(waiter.done() for waiter in sem._waiters))
885
886    async def test_acquire_cancel_before_awoken(self):
887        sem = asyncio.Semaphore(value=0)
888
889        t1 = asyncio.create_task(sem.acquire())
890        t2 = asyncio.create_task(sem.acquire())
891        t3 = asyncio.create_task(sem.acquire())
892        t4 = asyncio.create_task(sem.acquire())
893
894        await asyncio.sleep(0)
895
896        t1.cancel()
897        t2.cancel()
898        sem.release()
899
900        await asyncio.sleep(0)
901        num_done = sum(t.done() for t in [t3, t4])
902        self.assertEqual(num_done, 1)
903        self.assertTrue(t3.done())
904        self.assertFalse(t4.done())
905
906        t3.cancel()
907        t4.cancel()
908        await asyncio.sleep(0)
909
910    async def test_acquire_hang(self):
911        sem = asyncio.Semaphore(value=0)
912
913        t1 = asyncio.create_task(sem.acquire())
914        t2 = asyncio.create_task(sem.acquire())
915        await asyncio.sleep(0)
916
917        t1.cancel()
918        sem.release()
919        await asyncio.sleep(0)
920        self.assertTrue(sem.locked())
921        self.assertTrue(t2.done())
922
923    def test_release_not_acquired(self):
924        sem = asyncio.BoundedSemaphore()
925
926        self.assertRaises(ValueError, sem.release)
927
928    async def test_release_no_waiters(self):
929        sem = asyncio.Semaphore()
930        await sem.acquire()
931        self.assertTrue(sem.locked())
932
933        sem.release()
934        self.assertFalse(sem.locked())
935
936
937if __name__ == '__main__':
938    unittest.main()
939