• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for lock.py"""
2
3import unittest
4from unittest import mock
5import re
6
7import asyncio
8from test.test_asyncio import utils as test_utils
9
# Pattern for the repr() of the asyncio synchronization primitives checked in
# this module, e.g. ``<... object at 0x... [unlocked, value:1]>``.
#   class   - everything before " object at "
#   address - text between " object at " and the opening "["
#   extras  - the bracketed state summary (state, optional value/waiters)
# The value is matched with ``\d+`` rather than a single ``\d`` so that a
# semaphore whose counter has more than one digit is still recognised.
STR_RGX_REPR = (
    r'^<(?P<class>.*?) object at (?P<address>.*?)'
    r'\[(?P<extras>'
    r'(set|unset|locked|unlocked)(, value:\d+)?(, waiters:\d+)?'
    r')\]>\Z'
)
# Compiled once at import time; shared by every test_repr method.
RGX_REPR = re.compile(STR_RGX_REPR)
17
18
def tearDownModule():
    """Module-level unittest hook: reset the asyncio event loop policy."""
    # Passing None hands the choice of policy back to asyncio.
    reset_to = None
    asyncio.set_event_loop_policy(reset_to)
21
22
class LockTests(test_utils.TestCase):
    """Tests for asyncio.Lock: constructor, repr, acquire/release, cancel."""

    def setUp(self):
        """Create a fresh test loop for each test as ``self.loop``."""
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        """An explicit ``loop=`` warns and is stored on ``lock._loop``."""
        for explicit_loop in (mock.Mock(), self.loop):
            with self.assertWarns(DeprecationWarning):
                lock = asyncio.Lock(loop=explicit_loop)
            self.assertIs(lock._loop, explicit_loop)

    def test_ctor_noloop(self):
        """Without ``loop=`` the lock picks up the currently set event loop."""
        asyncio.set_event_loop(self.loop)
        implicit = asyncio.Lock()
        self.assertIs(implicit._loop, self.loop)

    def test_repr(self):
        """repr() ends in ``[unlocked]>`` / ``[locked]>`` and fits RGX_REPR."""
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        free_text = repr(lock)
        self.assertTrue(free_text.endswith('[unlocked]>'))
        self.assertIsNotNone(RGX_REPR.match(free_text))

        self.loop.run_until_complete(lock.acquire())

        held_text = repr(lock)
        self.assertTrue(held_text.endswith('[locked]>'))
        self.assertIsNotNone(RGX_REPR.match(held_text))

    def test_lock(self):
        """``yield from lock`` raises TypeError and leaves the lock free."""
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

            # Generator-based coroutine; the decorator is applied inside
            # the assertWarns block just like the Lock construction.
            @asyncio.coroutine
            def legacy_acquire():
                return (yield from lock)

        with self.assertRaisesRegex(TypeError, "object is not iterable"):
            self.loop.run_until_complete(legacy_acquire())

        self.assertFalse(lock.locked())

    def test_lock_by_with_statement(self):
        """``with (yield from primitive)`` raises TypeError for each type."""
        loop = asyncio.new_event_loop()  # don't use TestLoop quirks
        self.set_event_loop(loop)
        with self.assertWarns(DeprecationWarning):
            primitives = [
                asyncio.Lock(loop=loop),
                asyncio.Condition(loop=loop),
                asyncio.Semaphore(loop=loop),
                asyncio.BoundedSemaphore(loop=loop),
            ]

            @asyncio.coroutine
            def legacy_with(primitive):
                yield from asyncio.sleep(0.01)
                self.assertFalse(primitive.locked())
                with self.assertRaisesRegex(
                        TypeError, "object is not iterable"):
                    with (yield from primitive):
                        pass
                self.assertFalse(primitive.locked())

        for candidate in primitives:
            loop.run_until_complete(legacy_with(candidate))
            self.assertFalse(candidate.locked())

    def test_acquire(self):
        """Waiters obtain the lock one at a time, one per release()."""
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        order = []

        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        async def acquire_and_tag(tag):
            # Record the tag once the lock has been obtained.
            if await lock.acquire():
                order.append(tag)
            return True

        first = self.loop.create_task(acquire_and_tag(1))
        second = self.loop.create_task(acquire_and_tag(2))

        # The lock is held by the test itself, so nobody gets it yet.
        test_utils.run_briefly(self.loop)
        self.assertEqual([], order)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], order)

        # Another loop pass without a release changes nothing.
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], order)

        third = self.loop.create_task(acquire_and_tag(3))

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], order)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], order)

        for task in (first, second, third):
            self.assertTrue(task.done())
            self.assertTrue(task.result())

    def test_acquire_cancel(self):
        """Cancelling a pending acquire() leaves no waiter behind."""
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        pending = self.loop.create_task(lock.acquire())
        self.loop.call_soon(pending.cancel)
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(pending)
        self.assertFalse(lock._waiters)

    def test_cancel_race(self):
        """A release racing with a cancelled waiter moves on to the next."""
        # Scenario:
        # - task A holds the lock
        # - tasks B and C are both blocked in acquire()
        #
        # Then, in the same loop pass:
        # - B is cancelled
        # - A releases the lock
        #
        # B's waiter may be marked cancelled while still sitting in
        # _waiters; release() must not crash trying to set it and should
        # hand the lock over to C's waiter instead.

        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        async def hold_lock(name, blocker):
            await lock.acquire()
            try:
                if blocker is not None:
                    await blocker
            finally:
                lock.release()

        # Setup: A owns the lock, B and C queue up behind it.
        gate = self.loop.create_future()
        task_a = self.loop.create_task(hold_lock('A', gate))
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())
        task_b = self.loop.create_task(hold_lock('B', None))
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)
        task_c = self.loop.create_task(hold_lock('C', None))
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 2)

        # Create the race and check.
        # Without the fix this failed at the last assert.
        gate.set_result(None)
        task_b.cancel()
        self.assertTrue(lock._waiters[0].cancelled())
        test_utils.run_briefly(self.loop)
        self.assertFalse(lock.locked())
        self.assertTrue(task_a.done())
        self.assertTrue(task_b.cancelled())
        self.assertTrue(task_c.done())

    def test_cancel_release_race(self):
        """Cancel + release in one callback must hand out the lock once."""
        # Issue 32734
        # Acquire 4 locks, cancel second, release first
        # and 2 locks are taken at once.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        acquired = 0
        entered = 0

        async def lockit():
            nonlocal acquired, entered
            entered += 1
            await lock.acquire()
            acquired += 1

        async def lockandtrigger():
            await lock.acquire()
            self.loop.call_soon(trigger)

        def trigger():
            # Cancel the first waiter and release in the same callback.
            t1.cancel()
            lock.release()

        t0 = self.loop.create_task(lockandtrigger())
        t1 = self.loop.create_task(lockit())
        t2 = self.loop.create_task(lockit())
        t3 = self.loop.create_task(lockit())

        # First pass: every task reaches its acquire().
        test_utils.run_briefly(self.loop)
        self.assertTrue(t0.done())

        # Second pass: trigger() runs.
        test_utils.run_briefly(self.loop)
        # Third pass: the cancellation is delivered.
        test_utils.run_briefly(self.loop)

        # Exactly one lockit() got the lock ...
        self.assertEqual(acquired, 1)
        # ... although all three were started.
        self.assertEqual(entered, 3)
        self.assertTrue(t1.cancelled())
        self.assertTrue(t2.done())

        # Clean up the task that is still stuck on acquire().
        t3.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(t3.cancelled())

    def test_finished_waiter_cancelled(self):
        """Cancelling an already-woken waiter must not strand the lock."""
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        owner = self.loop.create_task(lock.acquire())
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())

        woken = self.loop.create_task(lock.acquire())
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)

        # Create a second waiter, wake up the first, and cancel it.
        # Without the fix, the second was not woken up.
        tc = self.loop.create_task(lock.acquire())
        lock.release()
        woken.cancel()
        test_utils.run_briefly(self.loop)

        self.assertTrue(lock.locked())
        self.assertTrue(owner.done())
        self.assertTrue(woken.cancelled())

    def test_release_not_acquired(self):
        """release() on a lock that is not held raises RuntimeError."""
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        with self.assertRaises(RuntimeError):
            lock.release()

    def test_release_no_waiters(self):
        """release() with nobody waiting simply unlocks the lock."""
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.loop.run_until_complete(lock.acquire())
        self.assertTrue(lock.locked())

        lock.release()
        self.assertFalse(lock.locked())

    def test_context_manager(self):
        """``async with lock`` holds the lock only inside the block."""
        async def scenario():
            lock = asyncio.Lock()
            self.assertFalse(lock.locked())

            async with lock:
                self.assertTrue(lock.locked())

            self.assertFalse(lock.locked())

        self.loop.run_until_complete(scenario())
307
308
class EventTests(test_utils.TestCase):
    """Tests for asyncio.Event: constructor, repr, wait/set/clear."""

    def setUp(self):
        """Create a fresh test loop for each test as ``self.loop``."""
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        """An explicit ``loop=`` warns and is stored on ``ev._loop``."""
        for explicit_loop in (mock.Mock(), self.loop):
            with self.assertWarns(DeprecationWarning):
                ev = asyncio.Event(loop=explicit_loop)
            self.assertIs(ev._loop, explicit_loop)

    def test_ctor_noloop(self):
        """Without ``loop=`` the event picks up the currently set loop."""
        asyncio.set_event_loop(self.loop)
        implicit = asyncio.Event()
        self.assertIs(implicit._loop, self.loop)

    def test_repr(self):
        """repr() reports unset/set and the number of waiters."""
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertTrue(repr(ev).endswith('[unset]>'))
        parsed = RGX_REPR.match(repr(ev))
        self.assertEqual(parsed.group('extras'), 'unset')

        ev.set()
        self.assertTrue(repr(ev).endswith('[set]>'))
        self.assertIsNotNone(RGX_REPR.match(repr(ev)))

        # Fake a waiter by poking the private list directly.
        ev._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(ev))
        self.assertIsNotNone(RGX_REPR.match(repr(ev)))

    def test_wait(self):
        """set() releases every waiter, including one that has not run yet."""
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertFalse(ev.is_set())

        seen = []

        async def wait_and_tag(tag):
            # Record the tag once wait() reports the event as set.
            if await ev.wait():
                seen.append(tag)

        first = self.loop.create_task(wait_and_tag(1))
        second = self.loop.create_task(wait_and_tag(2))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], seen)

        # Created after the first two are parked; it has not run yet
        # when the event is set below.
        third = self.loop.create_task(wait_and_tag(3))

        ev.set()
        test_utils.run_briefly(self.loop)
        self.assertEqual([3, 1, 2], seen)

        for task in (first, second, third):
            self.assertTrue(task.done())
            self.assertIsNone(task.result())

    def test_wait_on_set(self):
        """wait() on an already-set event completes with a true value."""
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        ev.set()

        outcome = self.loop.run_until_complete(ev.wait())
        self.assertTrue(outcome)

    def test_wait_cancel(self):
        """Cancelling a pending wait() leaves no waiter behind."""
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)

        pending = self.loop.create_task(ev.wait())
        self.loop.call_soon(pending.cancel)
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(pending)
        self.assertFalse(ev._waiters)

    def test_clear(self):
        """is_set() follows set() and clear()."""
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertFalse(ev.is_set())

        ev.set()
        self.assertTrue(ev.is_set())

        ev.clear()
        self.assertFalse(ev.is_set())

    def test_clear_with_waiters(self):
        """A waiter parked before set()/clear()/set() is still released."""
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        seen = []

        async def wait_and_tag(tag):
            if await ev.wait():
                seen.append(tag)
            return True

        waiter = self.loop.create_task(wait_and_tag(1))
        test_utils.run_briefly(self.loop)
        self.assertEqual([], seen)

        ev.set()
        ev.clear()
        self.assertFalse(ev.is_set())

        ev.set()
        ev.set()
        # The waiter is still listed until the loop gets to run it.
        self.assertEqual(1, len(ev._waiters))

        test_utils.run_briefly(self.loop)
        self.assertEqual([1], seen)
        self.assertEqual(0, len(ev._waiters))

        self.assertTrue(waiter.done())
        self.assertTrue(waiter.result())
441
442
class ConditionTests(test_utils.TestCase):
    """Tests for asyncio.Condition: wait/notify, cancellation, repr, ctor."""

    def setUp(self):
        """Create a fresh test loop for each test as ``self.loop``."""
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        """An explicit ``loop=`` warns and is stored on ``cond._loop``."""
        loop = mock.Mock()
        # Each construction gets its own assertWarns so that both are
        # checked independently, matching the Lock/Event/Semaphore tests.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=loop)
        self.assertIs(cond._loop, loop)

        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertIs(cond._loop, self.loop)

    def test_ctor_noloop(self):
        """Without ``loop=`` the condition picks up the currently set loop."""
        asyncio.set_event_loop(self.loop)
        cond = asyncio.Condition()
        self.assertIs(cond._loop, self.loop)

    def test_wait(self):
        """Notified waiters resume one by one as the lock is released."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
            return True

        async def c3(result):
            await cond.acquire()
            if await cond.wait():
                result.append(3)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        # All three are parked in wait(), so the lock is free again.
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)
        self.assertFalse(cond.locked())

        # Notifying while the test still holds the lock resumes nobody.
        self.assertTrue(self.loop.run_until_complete(cond.acquire()))
        cond.notify()
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)
        self.assertTrue(cond.locked())

        # The waiters never release, so each release() below lets exactly
        # one more notified waiter through and the lock stays held.
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.notify(2)
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)
        self.assertTrue(cond.locked())

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_wait_cancel(self):
        """A cancelled wait() drops its waiter and leaves the lock held."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.loop.run_until_complete(cond.acquire())

        wait = self.loop.create_task(cond.wait())
        self.loop.call_soon(wait.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, wait)
        self.assertFalse(cond._waiters)
        self.assertTrue(cond.locked())

    def test_wait_cancel_contested(self):
        """Cancel a notified waiter while the lock is held by someone else."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())

        wait_task = self.loop.create_task(cond.wait())
        test_utils.run_briefly(self.loop)
        self.assertFalse(cond.locked())

        # Notify, but contest the lock before cancelling
        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())
        cond.notify()
        self.loop.call_soon(wait_task.cancel)
        self.loop.call_soon(cond.release)

        try:
            self.loop.run_until_complete(wait_task)
        except asyncio.CancelledError:
            # Should not happen, since no cancellation points
            pass

        # Whatever the outcome, the lock must end up held.
        self.assertTrue(cond.locked())

    def test_wait_cancel_after_notify(self):
        """A waiter cancelled after notify() still ends up cancelled."""
        # See bpo-32841
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        waited = False

        async def wait_on_cond():
            nonlocal waited
            async with cond:
                waited = True  # Make sure this area was reached
                await cond.wait()

        waiter = asyncio.ensure_future(wait_on_cond(), loop=self.loop)
        test_utils.run_briefly(self.loop)  # Start waiting

        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        test_utils.run_briefly(self.loop)  # Get to acquire()
        waiter.cancel()
        test_utils.run_briefly(self.loop)  # Activate cancellation
        cond.release()
        test_utils.run_briefly(self.loop)  # Cancellation should occur

        self.assertTrue(waiter.cancelled())
        self.assertTrue(waited)

    def test_wait_unacquired(self):
        """wait() without holding the lock raises RuntimeError."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete, cond.wait())

    def test_wait_for(self):
        """wait_for() only returns once the predicate is true on wake-up."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        presult = False

        def predicate():
            # Reads the enclosing variable, which the test flips later.
            return presult

        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait_for(predicate):
                result.append(1)
                cond.release()
            return True

        t = self.loop.create_task(c1(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        # Predicate still false: the notification does not finish c1.
        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        presult = True
        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        self.assertTrue(t.done())
        self.assertTrue(t.result())

    def test_wait_for_unacquired(self):
        """wait_for() needs the lock only when it actually has to wait."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        # predicate can return true immediately
        res = self.loop.run_until_complete(cond.wait_for(lambda: [1, 2, 3]))
        self.assertEqual([1, 2, 3], res)

        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete,
            cond.wait_for(lambda: False))

    def test_notify(self):
        """notify(n) wakes at most n waiters; a large n wakes the rest."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
                cond.release()
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
                cond.release()
            return True

        async def c3(result):
            await cond.acquire()
            if await cond.wait():
                result.append(3)
                cond.release()
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.notify(2048)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_notify_all(self):
        """notify_all() wakes every waiter."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
                cond.release()
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
                cond.release()
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify_all()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())

    def test_notify_unacquired(self):
        """notify() without holding the lock raises RuntimeError."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify)

    def test_notify_all_unacquired(self):
        """notify_all() without holding the lock raises RuntimeError."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify_all)

    def test_repr(self):
        """repr() reports locked/unlocked and the number of waiters."""
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertIn('unlocked', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

        self.loop.run_until_complete(cond.acquire())
        self.assertIn('locked', repr(cond))
        # 'locked' is a substring of 'unlocked', so the check above alone
        # cannot fail; also rule out the unlocked state explicitly.
        self.assertNotIn('unlocked', repr(cond))

        # Fake waiters by poking the private collection directly.
        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:2', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

    def test_context_manager(self):
        """``async with cond`` holds the lock only inside the block."""
        async def f():
            cond = asyncio.Condition()
            self.assertFalse(cond.locked())
            async with cond:
                self.assertTrue(cond.locked())
            self.assertFalse(cond.locked())

        self.loop.run_until_complete(f())

    def test_explicit_lock(self):
        """A lock passed to the constructor is used as-is."""
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
            cond = asyncio.Condition(lock, loop=self.loop)

        self.assertIs(cond._lock, lock)
        self.assertIs(cond._loop, lock._loop)

    def test_ambiguous_loops(self):
        """A lock bound to a different loop than ``loop=`` is rejected."""
        loop = self.new_test_loop()
        self.addCleanup(loop.close)
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
            with self.assertRaises(ValueError):
                asyncio.Condition(lock, loop=loop)

    def test_timeout_in_block(self):
        """wait_for() timing out on cond.wait() raises TimeoutError."""
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def task_timeout():
            condition = asyncio.Condition(loop=loop)
            async with condition:
                with self.assertRaises(asyncio.TimeoutError):
                    await asyncio.wait_for(condition.wait(), timeout=0.5)

        # The deprecated loop= is passed inside the coroutine, so the
        # warning is raised while the loop runs it.
        with self.assertWarns(DeprecationWarning):
            loop.run_until_complete(task_timeout())
805
806
class SemaphoreTests(test_utils.TestCase):
    """Tests for asyncio.Semaphore and asyncio.BoundedSemaphore."""

    def setUp(self):
        """Create a fresh test loop for each test as ``self.loop``."""
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        """An explicit ``loop=`` warns and is stored on ``sem._loop``."""
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=loop)
        self.assertIs(sem._loop, loop)

        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.assertIs(sem._loop, self.loop)

    def test_ctor_noloop(self):
        """Without ``loop=`` the semaphore picks up the currently set loop."""
        asyncio.set_event_loop(self.loop)
        sem = asyncio.Semaphore()
        self.assertIs(sem._loop, self.loop)

    def test_initial_value_zero(self):
        """A semaphore created with value 0 starts out locked."""
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(0, loop=self.loop)
        self.assertTrue(sem.locked())

    def test_repr(self):
        """repr() reports state, value when unlocked, and waiter count."""
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        self.loop.run_until_complete(sem.acquire())
        self.assertTrue(repr(sem).endswith('[locked]>'))
        self.assertTrue('waiters' not in repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        # Fake waiters by poking the private collection directly.
        sem._waiters.append(mock.Mock())
        self.assertTrue('waiters:1' in repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        sem._waiters.append(mock.Mock())
        self.assertTrue('waiters:2' in repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

    def test_semaphore(self):
        """``yield from sem`` raises TypeError and leaves the value alone."""
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.assertEqual(1, sem._value)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def acquire_lock():
                return (yield from sem)

        with self.assertRaisesRegex(
            TypeError,
            "'Semaphore' object is not iterable",
        ):
            self.loop.run_until_complete(acquire_lock())

        self.assertFalse(sem.locked())
        self.assertEqual(1, sem._value)

    def test_semaphore_value(self):
        """A negative initial value raises ValueError."""
        self.assertRaises(ValueError, asyncio.Semaphore, -1)

    def test_acquire(self):
        """Waiters are admitted as release() makes slots available."""
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(3, loop=self.loop)
        result = []

        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertFalse(sem.locked())

        async def c1(result):
            await sem.acquire()
            result.append(1)
            return True

        async def c2(result):
            await sem.acquire()
            result.append(2)
            return True

        async def c3(result):
            await sem.acquire()
            result.append(3)
            return True

        async def c4(result):
            await sem.acquire()
            result.append(4)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        # One slot was left, so only c1 gets in; c2 and c3 queue up.
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(sem.locked())
        self.assertEqual(2, len(sem._waiters))
        self.assertEqual(0, sem._value)

        t4 = self.loop.create_task(c4(result))

        sem.release()
        sem.release()
        self.assertEqual(2, sem._value)

        test_utils.run_briefly(self.loop)
        self.assertEqual(0, sem._value)
        self.assertEqual(3, len(result))
        self.assertTrue(sem.locked())
        self.assertEqual(1, len(sem._waiters))
        self.assertEqual(0, sem._value)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        race_tasks = [t2, t3, t4]
        done_tasks = [t for t in race_tasks if t.done() and t.result()]
        # Two slots were released, so exactly two of the racing tasks
        # finished.  (assertTrue(2, n) would treat n as the failure
        # message and always pass.)
        self.assertEqual(2, len(done_tasks))

        # cleanup locked semaphore
        sem.release()
        self.loop.run_until_complete(asyncio.gather(*race_tasks))

    def test_acquire_cancel(self):
        """A cancelled acquire() leaves no pending waiter behind."""
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.loop.run_until_complete(sem.acquire())

        acquire = self.loop.create_task(sem.acquire())
        self.loop.call_soon(acquire.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, acquire)
        # Either the waiter list is empty or everything left in it is done.
        self.assertTrue((not sem._waiters) or
                        all(waiter.done() for waiter in sem._waiters))

    def test_acquire_cancel_before_awoken(self):
        """A slot released to cancelled waiters goes to a live one."""
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(value=0, loop=self.loop)

        t1 = self.loop.create_task(sem.acquire())
        t2 = self.loop.create_task(sem.acquire())
        t3 = self.loop.create_task(sem.acquire())
        t4 = self.loop.create_task(sem.acquire())

        test_utils.run_briefly(self.loop)

        sem.release()
        t1.cancel()
        t2.cancel()

        # Exactly one of the two surviving tasks obtains the single slot.
        test_utils.run_briefly(self.loop)
        num_done = sum(t.done() for t in [t3, t4])
        self.assertEqual(num_done, 1)

        # Clean up whichever task is still waiting.
        t3.cancel()
        t4.cancel()
        test_utils.run_briefly(self.loop)

    def test_acquire_hang(self):
        """Cancelling the woken waiter still lets the next one acquire."""
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(value=0, loop=self.loop)

        t1 = self.loop.create_task(sem.acquire())
        t2 = self.loop.create_task(sem.acquire())

        test_utils.run_briefly(self.loop)

        sem.release()
        t1.cancel()

        test_utils.run_briefly(self.loop)
        self.assertTrue(sem.locked())

    def test_release_not_acquired(self):
        """Releasing an unacquired BoundedSemaphore raises ValueError."""
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.BoundedSemaphore(loop=self.loop)

        self.assertRaises(ValueError, sem.release)

    def test_release_no_waiters(self):
        """release() with nobody waiting makes the semaphore unlocked."""
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.loop.run_until_complete(sem.acquire())
        self.assertTrue(sem.locked())

        sem.release()
        self.assertFalse(sem.locked())
1001
1002
if __name__ == '__main__':
    # Run this module's tests when it is executed directly as a script.
    unittest.main()
1005