• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
"""Tests for the asyncio synchronization primitives (Lock, Event, Condition, Semaphore)."""
2
import unittest
from unittest import mock
import re

import asyncio
from test.test_asyncio import utils as test_utils
9
# Pattern describing the repr() of the primitives exercised in this module:
#   <CLASS object at ADDRESS [STATE(, value:N)?(, waiters:N)?]>
# Named groups expose the class text, the address text and the bracketed
# "extras" section.  ``value`` accepts any number of digits so that a
# semaphore whose counter is 10 or more still matches.
STR_RGX_REPR = (
    r'^<(?P<class>.*?) object at (?P<address>.*?)'
    r'\[(?P<extras>'
    r'(set|unset|locked|unlocked)(, value:\d+)?(, waiters:\d+)?'
    r')\]>\Z'
)
# Compiled once at import time; the repr tests reuse it.
RGX_REPR = re.compile(STR_RGX_REPR)
17
18
def tearDownModule():
    """Module-level unittest hook: reset the asyncio event loop policy.

    ``None`` is passed to ``asyncio.set_event_loop_policy`` so that any
    policy installed while these tests ran does not leak into modules that
    run afterwards.
    """
    asyncio.set_event_loop_policy(None)
21
22
class LockTests(test_utils.TestCase):
    """Tests for ``asyncio.Lock``, run on a fresh test loop per test."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit ``loop=`` argument must warn and be stored on the
        # lock, both for a mock loop and for the real test loop.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=loop)
        self.assertIs(lock._loop, loop)

        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertIs(lock._loop, self.loop)

    def test_ctor_noloop(self):
        # Without ``loop=`` the lock picks up the currently set event loop.
        asyncio.set_event_loop(self.loop)
        lock = asyncio.Lock()
        self.assertIs(lock._loop, self.loop)

    def test_repr(self):
        # repr() must reflect the unlocked and then the locked state.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertTrue(repr(lock).endswith('[unlocked]>'))
        self.assertRegex(repr(lock), RGX_REPR)

        # Both the decorator and ``yield from lock`` are expected to warn.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    yield from lock

        self.loop.run_until_complete(acquire_lock())
        self.assertTrue(repr(lock).endswith('[locked]>'))
        self.assertRegex(repr(lock), RGX_REPR)

    def test_lock(self):
        # Acquire through the legacy ``yield from lock`` form, then release.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    return (yield from lock)

        res = self.loop.run_until_complete(acquire_lock())

        self.assertTrue(res)
        self.assertTrue(lock.locked())

        lock.release()
        self.assertFalse(lock.locked())

    def test_lock_by_with_statement(self):
        # ``with (yield from primitive)`` must hold the primitive inside the
        # block and release it on exit, for every lock-like primitive.
        loop = asyncio.new_event_loop()  # don't use TestLoop quirks
        self.set_event_loop(loop)
        with self.assertWarns(DeprecationWarning):
            primitives = [
                asyncio.Lock(loop=loop),
                asyncio.Condition(loop=loop),
                asyncio.Semaphore(loop=loop),
                asyncio.BoundedSemaphore(loop=loop),
            ]

            @asyncio.coroutine
            def test(lock):
                yield from asyncio.sleep(0.01)
                self.assertFalse(lock.locked())
                with self.assertWarns(DeprecationWarning):
                    with (yield from lock) as _lock:
                        self.assertIs(_lock, None)
                        self.assertTrue(lock.locked())
                        yield from asyncio.sleep(0.01)
                        self.assertTrue(lock.locked())
                    self.assertFalse(lock.locked())

        for primitive in primitives:
            loop.run_until_complete(test(primitive))
            self.assertFalse(primitive.locked())

    def test_acquire(self):
        # Waiters must obtain the lock one at a time, in the order in which
        # they started waiting, each time the lock is released.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        result = []

        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        async def c1(result):
            if await lock.acquire():
                result.append(1)
            return True

        async def c2(result):
            if await lock.acquire():
                result.append(2)
            return True

        async def c3(result):
            if await lock.acquire():
                result.append(3)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        # The lock is still held, so nobody has progressed yet.
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        # c1 never releases, so another loop iteration changes nothing.
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        t3 = self.loop.create_task(c3(result))

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_acquire_cancel(self):
        # Cancelling a pending acquire() raises CancelledError and leaves no
        # stale waiter behind.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        task = self.loop.create_task(lock.acquire())
        self.loop.call_soon(task.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, task)
        self.assertFalse(lock._waiters)

    def test_cancel_race(self):
        # Several tasks:
        # - A acquires the lock
        # - B is blocked in acquire()
        # - C is blocked in acquire()
        #
        # Now, concurrently:
        # - B is cancelled
        # - A releases the lock
        #
        # If B's waiter is marked cancelled but not yet removed from
        # _waiters, A's release() call will crash when trying to set
        # B's waiter; instead, it should move on to C's waiter.

        # Setup: A has the lock, b and c are waiting.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        async def lockit(name, blocker):
            # ``name`` only labels the call site; it is not used in the body.
            await lock.acquire()
            try:
                if blocker is not None:
                    await blocker
            finally:
                lock.release()

        fa = self.loop.create_future()
        ta = self.loop.create_task(lockit('A', fa))
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())
        tb = self.loop.create_task(lockit('B', None))
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)
        tc = self.loop.create_task(lockit('C', None))
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 2)

        # Create the race and check.
        # Without the fix this failed at the last assert.
        fa.set_result(None)
        tb.cancel()
        self.assertTrue(lock._waiters[0].cancelled())
        test_utils.run_briefly(self.loop)
        self.assertFalse(lock.locked())
        self.assertTrue(ta.done())
        self.assertTrue(tb.cancelled())
        self.assertTrue(tc.done())

    def test_cancel_release_race(self):
        # Issue 32734
        # Acquire 4 locks, cancel second, release first
        # and 2 locks are taken at once.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        lock_count = 0
        call_count = 0

        async def lockit():
            nonlocal lock_count
            nonlocal call_count
            call_count += 1
            await lock.acquire()
            lock_count += 1

        async def lockandtrigger():
            await lock.acquire()
            self.loop.call_soon(trigger)

        def trigger():
            # Cancel the first waiter and release in the same callback.
            t1.cancel()
            lock.release()

        t0 = self.loop.create_task(lockandtrigger())
        t1 = self.loop.create_task(lockit())
        t2 = self.loop.create_task(lockit())
        t3 = self.loop.create_task(lockit())

        # First loop acquires all
        test_utils.run_briefly(self.loop)
        self.assertTrue(t0.done())

        # Second loop calls trigger
        test_utils.run_briefly(self.loop)
        # Third loop calls cancellation
        test_utils.run_briefly(self.loop)

        # Make sure only one lock was taken
        self.assertEqual(lock_count, 1)
        # While 3 calls were made to lockit()
        self.assertEqual(call_count, 3)
        # Checked separately so a failure names the condition that broke.
        self.assertTrue(t1.cancelled())
        self.assertTrue(t2.done())

        # Cleanup the task that is stuck on acquire.
        t3.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(t3.cancelled())

    def test_finished_waiter_cancelled(self):
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        ta = self.loop.create_task(lock.acquire())
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())

        tb = self.loop.create_task(lock.acquire())
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)

        # Create a second waiter, wake up the first, and cancel it.
        # Without the fix, the second was not woken up.
        tc = self.loop.create_task(lock.acquire())
        lock.release()
        tb.cancel()
        test_utils.run_briefly(self.loop)

        # The lock being held again shows that the second waiter got it.
        self.assertTrue(lock.locked())
        self.assertTrue(ta.done())
        self.assertTrue(tb.cancelled())

    def test_release_not_acquired(self):
        # Releasing a lock that is not held is an error.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        self.assertRaises(RuntimeError, lock.release)

    def test_release_no_waiters(self):
        # Releasing with nobody waiting simply unlocks.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.loop.run_until_complete(lock.acquire())
        self.assertTrue(lock.locked())

        lock.release()
        self.assertFalse(lock.locked())

    def test_context_manager(self):
        # The object produced by ``yield from lock`` is a context manager
        # that releases the lock when the ``with`` block exits.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    return (yield from lock)

        with self.loop.run_until_complete(acquire_lock()):
            self.assertTrue(lock.locked())

        self.assertFalse(lock.locked())

    def test_context_manager_cant_reuse(self):
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    return (yield from lock)

        # This spells "yield from lock" outside a generator.
        cm = self.loop.run_until_complete(acquire_lock())
        with cm:
            self.assertTrue(lock.locked())

        self.assertFalse(lock.locked())

        # Entering the same context manager a second time must fail.
        with self.assertRaises(AttributeError):
            with cm:
                pass

    def test_context_manager_no_yield(self):
        # A plain ``with lock:`` (no ``yield from``) must raise RuntimeError
        # with the exact message below and must not take the lock.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        try:
            with lock:
                self.fail('RuntimeError is not raised in with expression')
        except RuntimeError as err:
            self.assertEqual(
                str(err),
                '"yield from" should be used as context manager expression')

        self.assertFalse(lock.locked())
351
352
class EventTests(test_utils.TestCase):
    """Tests for ``asyncio.Event``, run on a fresh test loop per test."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit ``loop=`` argument must warn and be stored on the
        # event, both for a mock loop and for the real test loop.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=loop)
        self.assertIs(ev._loop, loop)

        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertIs(ev._loop, self.loop)

    def test_ctor_noloop(self):
        # Without ``loop=`` the event picks up the currently set event loop.
        asyncio.set_event_loop(self.loop)
        ev = asyncio.Event()
        self.assertIs(ev._loop, self.loop)

    def test_repr(self):
        # repr() must show unset/set and the number of waiters.
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertTrue(repr(ev).endswith('[unset]>'))
        match = RGX_REPR.match(repr(ev))
        # Fail with a clear assertion instead of an AttributeError on None.
        self.assertIsNotNone(match)
        self.assertEqual(match.group('extras'), 'unset')

        ev.set()
        self.assertTrue(repr(ev).endswith('[set]>'))
        self.assertRegex(repr(ev), RGX_REPR)

        ev._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(ev))
        self.assertRegex(repr(ev), RGX_REPR)

    def test_wait(self):
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertFalse(ev.is_set())

        result = []

        async def c1(result):
            if await ev.wait():
                result.append(1)

        async def c2(result):
            if await ev.wait():
                result.append(2)

        async def c3(result):
            if await ev.wait():
                result.append(3)

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        # The event is not set yet, so both tasks are parked in wait().
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        # c3 is created before set() but has not run yet; the expected
        # order below has it finishing ahead of the two parked waiters.
        t3 = self.loop.create_task(c3(result))

        ev.set()
        test_utils.run_briefly(self.loop)
        self.assertEqual([3, 1, 2], result)

        self.assertTrue(t1.done())
        self.assertIsNone(t1.result())
        self.assertTrue(t2.done())
        self.assertIsNone(t2.result())
        self.assertTrue(t3.done())
        self.assertIsNone(t3.result())

    def test_wait_on_set(self):
        # wait() on an event that is already set completes with a true value.
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        ev.set()

        res = self.loop.run_until_complete(ev.wait())
        self.assertTrue(res)

    def test_wait_cancel(self):
        # Cancelling a pending wait() raises CancelledError and leaves no
        # stale waiter behind.
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)

        wait = self.loop.create_task(ev.wait())
        self.loop.call_soon(wait.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, wait)
        self.assertFalse(ev._waiters)

    def test_clear(self):
        # set() and clear() toggle is_set().
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertFalse(ev.is_set())

        ev.set()
        self.assertTrue(ev.is_set())

        ev.clear()
        self.assertFalse(ev.is_set())

    def test_clear_with_waiters(self):
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        result = []

        async def c1(result):
            if await ev.wait():
                result.append(1)
            return True

        t = self.loop.create_task(c1(result))
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        # set() followed by clear() before the loop runs again.
        ev.set()
        ev.clear()
        self.assertFalse(ev.is_set())

        # Setting twice must be harmless; the waiter stays registered until
        # the loop gets a chance to run it.
        ev.set()
        ev.set()
        self.assertEqual(1, len(ev._waiters))

        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertEqual(0, len(ev._waiters))

        self.assertTrue(t.done())
        self.assertTrue(t.result())
485
486
class ConditionTests(test_utils.TestCase):
    """Tests for ``asyncio.Condition``, run on a fresh test loop per test."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # Each construction gets its own assertWarns block so that the
        # warning is verified for both calls, matching the equivalent
        # Lock/Event/Semaphore tests.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=loop)
        self.assertIs(cond._loop, loop)

        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertIs(cond._loop, self.loop)

    def test_ctor_noloop(self):
        # Without ``loop=`` the condition picks up the currently set loop.
        asyncio.set_event_loop(self.loop)
        cond = asyncio.Condition()
        self.assertIs(cond._loop, self.loop)

    def test_wait(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        result = []

        # None of the coroutines releases the condition after waking up, so
        # the test body releases it on their behalf between steps.
        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
            return True

        async def c3(result):
            await cond.acquire()
            if await cond.wait():
                result.append(3)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        # All three are parked in wait(), which leaves the lock free.
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)
        self.assertFalse(cond.locked())

        # A notified waiter cannot proceed while the notifier holds the lock.
        self.assertTrue(self.loop.run_until_complete(cond.acquire()))
        cond.notify()
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.notify(2)
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)
        self.assertTrue(cond.locked())

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_wait_cancel(self):
        # A cancelled wait() removes its waiter and ends with the lock held.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.loop.run_until_complete(cond.acquire())

        wait = self.loop.create_task(cond.wait())
        self.loop.call_soon(wait.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, wait)
        self.assertFalse(cond._waiters)
        self.assertTrue(cond.locked())

    def test_wait_cancel_contested(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())

        wait_task = self.loop.create_task(cond.wait())
        test_utils.run_briefly(self.loop)
        self.assertFalse(cond.locked())

        # Notify, but contest the lock before cancelling
        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())
        cond.notify()
        self.loop.call_soon(wait_task.cancel)
        self.loop.call_soon(cond.release)

        try:
            self.loop.run_until_complete(wait_task)
        except asyncio.CancelledError:
            # Should not happen, since no cancellation points
            pass

        self.assertTrue(cond.locked())

    def test_wait_cancel_after_notify(self):
        # See bpo-32841
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        waited = False

        async def wait_on_cond():
            nonlocal waited
            async with cond:
                waited = True  # Make sure this area was reached
                await cond.wait()

        waiter = asyncio.ensure_future(wait_on_cond(), loop=self.loop)
        test_utils.run_briefly(self.loop)  # Start waiting

        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        test_utils.run_briefly(self.loop)  # Get to acquire()
        waiter.cancel()
        test_utils.run_briefly(self.loop)  # Activate cancellation
        cond.release()
        test_utils.run_briefly(self.loop)  # Cancellation should occur

        self.assertTrue(waiter.cancelled())
        self.assertTrue(waited)

    def test_wait_unacquired(self):
        # wait() without holding the lock is an error.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete, cond.wait())

    def test_wait_for(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        presult = False

        def predicate():
            # Reads the enclosing ``presult``, which the test flips later.
            return presult

        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait_for(predicate):
                result.append(1)
                cond.release()
            return True

        t = self.loop.create_task(c1(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        # Notified while the predicate is still false: keeps waiting.
        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        # Notified once the predicate is true: wait_for() returns.
        presult = True
        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        self.assertTrue(t.done())
        self.assertTrue(t.result())

    def test_wait_for_unacquired(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        # predicate can return true immediately
        res = self.loop.run_until_complete(cond.wait_for(lambda: [1, 2, 3]))
        self.assertEqual([1, 2, 3], res)

        # A false predicate forces a real wait(), which needs the lock.
        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete,
            cond.wait_for(lambda: False))

    def test_notify(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
                cond.release()
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
                cond.release()
            return True

        async def c3(result):
            await cond.acquire()
            if await cond.wait():
                result.append(3)
                cond.release()
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        # notify(1) wakes exactly one waiter.
        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        # Asking for far more waiters than exist wakes the remaining ones.
        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.notify(2048)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_notify_all(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
                cond.release()
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
                cond.release()
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify_all()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())

    def test_notify_unacquired(self):
        # notify() without holding the lock is an error.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify)

    def test_notify_all_unacquired(self):
        # notify_all() without holding the lock is an error.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify_all)

    def test_repr(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertIn('unlocked', repr(cond))
        self.assertRegex(repr(cond), RGX_REPR)

        self.loop.run_until_complete(cond.acquire())
        # Include the bracket: a bare 'locked' is also a substring of
        # 'unlocked' and would pass even if the state never changed.
        self.assertIn('[locked]', repr(cond))

        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(cond))
        self.assertRegex(repr(cond), RGX_REPR)

        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:2', repr(cond))
        self.assertRegex(repr(cond), RGX_REPR)

    def test_context_manager(self):
        # The object produced by ``yield from cond`` is a context manager
        # that releases the condition when the ``with`` block exits.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def acquire_cond():
                with self.assertWarns(DeprecationWarning):
                    return (yield from cond)

        with self.loop.run_until_complete(acquire_cond()):
            self.assertTrue(cond.locked())

        self.assertFalse(cond.locked())

    def test_context_manager_no_yield(self):
        # A plain ``with cond:`` (no ``yield from``) must raise RuntimeError
        # with the exact message below and must not take the lock.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        try:
            with cond:
                self.fail('RuntimeError is not raised in with expression')
        except RuntimeError as err:
            self.assertEqual(
                str(err),
                '"yield from" should be used as context manager expression')

        self.assertFalse(cond.locked())

    def test_explicit_lock(self):
        # A lock passed in explicitly is used as-is and shares its loop.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
            cond = asyncio.Condition(lock, loop=self.loop)

        self.assertIs(cond._lock, lock)
        self.assertIs(cond._loop, lock._loop)

    def test_ambiguous_loops(self):
        # A lock bound to one loop combined with a different ``loop=``
        # argument must be rejected.
        loop = self.new_test_loop()
        self.addCleanup(loop.close)
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
            with self.assertRaises(ValueError):
                asyncio.Condition(lock, loop=loop)

    def test_timeout_in_block(self):
        # wait_for() timing out around condition.wait() inside an
        # ``async with`` block must surface as TimeoutError.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def task_timeout():
            condition = asyncio.Condition(loop=loop)
            async with condition:
                with self.assertRaises(asyncio.TimeoutError):
                    await asyncio.wait_for(condition.wait(), timeout=0.5)

        # The Condition is built inside the coroutine, so the deprecation
        # warning fires while the loop is running it.
        with self.assertWarns(DeprecationWarning):
            loop.run_until_complete(task_timeout())
868
869
870class SemaphoreTests(test_utils.TestCase):
871
872    def setUp(self):
873        super().setUp()
874        self.loop = self.new_test_loop()
875
876    def test_ctor_loop(self):
877        loop = mock.Mock()
878        with self.assertWarns(DeprecationWarning):
879            sem = asyncio.Semaphore(loop=loop)
880        self.assertIs(sem._loop, loop)
881
882        with self.assertWarns(DeprecationWarning):
883            sem = asyncio.Semaphore(loop=self.loop)
884        self.assertIs(sem._loop, self.loop)
885
886    def test_ctor_noloop(self):
887        asyncio.set_event_loop(self.loop)
888        sem = asyncio.Semaphore()
889        self.assertIs(sem._loop, self.loop)
890
891    def test_initial_value_zero(self):
892        with self.assertWarns(DeprecationWarning):
893            sem = asyncio.Semaphore(0, loop=self.loop)
894        self.assertTrue(sem.locked())
895
896    def test_repr(self):
897        with self.assertWarns(DeprecationWarning):
898            sem = asyncio.Semaphore(loop=self.loop)
899        self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
900        self.assertTrue(RGX_REPR.match(repr(sem)))
901
902        self.loop.run_until_complete(sem.acquire())
903        self.assertTrue(repr(sem).endswith('[locked]>'))
904        self.assertTrue('waiters' not in repr(sem))
905        self.assertTrue(RGX_REPR.match(repr(sem)))
906
907        sem._waiters.append(mock.Mock())
908        self.assertTrue('waiters:1' in repr(sem))
909        self.assertTrue(RGX_REPR.match(repr(sem)))
910
911        sem._waiters.append(mock.Mock())
912        self.assertTrue('waiters:2' in repr(sem))
913        self.assertTrue(RGX_REPR.match(repr(sem)))
914
915    def test_semaphore(self):
916        with self.assertWarns(DeprecationWarning):
917            sem = asyncio.Semaphore(loop=self.loop)
918        self.assertEqual(1, sem._value)
919
920        with self.assertWarns(DeprecationWarning):
921            @asyncio.coroutine
922            def acquire_lock():
923                with self.assertWarns(DeprecationWarning):
924                    return (yield from sem)
925
926        res = self.loop.run_until_complete(acquire_lock())
927
928        self.assertTrue(res)
929        self.assertTrue(sem.locked())
930        self.assertEqual(0, sem._value)
931
932        sem.release()
933        self.assertFalse(sem.locked())
934        self.assertEqual(1, sem._value)
935
936    def test_semaphore_value(self):
937        self.assertRaises(ValueError, asyncio.Semaphore, -1)
938
939    def test_acquire(self):
940        with self.assertWarns(DeprecationWarning):
941            sem = asyncio.Semaphore(3, loop=self.loop)
942        result = []
943
944        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
945        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
946        self.assertFalse(sem.locked())
947
948        async def c1(result):
949            await sem.acquire()
950            result.append(1)
951            return True
952
953        async def c2(result):
954            await sem.acquire()
955            result.append(2)
956            return True
957
958        async def c3(result):
959            await sem.acquire()
960            result.append(3)
961            return True
962
963        async def c4(result):
964            await sem.acquire()
965            result.append(4)
966            return True
967
968        t1 = self.loop.create_task(c1(result))
969        t2 = self.loop.create_task(c2(result))
970        t3 = self.loop.create_task(c3(result))
971
972        test_utils.run_briefly(self.loop)
973        self.assertEqual([1], result)
974        self.assertTrue(sem.locked())
975        self.assertEqual(2, len(sem._waiters))
976        self.assertEqual(0, sem._value)
977
978        t4 = self.loop.create_task(c4(result))
979
980        sem.release()
981        sem.release()
982        self.assertEqual(2, sem._value)
983
984        test_utils.run_briefly(self.loop)
985        self.assertEqual(0, sem._value)
986        self.assertEqual(3, len(result))
987        self.assertTrue(sem.locked())
988        self.assertEqual(1, len(sem._waiters))
989        self.assertEqual(0, sem._value)
990
991        self.assertTrue(t1.done())
992        self.assertTrue(t1.result())
993        race_tasks = [t2, t3, t4]
994        done_tasks = [t for t in race_tasks if t.done() and t.result()]
995        self.assertTrue(2, len(done_tasks))
996
997        # cleanup locked semaphore
998        sem.release()
999        self.loop.run_until_complete(asyncio.gather(*race_tasks))
1000
1001    def test_acquire_cancel(self):
1002        with self.assertWarns(DeprecationWarning):
1003            sem = asyncio.Semaphore(loop=self.loop)
1004        self.loop.run_until_complete(sem.acquire())
1005
1006        acquire = self.loop.create_task(sem.acquire())
1007        self.loop.call_soon(acquire.cancel)
1008        self.assertRaises(
1009            asyncio.CancelledError,
1010            self.loop.run_until_complete, acquire)
1011        self.assertTrue((not sem._waiters) or
1012                        all(waiter.done() for waiter in sem._waiters))
1013
1014    def test_acquire_cancel_before_awoken(self):
1015        with self.assertWarns(DeprecationWarning):
1016            sem = asyncio.Semaphore(value=0, loop=self.loop)
1017
1018        t1 = self.loop.create_task(sem.acquire())
1019        t2 = self.loop.create_task(sem.acquire())
1020        t3 = self.loop.create_task(sem.acquire())
1021        t4 = self.loop.create_task(sem.acquire())
1022
1023        test_utils.run_briefly(self.loop)
1024
1025        sem.release()
1026        t1.cancel()
1027        t2.cancel()
1028
1029        test_utils.run_briefly(self.loop)
1030        num_done = sum(t.done() for t in [t3, t4])
1031        self.assertEqual(num_done, 1)
1032
1033        t3.cancel()
1034        t4.cancel()
1035        test_utils.run_briefly(self.loop)
1036
1037    def test_acquire_hang(self):
1038        with self.assertWarns(DeprecationWarning):
1039            sem = asyncio.Semaphore(value=0, loop=self.loop)
1040
1041        t1 = self.loop.create_task(sem.acquire())
1042        t2 = self.loop.create_task(sem.acquire())
1043
1044        test_utils.run_briefly(self.loop)
1045
1046        sem.release()
1047        t1.cancel()
1048
1049        test_utils.run_briefly(self.loop)
1050        self.assertTrue(sem.locked())
1051
1052    def test_release_not_acquired(self):
1053        with self.assertWarns(DeprecationWarning):
1054            sem = asyncio.BoundedSemaphore(loop=self.loop)
1055
1056        self.assertRaises(ValueError, sem.release)
1057
1058    def test_release_no_waiters(self):
1059        with self.assertWarns(DeprecationWarning):
1060            sem = asyncio.Semaphore(loop=self.loop)
1061        self.loop.run_until_complete(sem.acquire())
1062        self.assertTrue(sem.locked())
1063
1064        sem.release()
1065        self.assertFalse(sem.locked())
1066
1067    def test_context_manager(self):
1068        with self.assertWarns(DeprecationWarning):
1069            sem = asyncio.Semaphore(2, loop=self.loop)
1070
1071            @asyncio.coroutine
1072            def acquire_lock():
1073                with self.assertWarns(DeprecationWarning):
1074                    return (yield from sem)
1075
1076        with self.loop.run_until_complete(acquire_lock()):
1077            self.assertFalse(sem.locked())
1078            self.assertEqual(1, sem._value)
1079
1080            with self.loop.run_until_complete(acquire_lock()):
1081                self.assertTrue(sem.locked())
1082
1083        self.assertEqual(2, sem._value)
1084
1085    def test_context_manager_no_yield(self):
1086        with self.assertWarns(DeprecationWarning):
1087            sem = asyncio.Semaphore(2, loop=self.loop)
1088
1089        try:
1090            with sem:
1091                self.fail('RuntimeError is not raised in with expression')
1092        except RuntimeError as err:
1093            self.assertEqual(
1094                str(err),
1095                '"yield from" should be used as context manager expression')
1096
1097        self.assertEqual(2, sem._value)
1098
1099
# Run the test suite when this file is executed as a script.
if '__main__' == __name__:
    unittest.main()
1102