• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for tasks.py."""
2
3import collections
4import contextlib
5import contextvars
6import gc
7import io
8import random
9import re
10import sys
11import traceback
12import types
13import unittest
14from unittest import mock
15from types import GenericAlias
16
17import asyncio
18from asyncio import futures
19from asyncio import tasks
20from test.test_asyncio import utils as test_utils
21from test import support
22from test.support.script_helper import assert_python_ok
23
24
25def tearDownModule():
26    asyncio.set_event_loop_policy(None)
27
28
29async def coroutine_function():
30    pass
31
32
33def format_coroutine(qualname, state, src, source_traceback, generator=False):
34    if generator:
35        state = '%s' % state
36    else:
37        state = '%s, defined' % state
38    if source_traceback is not None:
39        frame = source_traceback[-1]
40        return ('coro=<%s() %s at %s> created at %s:%s'
41                % (qualname, state, src, frame[0], frame[1]))
42    else:
43        return 'coro=<%s() %s at %s>' % (qualname, state, src)
44
45
46def get_innermost_context(exc):
47    """
48    Return information about the innermost exception context in the chain.
49    """
50    depth = 0
51    while True:
52        context = exc.__context__
53        if context is None:
54            break
55
56        exc = context
57        depth += 1
58
59    return (type(exc), exc.args, depth)
60
61
62class Dummy:
63
64    def __repr__(self):
65        return '<Dummy>'
66
67    def __call__(self, *args):
68        pass
69
70
71class CoroLikeObject:
72    def send(self, v):
73        raise StopIteration(42)
74
75    def throw(self, *exc):
76        pass
77
78    def close(self):
79        pass
80
81    def __await__(self):
82        return self
83
84
85class BaseTaskTests:
86
87    Task = None
88    Future = None
89
90    def new_task(self, loop, coro, name='TestTask', context=None):
91        return self.__class__.Task(coro, loop=loop, name=name, context=context)
92
93    def new_future(self, loop):
94        return self.__class__.Future(loop=loop)
95
96    def setUp(self):
97        super().setUp()
98        self.loop = self.new_test_loop()
99        self.loop.set_task_factory(self.new_task)
100        self.loop.create_future = lambda: self.new_future(self.loop)
101
102    def test_generic_alias(self):
103        task = self.__class__.Task[str]
104        self.assertEqual(task.__args__, (str,))
105        self.assertIsInstance(task, GenericAlias)
106
107    def test_task_cancel_message_getter(self):
108        async def coro():
109            pass
110        t = self.new_task(self.loop, coro())
111        self.assertTrue(hasattr(t, '_cancel_message'))
112        self.assertEqual(t._cancel_message, None)
113
114        t.cancel('my message')
115        self.assertEqual(t._cancel_message, 'my message')
116
117        with self.assertRaises(asyncio.CancelledError) as cm:
118            self.loop.run_until_complete(t)
119
120        self.assertEqual('my message', cm.exception.args[0])
121
122    def test_task_cancel_message_setter(self):
123        async def coro():
124            pass
125        t = self.new_task(self.loop, coro())
126        t.cancel('my message')
127        t._cancel_message = 'my new message'
128        self.assertEqual(t._cancel_message, 'my new message')
129
130        with self.assertRaises(asyncio.CancelledError) as cm:
131            self.loop.run_until_complete(t)
132
133        self.assertEqual('my new message', cm.exception.args[0])
134
135    def test_task_del_collect(self):
136        class Evil:
137            def __del__(self):
138                gc.collect()
139
140        async def run():
141            return Evil()
142
143        self.loop.run_until_complete(
144            asyncio.gather(*[
145                self.new_task(self.loop, run()) for _ in range(100)
146            ]))
147
148    def test_other_loop_future(self):
149        other_loop = asyncio.new_event_loop()
150        fut = self.new_future(other_loop)
151
152        async def run(fut):
153            await fut
154
155        try:
156            with self.assertRaisesRegex(RuntimeError,
157                                        r'Task .* got Future .* attached'):
158                self.loop.run_until_complete(run(fut))
159        finally:
160            other_loop.close()
161
162    def test_task_awaits_on_itself(self):
163
164        async def test():
165            await task
166
167        task = asyncio.ensure_future(test(), loop=self.loop)
168
169        with self.assertRaisesRegex(RuntimeError,
170                                    'Task cannot await on itself'):
171            self.loop.run_until_complete(task)
172
173    def test_task_class(self):
174        async def notmuch():
175            return 'ok'
176        t = self.new_task(self.loop, notmuch())
177        self.loop.run_until_complete(t)
178        self.assertTrue(t.done())
179        self.assertEqual(t.result(), 'ok')
180        self.assertIs(t._loop, self.loop)
181        self.assertIs(t.get_loop(), self.loop)
182
183        loop = asyncio.new_event_loop()
184        self.set_event_loop(loop)
185        t = self.new_task(loop, notmuch())
186        self.assertIs(t._loop, loop)
187        loop.run_until_complete(t)
188        loop.close()
189
190    def test_ensure_future_coroutine(self):
191        async def notmuch():
192            return 'ok'
193        t = asyncio.ensure_future(notmuch(), loop=self.loop)
194        self.assertIs(t._loop, self.loop)
195        self.loop.run_until_complete(t)
196        self.assertTrue(t.done())
197        self.assertEqual(t.result(), 'ok')
198
199        a = notmuch()
200        self.addCleanup(a.close)
201        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
202            asyncio.ensure_future(a)
203
204        async def test():
205            return asyncio.ensure_future(notmuch())
206        t = self.loop.run_until_complete(test())
207        self.assertIs(t._loop, self.loop)
208        self.loop.run_until_complete(t)
209        self.assertTrue(t.done())
210        self.assertEqual(t.result(), 'ok')
211
212        # Deprecated in 3.10, undeprecated in 3.12
213        asyncio.set_event_loop(self.loop)
214        self.addCleanup(asyncio.set_event_loop, None)
215        t = asyncio.ensure_future(notmuch())
216        self.assertIs(t._loop, self.loop)
217        self.loop.run_until_complete(t)
218        self.assertTrue(t.done())
219        self.assertEqual(t.result(), 'ok')
220
221    def test_ensure_future_future(self):
222        f_orig = self.new_future(self.loop)
223        f_orig.set_result('ko')
224
225        f = asyncio.ensure_future(f_orig)
226        self.loop.run_until_complete(f)
227        self.assertTrue(f.done())
228        self.assertEqual(f.result(), 'ko')
229        self.assertIs(f, f_orig)
230
231        loop = asyncio.new_event_loop()
232        self.set_event_loop(loop)
233
234        with self.assertRaises(ValueError):
235            f = asyncio.ensure_future(f_orig, loop=loop)
236
237        loop.close()
238
239        f = asyncio.ensure_future(f_orig, loop=self.loop)
240        self.assertIs(f, f_orig)
241
242    def test_ensure_future_task(self):
243        async def notmuch():
244            return 'ok'
245        t_orig = self.new_task(self.loop, notmuch())
246        t = asyncio.ensure_future(t_orig)
247        self.loop.run_until_complete(t)
248        self.assertTrue(t.done())
249        self.assertEqual(t.result(), 'ok')
250        self.assertIs(t, t_orig)
251
252        loop = asyncio.new_event_loop()
253        self.set_event_loop(loop)
254
255        with self.assertRaises(ValueError):
256            t = asyncio.ensure_future(t_orig, loop=loop)
257
258        loop.close()
259
260        t = asyncio.ensure_future(t_orig, loop=self.loop)
261        self.assertIs(t, t_orig)
262
263    def test_ensure_future_awaitable(self):
264        class Aw:
265            def __init__(self, coro):
266                self.coro = coro
267            def __await__(self):
268                return self.coro.__await__()
269
270        async def coro():
271            return 'ok'
272
273        loop = asyncio.new_event_loop()
274        self.set_event_loop(loop)
275        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
276        loop.run_until_complete(fut)
277        self.assertEqual(fut.result(), 'ok')
278
279    def test_ensure_future_task_awaitable(self):
280        class Aw:
281            def __await__(self):
282                return asyncio.sleep(0, result='ok').__await__()
283
284        loop = asyncio.new_event_loop()
285        self.set_event_loop(loop)
286        task = asyncio.ensure_future(Aw(), loop=loop)
287        loop.run_until_complete(task)
288        self.assertTrue(task.done())
289        self.assertEqual(task.result(), 'ok')
290        self.assertIsInstance(task.get_coro(), types.CoroutineType)
291        loop.close()
292
293    def test_ensure_future_neither(self):
294        with self.assertRaises(TypeError):
295            asyncio.ensure_future('ok')
296
297    def test_ensure_future_error_msg(self):
298        loop = asyncio.new_event_loop()
299        f = self.new_future(self.loop)
300        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
301                                    'different loop than the one specified as '
302                                    'the loop argument'):
303            asyncio.ensure_future(f, loop=loop)
304        loop.close()
305
306    def test_get_stack(self):
307        T = None
308
309        async def foo():
310            await bar()
311
312        async def bar():
313            # test get_stack()
314            f = T.get_stack(limit=1)
315            try:
316                self.assertEqual(f[0].f_code.co_name, 'foo')
317            finally:
318                f = None
319
320            # test print_stack()
321            file = io.StringIO()
322            T.print_stack(limit=1, file=file)
323            file.seek(0)
324            tb = file.read()
325            self.assertRegex(tb, r'foo\(\) running')
326
327        async def runner():
328            nonlocal T
329            T = asyncio.ensure_future(foo(), loop=self.loop)
330            await T
331
332        self.loop.run_until_complete(runner())
333
334    def test_task_repr(self):
335        self.loop.set_debug(False)
336
337        async def notmuch():
338            return 'abc'
339
340        # test coroutine function
341        self.assertEqual(notmuch.__name__, 'notmuch')
342        self.assertRegex(notmuch.__qualname__,
343                         r'\w+.test_task_repr.<locals>.notmuch')
344        self.assertEqual(notmuch.__module__, __name__)
345
346        filename, lineno = test_utils.get_function_source(notmuch)
347        src = "%s:%s" % (filename, lineno)
348
349        # test coroutine object
350        gen = notmuch()
351        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
352        self.assertEqual(gen.__name__, 'notmuch')
353        self.assertEqual(gen.__qualname__, coro_qualname)
354
355        # test pending Task
356        t = self.new_task(self.loop, gen)
357        t.add_done_callback(Dummy())
358
359        coro = format_coroutine(coro_qualname, 'running', src,
360                                t._source_traceback, generator=True)
361        self.assertEqual(repr(t),
362                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
363
364        # test cancelling Task
365        t.cancel()  # Does not take immediate effect!
366        self.assertEqual(repr(t),
367                         "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
368
369        # test cancelled Task
370        self.assertRaises(asyncio.CancelledError,
371                          self.loop.run_until_complete, t)
372        coro = format_coroutine(coro_qualname, 'done', src,
373                                t._source_traceback)
374        self.assertEqual(repr(t),
375                         "<Task cancelled name='TestTask' %s>" % coro)
376
377        # test finished Task
378        t = self.new_task(self.loop, notmuch())
379        self.loop.run_until_complete(t)
380        coro = format_coroutine(coro_qualname, 'done', src,
381                                t._source_traceback)
382        self.assertEqual(repr(t),
383                         "<Task finished name='TestTask' %s result='abc'>" % coro)
384
385    def test_task_repr_autogenerated(self):
386        async def notmuch():
387            return 123
388
389        t1 = self.new_task(self.loop, notmuch(), None)
390        t2 = self.new_task(self.loop, notmuch(), None)
391        self.assertNotEqual(repr(t1), repr(t2))
392
393        match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
394        self.assertIsNotNone(match1)
395        match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
396        self.assertIsNotNone(match2)
397
398        # Autogenerated task names should have monotonically increasing numbers
399        self.assertLess(int(match1.group(1)), int(match2.group(1)))
400        self.loop.run_until_complete(t1)
401        self.loop.run_until_complete(t2)
402
403    def test_task_set_name_pylong(self):
404        # test that setting the task name to a PyLong explicitly doesn't
405        # incorrectly trigger the deferred name formatting logic
406        async def notmuch():
407            return 123
408
409        t = self.new_task(self.loop, notmuch(), name=987654321)
410        self.assertEqual(t.get_name(), '987654321')
411        t.set_name(123456789)
412        self.assertEqual(t.get_name(), '123456789')
413        self.loop.run_until_complete(t)
414
415    def test_task_repr_name_not_str(self):
416        async def notmuch():
417            return 123
418
419        t = self.new_task(self.loop, notmuch())
420        t.set_name({6})
421        self.assertEqual(t.get_name(), '{6}')
422        self.loop.run_until_complete(t)
423
424    def test_task_repr_wait_for(self):
425        self.loop.set_debug(False)
426
427        async def wait_for(fut):
428            return await fut
429
430        fut = self.new_future(self.loop)
431        task = self.new_task(self.loop, wait_for(fut))
432        test_utils.run_briefly(self.loop)
433        self.assertRegex(repr(task),
434                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))
435
436        fut.set_result(None)
437        self.loop.run_until_complete(task)
438
439    def test_task_basics(self):
440
441        async def outer():
442            a = await inner1()
443            b = await inner2()
444            return a+b
445
446        async def inner1():
447            return 42
448
449        async def inner2():
450            return 1000
451
452        t = outer()
453        self.assertEqual(self.loop.run_until_complete(t), 1042)
454
455    def test_exception_chaining_after_await(self):
456        # Test that when awaiting on a task when an exception is already
457        # active, if the task raises an exception it will be chained
458        # with the original.
459        loop = asyncio.new_event_loop()
460        self.set_event_loop(loop)
461
462        async def raise_error():
463            raise ValueError
464
465        async def run():
466            try:
467                raise KeyError(3)
468            except Exception as exc:
469                task = self.new_task(loop, raise_error())
470                try:
471                    await task
472                except Exception as exc:
473                    self.assertEqual(type(exc), ValueError)
474                    chained = exc.__context__
475                    self.assertEqual((type(chained), chained.args),
476                        (KeyError, (3,)))
477
478        try:
479            task = self.new_task(loop, run())
480            loop.run_until_complete(task)
481        finally:
482            loop.close()
483
484    def test_exception_chaining_after_await_with_context_cycle(self):
485        # Check trying to create an exception context cycle:
486        # https://bugs.python.org/issue40696
487        has_cycle = None
488        loop = asyncio.new_event_loop()
489        self.set_event_loop(loop)
490
491        async def process_exc(exc):
492            raise exc
493
494        async def run():
495            nonlocal has_cycle
496            try:
497                raise KeyError('a')
498            except Exception as exc:
499                task = self.new_task(loop, process_exc(exc))
500                try:
501                    await task
502                except BaseException as exc:
503                    has_cycle = (exc is exc.__context__)
504                    # Prevent a hang if has_cycle is True.
505                    exc.__context__ = None
506
507        try:
508            task = self.new_task(loop, run())
509            loop.run_until_complete(task)
510        finally:
511            loop.close()
512        # This also distinguishes from the initial has_cycle=None.
513        self.assertEqual(has_cycle, False)
514
515
516    def test_cancelling(self):
517        loop = asyncio.new_event_loop()
518
519        async def task():
520            await asyncio.sleep(10)
521
522        try:
523            t = self.new_task(loop, task())
524            self.assertFalse(t.cancelling())
525            self.assertNotIn(" cancelling ", repr(t))
526            self.assertTrue(t.cancel())
527            self.assertTrue(t.cancelling())
528            self.assertIn(" cancelling ", repr(t))
529
530            # Since we commented out two lines from Task.cancel(),
531            # this t.cancel() call now returns True.
532            # self.assertFalse(t.cancel())
533            self.assertTrue(t.cancel())
534
535            with self.assertRaises(asyncio.CancelledError):
536                loop.run_until_complete(t)
537        finally:
538            loop.close()
539
540    def test_uncancel_basic(self):
541        loop = asyncio.new_event_loop()
542
543        async def task():
544            try:
545                await asyncio.sleep(10)
546            except asyncio.CancelledError:
547                asyncio.current_task().uncancel()
548                await asyncio.sleep(10)
549
550        try:
551            t = self.new_task(loop, task())
552            loop.run_until_complete(asyncio.sleep(0.01))
553
554            # Cancel first sleep
555            self.assertTrue(t.cancel())
556            self.assertIn(" cancelling ", repr(t))
557            self.assertEqual(t.cancelling(), 1)
558            self.assertFalse(t.cancelled())  # Task is still not complete
559            loop.run_until_complete(asyncio.sleep(0.01))
560
561            # after .uncancel()
562            self.assertNotIn(" cancelling ", repr(t))
563            self.assertEqual(t.cancelling(), 0)
564            self.assertFalse(t.cancelled())  # Task is still not complete
565
566            # Cancel second sleep
567            self.assertTrue(t.cancel())
568            self.assertEqual(t.cancelling(), 1)
569            self.assertFalse(t.cancelled())  # Task is still not complete
570            with self.assertRaises(asyncio.CancelledError):
571                loop.run_until_complete(t)
572            self.assertTrue(t.cancelled())  # Finally, task complete
573            self.assertTrue(t.done())
574
575            # uncancel is no longer effective after the task is complete
576            t.uncancel()
577            self.assertTrue(t.cancelled())
578            self.assertTrue(t.done())
579        finally:
580            loop.close()
581
582    def test_uncancel_structured_blocks(self):
583        # This test recreates the following high-level structure using uncancel()::
584        #
585        #     async def make_request_with_timeout():
586        #         try:
587        #             async with asyncio.timeout(1):
588        #                 # Structured block affected by the timeout:
589        #                 await make_request()
590        #                 await make_another_request()
591        #         except TimeoutError:
592        #             pass  # There was a timeout
593        #         # Outer code not affected by the timeout:
594        #         await unrelated_code()
595
596        loop = asyncio.new_event_loop()
597
598        async def make_request_with_timeout(*, sleep: float, timeout: float):
599            task = asyncio.current_task()
600            loop = task.get_loop()
601
602            timed_out = False
603            structured_block_finished = False
604            outer_code_reached = False
605
606            def on_timeout():
607                nonlocal timed_out
608                timed_out = True
609                task.cancel()
610
611            timeout_handle = loop.call_later(timeout, on_timeout)
612            try:
613                try:
614                    # Structured block affected by the timeout
615                    await asyncio.sleep(sleep)
616                    structured_block_finished = True
617                finally:
618                    timeout_handle.cancel()
619                    if (
620                        timed_out
621                        and task.uncancel() == 0
622                        and type(sys.exception()) is asyncio.CancelledError
623                    ):
624                        # Note the five rules that are needed here to satisfy proper
625                        # uncancellation:
626                        #
627                        # 1. handle uncancellation in a `finally:` block to allow for
628                        #    plain returns;
629                        # 2. our `timed_out` flag is set, meaning that it was our event
630                        #    that triggered the need to uncancel the task, regardless of
631                        #    what exception is raised;
632                        # 3. we can call `uncancel()` because *we* called `cancel()`
633                        #    before;
634                        # 4. we call `uncancel()` but we only continue converting the
635                        #    CancelledError to TimeoutError if `uncancel()` caused the
636                        #    cancellation request count go down to 0.  We need to look
637                        #    at the counter vs having a simple boolean flag because our
638                        #    code might have been nested (think multiple timeouts). See
639                        #    commit 7fce1063b6e5a366f8504e039a8ccdd6944625cd for
640                        #    details.
641                        # 5. we only convert CancelledError to TimeoutError; for other
642                        #    exceptions raised due to the cancellation (like
643                        #    a ConnectionLostError from a database client), simply
644                        #    propagate them.
645                        #
646                        # Those checks need to take place in this exact order to make
647                        # sure the `cancelling()` counter always stays in sync.
648                        #
649                        # Additionally, the original stimulus to `cancel()` the task
650                        # needs to be unscheduled to avoid re-cancelling the task later.
651                        # Here we do it by cancelling `timeout_handle` in the `finally:`
652                        # block.
653                        raise TimeoutError
654            except TimeoutError:
655                self.assertTrue(timed_out)
656
657            # Outer code not affected by the timeout:
658            outer_code_reached = True
659            await asyncio.sleep(0)
660            return timed_out, structured_block_finished, outer_code_reached
661
662        try:
663            # Test which timed out.
664            t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
665            timed_out, structured_block_finished, outer_code_reached = (
666                loop.run_until_complete(t1)
667            )
668            self.assertTrue(timed_out)
669            self.assertFalse(structured_block_finished)  # it was cancelled
670            self.assertTrue(outer_code_reached)  # task got uncancelled after leaving
671                                                 # the structured block and continued until
672                                                 # completion
673            self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
674
675            # Test which did not time out.
676            t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
677            timed_out, structured_block_finished, outer_code_reached = (
678                loop.run_until_complete(t2)
679            )
680            self.assertFalse(timed_out)
681            self.assertTrue(structured_block_finished)
682            self.assertTrue(outer_code_reached)
683            self.assertEqual(t2.cancelling(), 0)
684        finally:
685            loop.close()
686
687    def test_uncancel_resets_must_cancel(self):
688
689        async def coro():
690            await fut
691            return 42
692
693        loop = asyncio.new_event_loop()
694        fut = asyncio.Future(loop=loop)
695        task = self.new_task(loop, coro())
696        loop.run_until_complete(asyncio.sleep(0))  # Get task waiting for fut
697        fut.set_result(None)  # Make task runnable
698        try:
699            task.cancel()  # Enter cancelled state
700            self.assertEqual(task.cancelling(), 1)
701            self.assertTrue(task._must_cancel)
702
703            task.uncancel()  # Undo cancellation
704            self.assertEqual(task.cancelling(), 0)
705            self.assertFalse(task._must_cancel)
706        finally:
707            res = loop.run_until_complete(task)
708            self.assertEqual(res, 42)
709            loop.close()
710
711    def test_cancel(self):
712
713        def gen():
714            when = yield
715            self.assertAlmostEqual(10.0, when)
716            yield 0
717
718        loop = self.new_test_loop(gen)
719
720        async def task():
721            await asyncio.sleep(10.0)
722            return 12
723
724        t = self.new_task(loop, task())
725        loop.call_soon(t.cancel)
726        with self.assertRaises(asyncio.CancelledError):
727            loop.run_until_complete(t)
728        self.assertTrue(t.done())
729        self.assertTrue(t.cancelled())
730        self.assertFalse(t.cancel())
731
732    def test_cancel_with_message_then_future_result(self):
733        # Test Future.result() after calling cancel() with a message.
734        cases = [
735            ((), ()),
736            ((None,), ()),
737            (('my message',), ('my message',)),
738            # Non-string values should roundtrip.
739            ((5,), (5,)),
740        ]
741        for cancel_args, expected_args in cases:
742            with self.subTest(cancel_args=cancel_args):
743                loop = asyncio.new_event_loop()
744                self.set_event_loop(loop)
745
746                async def sleep():
747                    await asyncio.sleep(10)
748
749                async def coro():
750                    task = self.new_task(loop, sleep())
751                    await asyncio.sleep(0)
752                    task.cancel(*cancel_args)
753                    done, pending = await asyncio.wait([task])
754                    task.result()
755
756                task = self.new_task(loop, coro())
757                with self.assertRaises(asyncio.CancelledError) as cm:
758                    loop.run_until_complete(task)
759                exc = cm.exception
760                self.assertEqual(exc.args, expected_args)
761
762                actual = get_innermost_context(exc)
763                self.assertEqual(actual,
764                    (asyncio.CancelledError, expected_args, 0))
765
766    def test_cancel_with_message_then_future_exception(self):
767        # Test Future.exception() after calling cancel() with a message.
768        cases = [
769            ((), ()),
770            ((None,), ()),
771            (('my message',), ('my message',)),
772            # Non-string values should roundtrip.
773            ((5,), (5,)),
774        ]
775        for cancel_args, expected_args in cases:
776            with self.subTest(cancel_args=cancel_args):
777                loop = asyncio.new_event_loop()
778                self.set_event_loop(loop)
779
780                async def sleep():
781                    await asyncio.sleep(10)
782
783                async def coro():
784                    task = self.new_task(loop, sleep())
785                    await asyncio.sleep(0)
786                    task.cancel(*cancel_args)
787                    done, pending = await asyncio.wait([task])
788                    task.exception()
789
790                task = self.new_task(loop, coro())
791                with self.assertRaises(asyncio.CancelledError) as cm:
792                    loop.run_until_complete(task)
793                exc = cm.exception
794                self.assertEqual(exc.args, expected_args)
795
796                actual = get_innermost_context(exc)
797                self.assertEqual(actual,
798                    (asyncio.CancelledError, expected_args, 0))
799
800    def test_cancellation_exception_context(self):
801        loop = asyncio.new_event_loop()
802        self.set_event_loop(loop)
803        fut = loop.create_future()
804
805        async def sleep():
806            fut.set_result(None)
807            await asyncio.sleep(10)
808
809        async def coro():
810            inner_task = self.new_task(loop, sleep())
811            await fut
812            loop.call_soon(inner_task.cancel, 'msg')
813            try:
814                await inner_task
815            except asyncio.CancelledError as ex:
816                raise ValueError("cancelled") from ex
817
818        task = self.new_task(loop, coro())
819        with self.assertRaises(ValueError) as cm:
820            loop.run_until_complete(task)
821        exc = cm.exception
822        self.assertEqual(exc.args, ('cancelled',))
823
824        actual = get_innermost_context(exc)
825        self.assertEqual(actual,
826            (asyncio.CancelledError, ('msg',), 1))
827
828    def test_cancel_with_message_before_starting_task(self):
829        loop = asyncio.new_event_loop()
830        self.set_event_loop(loop)
831
832        async def sleep():
833            await asyncio.sleep(10)
834
835        async def coro():
836            task = self.new_task(loop, sleep())
837            # We deliberately leave out the sleep here.
838            task.cancel('my message')
839            done, pending = await asyncio.wait([task])
840            task.exception()
841
842        task = self.new_task(loop, coro())
843        with self.assertRaises(asyncio.CancelledError) as cm:
844            loop.run_until_complete(task)
845        exc = cm.exception
846        self.assertEqual(exc.args, ('my message',))
847
848        actual = get_innermost_context(exc)
849        self.assertEqual(actual,
850            (asyncio.CancelledError, ('my message',), 0))
851
852    def test_cancel_yield(self):
853        async def task():
854            await asyncio.sleep(0)
855            await asyncio.sleep(0)
856            return 12
857
858        t = self.new_task(self.loop, task())
859        test_utils.run_briefly(self.loop)  # start coro
860        t.cancel()
861        self.assertRaises(
862            asyncio.CancelledError, self.loop.run_until_complete, t)
863        self.assertTrue(t.done())
864        self.assertTrue(t.cancelled())
865        self.assertFalse(t.cancel())
866
867    def test_cancel_inner_future(self):
868        f = self.new_future(self.loop)
869
870        async def task():
871            await f
872            return 12
873
874        t = self.new_task(self.loop, task())
875        test_utils.run_briefly(self.loop)  # start task
876        f.cancel()
877        with self.assertRaises(asyncio.CancelledError):
878            self.loop.run_until_complete(t)
879        self.assertTrue(f.cancelled())
880        self.assertTrue(t.cancelled())
881
882    def test_cancel_both_task_and_inner_future(self):
883        f = self.new_future(self.loop)
884
885        async def task():
886            await f
887            return 12
888
889        t = self.new_task(self.loop, task())
890        test_utils.run_briefly(self.loop)
891
892        f.cancel()
893        t.cancel()
894
895        with self.assertRaises(asyncio.CancelledError):
896            self.loop.run_until_complete(t)
897
898        self.assertTrue(t.done())
899        self.assertTrue(f.cancelled())
900        self.assertTrue(t.cancelled())
901
902    def test_cancel_task_catching(self):
903        fut1 = self.new_future(self.loop)
904        fut2 = self.new_future(self.loop)
905
906        async def task():
907            await fut1
908            try:
909                await fut2
910            except asyncio.CancelledError:
911                return 42
912
913        t = self.new_task(self.loop, task())
914        test_utils.run_briefly(self.loop)
915        self.assertIs(t._fut_waiter, fut1)  # White-box test.
916        fut1.set_result(None)
917        test_utils.run_briefly(self.loop)
918        self.assertIs(t._fut_waiter, fut2)  # White-box test.
919        t.cancel()
920        self.assertTrue(fut2.cancelled())
921        res = self.loop.run_until_complete(t)
922        self.assertEqual(res, 42)
923        self.assertFalse(t.cancelled())
924
925    def test_cancel_task_ignoring(self):
926        fut1 = self.new_future(self.loop)
927        fut2 = self.new_future(self.loop)
928        fut3 = self.new_future(self.loop)
929
930        async def task():
931            await fut1
932            try:
933                await fut2
934            except asyncio.CancelledError:
935                pass
936            res = await fut3
937            return res
938
939        t = self.new_task(self.loop, task())
940        test_utils.run_briefly(self.loop)
941        self.assertIs(t._fut_waiter, fut1)  # White-box test.
942        fut1.set_result(None)
943        test_utils.run_briefly(self.loop)
944        self.assertIs(t._fut_waiter, fut2)  # White-box test.
945        t.cancel()
946        self.assertTrue(fut2.cancelled())
947        test_utils.run_briefly(self.loop)
948        self.assertIs(t._fut_waiter, fut3)  # White-box test.
949        fut3.set_result(42)
950        res = self.loop.run_until_complete(t)
951        self.assertEqual(res, 42)
952        self.assertFalse(fut3.cancelled())
953        self.assertFalse(t.cancelled())
954
955    def test_cancel_current_task(self):
956        loop = asyncio.new_event_loop()
957        self.set_event_loop(loop)
958
959        async def task():
960            t.cancel()
961            self.assertTrue(t._must_cancel)  # White-box test.
962            # The sleep should be cancelled immediately.
963            await asyncio.sleep(100)
964            return 12
965
966        t = self.new_task(loop, task())
967        self.assertFalse(t.cancelled())
968        self.assertRaises(
969            asyncio.CancelledError, loop.run_until_complete, t)
970        self.assertTrue(t.done())
971        self.assertTrue(t.cancelled())
972        self.assertFalse(t._must_cancel)  # White-box test.
973        self.assertFalse(t.cancel())
974
975    def test_cancel_at_end(self):
976        """coroutine end right after task is cancelled"""
977        loop = asyncio.new_event_loop()
978        self.set_event_loop(loop)
979
980        async def task():
981            t.cancel()
982            self.assertTrue(t._must_cancel)  # White-box test.
983            return 12
984
985        t = self.new_task(loop, task())
986        self.assertFalse(t.cancelled())
987        self.assertRaises(
988            asyncio.CancelledError, loop.run_until_complete, t)
989        self.assertTrue(t.done())
990        self.assertTrue(t.cancelled())
991        self.assertFalse(t._must_cancel)  # White-box test.
992        self.assertFalse(t.cancel())
993
994    def test_cancel_awaited_task(self):
995        # This tests for a relatively rare condition when
996        # a task cancellation is requested for a task which is not
997        # currently blocked, such as a task cancelling itself.
998        # In this situation we must ensure that whatever next future
999        # or task the cancelled task blocks on is cancelled correctly
1000        # as well.  See also bpo-34872.
1001        loop = asyncio.new_event_loop()
1002        self.addCleanup(lambda: loop.close())
1003
1004        task = nested_task = None
1005        fut = self.new_future(loop)
1006
1007        async def nested():
1008            await fut
1009
1010        async def coro():
1011            nonlocal nested_task
1012            # Create a sub-task and wait for it to run.
1013            nested_task = self.new_task(loop, nested())
1014            await asyncio.sleep(0)
1015
1016            # Request the current task to be cancelled.
1017            task.cancel()
1018            # Block on the nested task, which should be immediately
1019            # cancelled.
1020            await nested_task
1021
1022        task = self.new_task(loop, coro())
1023        with self.assertRaises(asyncio.CancelledError):
1024            loop.run_until_complete(task)
1025
1026        self.assertTrue(task.cancelled())
1027        self.assertTrue(nested_task.cancelled())
1028        self.assertTrue(fut.cancelled())
1029
1030    def assert_text_contains(self, text, substr):
1031        if substr not in text:
1032            raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<')
1033
1034    def test_cancel_traceback_for_future_result(self):
1035        # When calling Future.result() on a cancelled task, check that the
1036        # line of code that was interrupted is included in the traceback.
1037        loop = asyncio.new_event_loop()
1038        self.set_event_loop(loop)
1039
1040        async def nested():
1041            # This will get cancelled immediately.
1042            await asyncio.sleep(10)
1043
1044        async def coro():
1045            task = self.new_task(loop, nested())
1046            await asyncio.sleep(0)
1047            task.cancel()
1048            await task  # search target
1049
1050        task = self.new_task(loop, coro())
1051        try:
1052            loop.run_until_complete(task)
1053        except asyncio.CancelledError:
1054            tb = traceback.format_exc()
1055            self.assert_text_contains(tb, "await asyncio.sleep(10)")
1056            # The intermediate await should also be included.
1057            self.assert_text_contains(tb, "await task  # search target")
1058        else:
1059            self.fail('CancelledError did not occur')
1060
1061    def test_cancel_traceback_for_future_exception(self):
1062        # When calling Future.exception() on a cancelled task, check that the
1063        # line of code that was interrupted is included in the traceback.
1064        loop = asyncio.new_event_loop()
1065        self.set_event_loop(loop)
1066
1067        async def nested():
1068            # This will get cancelled immediately.
1069            await asyncio.sleep(10)
1070
1071        async def coro():
1072            task = self.new_task(loop, nested())
1073            await asyncio.sleep(0)
1074            task.cancel()
1075            done, pending = await asyncio.wait([task])
1076            task.exception()  # search target
1077
1078        task = self.new_task(loop, coro())
1079        try:
1080            loop.run_until_complete(task)
1081        except asyncio.CancelledError:
1082            tb = traceback.format_exc()
1083            self.assert_text_contains(tb, "await asyncio.sleep(10)")
1084            # The intermediate await should also be included.
1085            self.assert_text_contains(tb,
1086                "task.exception()  # search target")
1087        else:
1088            self.fail('CancelledError did not occur')
1089
1090    def test_stop_while_run_in_complete(self):
1091
1092        def gen():
1093            when = yield
1094            self.assertAlmostEqual(0.1, when)
1095            when = yield 0.1
1096            self.assertAlmostEqual(0.2, when)
1097            when = yield 0.1
1098            self.assertAlmostEqual(0.3, when)
1099            yield 0.1
1100
1101        loop = self.new_test_loop(gen)
1102
1103        x = 0
1104
1105        async def task():
1106            nonlocal x
1107            while x < 10:
1108                await asyncio.sleep(0.1)
1109                x += 1
1110                if x == 2:
1111                    loop.stop()
1112
1113        t = self.new_task(loop, task())
1114        with self.assertRaises(RuntimeError) as cm:
1115            loop.run_until_complete(t)
1116        self.assertEqual(str(cm.exception),
1117                         'Event loop stopped before Future completed.')
1118        self.assertFalse(t.done())
1119        self.assertEqual(x, 2)
1120        self.assertAlmostEqual(0.3, loop.time())
1121
1122        t.cancel()
1123        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
1124
1125    def test_log_traceback(self):
1126        async def coro():
1127            pass
1128
1129        task = self.new_task(self.loop, coro())
1130        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
1131            task._log_traceback = True
1132        self.loop.run_until_complete(task)
1133
1134    def test_wait(self):
1135
1136        def gen():
1137            when = yield
1138            self.assertAlmostEqual(0.1, when)
1139            when = yield 0
1140            self.assertAlmostEqual(0.15, when)
1141            yield 0.15
1142
1143        loop = self.new_test_loop(gen)
1144
1145        a = self.new_task(loop, asyncio.sleep(0.1))
1146        b = self.new_task(loop, asyncio.sleep(0.15))
1147
1148        async def foo():
1149            done, pending = await asyncio.wait([b, a])
1150            self.assertEqual(done, set([a, b]))
1151            self.assertEqual(pending, set())
1152            return 42
1153
1154        res = loop.run_until_complete(self.new_task(loop, foo()))
1155        self.assertEqual(res, 42)
1156        self.assertAlmostEqual(0.15, loop.time())
1157
1158        # Doing it again should take no time and exercise a different path.
1159        res = loop.run_until_complete(self.new_task(loop, foo()))
1160        self.assertAlmostEqual(0.15, loop.time())
1161        self.assertEqual(res, 42)
1162
1163    def test_wait_duplicate_coroutines(self):
1164
1165        async def coro(s):
1166            return s
1167        c = self.loop.create_task(coro('test'))
1168        task = self.new_task(
1169            self.loop,
1170            asyncio.wait([c, c, self.loop.create_task(coro('spam'))]))
1171
1172        done, pending = self.loop.run_until_complete(task)
1173
1174        self.assertFalse(pending)
1175        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
1176
1177    def test_wait_errors(self):
1178        self.assertRaises(
1179            ValueError, self.loop.run_until_complete,
1180            asyncio.wait(set()))
1181
1182        # -1 is an invalid return_when value
1183        sleep_coro = asyncio.sleep(10.0)
1184        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
1185        self.assertRaises(ValueError,
1186                          self.loop.run_until_complete, wait_coro)
1187
1188        sleep_coro.close()
1189
1190    def test_wait_first_completed(self):
1191
1192        def gen():
1193            when = yield
1194            self.assertAlmostEqual(10.0, when)
1195            when = yield 0
1196            self.assertAlmostEqual(0.1, when)
1197            yield 0.1
1198
1199        loop = self.new_test_loop(gen)
1200
1201        a = self.new_task(loop, asyncio.sleep(10.0))
1202        b = self.new_task(loop, asyncio.sleep(0.1))
1203        task = self.new_task(
1204            loop,
1205            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1206
1207        done, pending = loop.run_until_complete(task)
1208        self.assertEqual({b}, done)
1209        self.assertEqual({a}, pending)
1210        self.assertFalse(a.done())
1211        self.assertTrue(b.done())
1212        self.assertIsNone(b.result())
1213        self.assertAlmostEqual(0.1, loop.time())
1214
1215        # move forward to close generator
1216        loop.advance_time(10)
1217        loop.run_until_complete(asyncio.wait([a, b]))
1218
1219    def test_wait_really_done(self):
1220        # there is possibility that some tasks in the pending list
1221        # became done but their callbacks haven't all been called yet
1222
1223        async def coro1():
1224            await asyncio.sleep(0)
1225
1226        async def coro2():
1227            await asyncio.sleep(0)
1228            await asyncio.sleep(0)
1229
1230        a = self.new_task(self.loop, coro1())
1231        b = self.new_task(self.loop, coro2())
1232        task = self.new_task(
1233            self.loop,
1234            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1235
1236        done, pending = self.loop.run_until_complete(task)
1237        self.assertEqual({a, b}, done)
1238        self.assertTrue(a.done())
1239        self.assertIsNone(a.result())
1240        self.assertTrue(b.done())
1241        self.assertIsNone(b.result())
1242
1243    def test_wait_first_exception(self):
1244
1245        def gen():
1246            when = yield
1247            self.assertAlmostEqual(10.0, when)
1248            yield 0
1249
1250        loop = self.new_test_loop(gen)
1251
1252        # first_exception, task already has exception
1253        a = self.new_task(loop, asyncio.sleep(10.0))
1254
1255        async def exc():
1256            raise ZeroDivisionError('err')
1257
1258        b = self.new_task(loop, exc())
1259        task = self.new_task(
1260            loop,
1261            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
1262
1263        done, pending = loop.run_until_complete(task)
1264        self.assertEqual({b}, done)
1265        self.assertEqual({a}, pending)
1266        self.assertAlmostEqual(0, loop.time())
1267
1268        # move forward to close generator
1269        loop.advance_time(10)
1270        loop.run_until_complete(asyncio.wait([a, b]))
1271
1272    def test_wait_first_exception_in_wait(self):
1273
1274        def gen():
1275            when = yield
1276            self.assertAlmostEqual(10.0, when)
1277            when = yield 0
1278            self.assertAlmostEqual(0.01, when)
1279            yield 0.01
1280
1281        loop = self.new_test_loop(gen)
1282
1283        # first_exception, exception during waiting
1284        a = self.new_task(loop, asyncio.sleep(10.0))
1285
1286        async def exc():
1287            await asyncio.sleep(0.01)
1288            raise ZeroDivisionError('err')
1289
1290        b = self.new_task(loop, exc())
1291        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
1292
1293        done, pending = loop.run_until_complete(task)
1294        self.assertEqual({b}, done)
1295        self.assertEqual({a}, pending)
1296        self.assertAlmostEqual(0.01, loop.time())
1297
1298        # move forward to close generator
1299        loop.advance_time(10)
1300        loop.run_until_complete(asyncio.wait([a, b]))
1301
1302    def test_wait_with_exception(self):
1303
1304        def gen():
1305            when = yield
1306            self.assertAlmostEqual(0.1, when)
1307            when = yield 0
1308            self.assertAlmostEqual(0.15, when)
1309            yield 0.15
1310
1311        loop = self.new_test_loop(gen)
1312
1313        a = self.new_task(loop, asyncio.sleep(0.1))
1314
1315        async def sleeper():
1316            await asyncio.sleep(0.15)
1317            raise ZeroDivisionError('really')
1318
1319        b = self.new_task(loop, sleeper())
1320
1321        async def foo():
1322            done, pending = await asyncio.wait([b, a])
1323            self.assertEqual(len(done), 2)
1324            self.assertEqual(pending, set())
1325            errors = set(f for f in done if f.exception() is not None)
1326            self.assertEqual(len(errors), 1)
1327
1328        loop.run_until_complete(self.new_task(loop, foo()))
1329        self.assertAlmostEqual(0.15, loop.time())
1330
1331        loop.run_until_complete(self.new_task(loop, foo()))
1332        self.assertAlmostEqual(0.15, loop.time())
1333
1334    def test_wait_with_timeout(self):
1335
1336        def gen():
1337            when = yield
1338            self.assertAlmostEqual(0.1, when)
1339            when = yield 0
1340            self.assertAlmostEqual(0.15, when)
1341            when = yield 0
1342            self.assertAlmostEqual(0.11, when)
1343            yield 0.11
1344
1345        loop = self.new_test_loop(gen)
1346
1347        a = self.new_task(loop, asyncio.sleep(0.1))
1348        b = self.new_task(loop, asyncio.sleep(0.15))
1349
1350        async def foo():
1351            done, pending = await asyncio.wait([b, a], timeout=0.11)
1352            self.assertEqual(done, set([a]))
1353            self.assertEqual(pending, set([b]))
1354
1355        loop.run_until_complete(self.new_task(loop, foo()))
1356        self.assertAlmostEqual(0.11, loop.time())
1357
1358        # move forward to close generator
1359        loop.advance_time(10)
1360        loop.run_until_complete(asyncio.wait([a, b]))
1361
1362    def test_wait_concurrent_complete(self):
1363
1364        def gen():
1365            when = yield
1366            self.assertAlmostEqual(0.1, when)
1367            when = yield 0
1368            self.assertAlmostEqual(0.15, when)
1369            when = yield 0
1370            self.assertAlmostEqual(0.1, when)
1371            yield 0.1
1372
1373        loop = self.new_test_loop(gen)
1374
1375        a = self.new_task(loop, asyncio.sleep(0.1))
1376        b = self.new_task(loop, asyncio.sleep(0.15))
1377
1378        done, pending = loop.run_until_complete(
1379            asyncio.wait([b, a], timeout=0.1))
1380
1381        self.assertEqual(done, set([a]))
1382        self.assertEqual(pending, set([b]))
1383        self.assertAlmostEqual(0.1, loop.time())
1384
1385        # move forward to close generator
1386        loop.advance_time(10)
1387        loop.run_until_complete(asyncio.wait([a, b]))
1388
1389    def test_wait_with_iterator_of_tasks(self):
1390
1391        def gen():
1392            when = yield
1393            self.assertAlmostEqual(0.1, when)
1394            when = yield 0
1395            self.assertAlmostEqual(0.15, when)
1396            yield 0.15
1397
1398        loop = self.new_test_loop(gen)
1399
1400        a = self.new_task(loop, asyncio.sleep(0.1))
1401        b = self.new_task(loop, asyncio.sleep(0.15))
1402
1403        async def foo():
1404            done, pending = await asyncio.wait(iter([b, a]))
1405            self.assertEqual(done, set([a, b]))
1406            self.assertEqual(pending, set())
1407            return 42
1408
1409        res = loop.run_until_complete(self.new_task(loop, foo()))
1410        self.assertEqual(res, 42)
1411        self.assertAlmostEqual(0.15, loop.time())
1412
1413
1414    def test_wait_generator(self):
1415        async def func(a):
1416            return a
1417
1418        loop = self.new_test_loop()
1419
1420        async def main():
1421            tasks = (self.new_task(loop, func(i)) for i in range(10))
1422            done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
1423            self.assertEqual(len(done), 10)
1424            self.assertEqual(len(pending), 0)
1425
1426        loop.run_until_complete(main())
1427
1428
1429    def test_as_completed(self):
1430
1431        def gen():
1432            yield 0
1433            yield 0
1434            yield 0.01
1435            yield 0
1436
1437        async def sleeper(dt, x):
1438            nonlocal time_shifted
1439            await asyncio.sleep(dt)
1440            completed.add(x)
1441            if not time_shifted and 'a' in completed and 'b' in completed:
1442                time_shifted = True
1443                loop.advance_time(0.14)
1444            return x
1445
1446        async def try_iterator(awaitables):
1447            values = []
1448            for f in asyncio.as_completed(awaitables):
1449                values.append(await f)
1450            return values
1451
1452        async def try_async_iterator(awaitables):
1453            values = []
1454            async for f in asyncio.as_completed(awaitables):
1455                values.append(await f)
1456            return values
1457
1458        for foo in try_iterator, try_async_iterator:
1459            with self.subTest(method=foo.__name__):
1460                loop = self.new_test_loop(gen)
1461                # disable "slow callback" warning
1462                loop.slow_callback_duration = 1.0
1463
1464                completed = set()
1465                time_shifted = False
1466
1467                a = sleeper(0.01, 'a')
1468                b = sleeper(0.01, 'b')
1469                c = sleeper(0.15, 'c')
1470
1471                res = loop.run_until_complete(self.new_task(loop, foo([b, c, a])))
1472                self.assertAlmostEqual(0.15, loop.time())
1473                self.assertTrue('a' in res[:2])
1474                self.assertTrue('b' in res[:2])
1475                self.assertEqual(res[2], 'c')
1476
1477    def test_as_completed_same_tasks_in_as_out(self):
1478        # Ensures that asynchronously iterating as_completed's iterator
1479        # yields awaitables are the same awaitables that were passed in when
1480        # those awaitables are futures.
1481        async def try_async_iterator(awaitables):
1482            awaitables_out = set()
1483            async for out_aw in asyncio.as_completed(awaitables):
1484                awaitables_out.add(out_aw)
1485            return awaitables_out
1486
1487        async def coro(i):
1488            return i
1489
1490        with contextlib.closing(asyncio.new_event_loop()) as loop:
1491            # Coroutines shouldn't be yielded back as finished coroutines
1492            # can't be re-used.
1493            awaitables_in = frozenset(
1494                (coro(0), coro(1), coro(2), coro(3))
1495            )
1496            awaitables_out = loop.run_until_complete(
1497                try_async_iterator(awaitables_in)
1498            )
1499            if awaitables_in - awaitables_out != awaitables_in:
1500                raise self.failureException('Got original coroutines '
1501                                            'out of as_completed iterator.')
1502
1503            # Tasks should be yielded back.
1504            coro_obj_a = coro('a')
1505            task_b = loop.create_task(coro('b'))
1506            coro_obj_c = coro('c')
1507            task_d = loop.create_task(coro('d'))
1508            awaitables_in = frozenset(
1509                (coro_obj_a, task_b, coro_obj_c, task_d)
1510            )
1511            awaitables_out = loop.run_until_complete(
1512                try_async_iterator(awaitables_in)
1513            )
1514            if awaitables_in & awaitables_out != {task_b, task_d}:
1515                raise self.failureException('Only tasks should be yielded '
1516                                            'from as_completed iterator '
1517                                            'as-is.')
1518
1519    def test_as_completed_with_timeout(self):
1520
1521        def gen():
1522            yield
1523            yield 0
1524            yield 0
1525            yield 0.1
1526
1527        async def try_iterator():
1528            values = []
1529            for f in asyncio.as_completed([a, b], timeout=0.12):
1530                if values:
1531                    loop.advance_time(0.02)
1532                try:
1533                    v = await f
1534                    values.append((1, v))
1535                except asyncio.TimeoutError as exc:
1536                    values.append((2, exc))
1537            return values
1538
1539        async def try_async_iterator():
1540            values = []
1541            try:
1542                async for f in asyncio.as_completed([a, b], timeout=0.12):
1543                    v = await f
1544                    values.append((1, v))
1545                    loop.advance_time(0.02)
1546            except asyncio.TimeoutError as exc:
1547                values.append((2, exc))
1548            return values
1549
1550        for foo in try_iterator, try_async_iterator:
1551            with self.subTest(method=foo.__name__):
1552                loop = self.new_test_loop(gen)
1553                a = loop.create_task(asyncio.sleep(0.1, 'a'))
1554                b = loop.create_task(asyncio.sleep(0.15, 'b'))
1555
1556                res = loop.run_until_complete(self.new_task(loop, foo()))
1557                self.assertEqual(len(res), 2, res)
1558                self.assertEqual(res[0], (1, 'a'))
1559                self.assertEqual(res[1][0], 2)
1560                self.assertIsInstance(res[1][1], asyncio.TimeoutError)
1561                self.assertAlmostEqual(0.12, loop.time())
1562
1563                # move forward to close generator
1564                loop.advance_time(10)
1565                loop.run_until_complete(asyncio.wait([a, b]))
1566
1567    def test_as_completed_with_unused_timeout(self):
1568
1569        def gen():
1570            yield
1571            yield 0
1572            yield 0.01
1573
1574        async def try_iterator():
1575            for f in asyncio.as_completed([a], timeout=1):
1576                v = await f
1577                self.assertEqual(v, 'a')
1578
1579        async def try_async_iterator():
1580            async for f in asyncio.as_completed([a], timeout=1):
1581                v = await f
1582                self.assertEqual(v, 'a')
1583
1584        for foo in try_iterator, try_async_iterator:
1585            with self.subTest(method=foo.__name__):
1586                a = asyncio.sleep(0.01, 'a')
1587                loop = self.new_test_loop(gen)
1588                loop.run_until_complete(self.new_task(loop, foo()))
1589                loop.close()
1590
1591    def test_as_completed_resume_iterator(self):
1592        # Test that as_completed returns an iterator that can be resumed
1593        # the next time iteration is performed (i.e. if __iter__ is called
1594        # again)
1595        async def try_iterator(awaitables):
1596            iterations = 0
1597            iterator = asyncio.as_completed(awaitables)
1598            collected = []
1599            for f in iterator:
1600                collected.append(await f)
1601                iterations += 1
1602                if iterations == 2:
1603                    break
1604            self.assertEqual(len(collected), 2)
1605
1606            # Resume same iterator:
1607            for f in iterator:
1608                collected.append(await f)
1609            return collected
1610
1611        async def try_async_iterator(awaitables):
1612            iterations = 0
1613            iterator = asyncio.as_completed(awaitables)
1614            collected = []
1615            async for f in iterator:
1616                collected.append(await f)
1617                iterations += 1
1618                if iterations == 2:
1619                    break
1620            self.assertEqual(len(collected), 2)
1621
1622            # Resume same iterator:
1623            async for f in iterator:
1624                collected.append(await f)
1625            return collected
1626
1627        async def coro(i):
1628            return i
1629
1630        with contextlib.closing(asyncio.new_event_loop()) as loop:
1631            for foo in try_iterator, try_async_iterator:
1632                with self.subTest(method=foo.__name__):
1633                    results = loop.run_until_complete(
1634                        foo((coro(0), coro(1), coro(2), coro(3)))
1635                    )
1636                    self.assertCountEqual(results, (0, 1, 2, 3))
1637
1638    def test_as_completed_reverse_wait(self):
1639        # Tests the plain iterator style of as_completed iteration to
1640        # ensure that the first future awaited resolves to the first
1641        # completed awaitable from the set we passed in, even if it wasn't
1642        # the first future generated by as_completed.
1643        def gen():
1644            yield 0
1645            yield 0.05
1646            yield 0
1647
1648        loop = self.new_test_loop(gen)
1649
1650        a = asyncio.sleep(0.05, 'a')
1651        b = asyncio.sleep(0.10, 'b')
1652        fs = {a, b}
1653
1654        async def test():
1655            futs = list(asyncio.as_completed(fs))
1656            self.assertEqual(len(futs), 2)
1657
1658            x = await futs[1]
1659            self.assertEqual(x, 'a')
1660            self.assertAlmostEqual(0.05, loop.time())
1661            loop.advance_time(0.05)
1662            y = await futs[0]
1663            self.assertEqual(y, 'b')
1664            self.assertAlmostEqual(0.10, loop.time())
1665
1666        loop.run_until_complete(test())
1667
1668    def test_as_completed_concurrent(self):
1669        # Ensure that more than one future or coroutine yielded from
1670        # as_completed can be awaited concurrently.
1671        def gen():
1672            when = yield
1673            self.assertAlmostEqual(0.05, when)
1674            when = yield 0
1675            self.assertAlmostEqual(0.05, when)
1676            yield 0.05
1677
1678        async def try_iterator(fs):
1679            return list(asyncio.as_completed(fs))
1680
1681        async def try_async_iterator(fs):
1682            return [f async for f in asyncio.as_completed(fs)]
1683
1684        for runner in try_iterator, try_async_iterator:
1685            with self.subTest(method=runner.__name__):
1686                a = asyncio.sleep(0.05, 'a')
1687                b = asyncio.sleep(0.05, 'b')
1688                fs = {a, b}
1689
1690                async def test():
1691                    futs = await runner(fs)
1692                    self.assertEqual(len(futs), 2)
1693                    done, pending = await asyncio.wait(
1694                        [asyncio.ensure_future(fut) for fut in futs]
1695                    )
1696                    self.assertEqual(set(f.result() for f in done), {'a', 'b'})
1697
1698                loop = self.new_test_loop(gen)
1699                loop.run_until_complete(test())
1700
1701    def test_as_completed_duplicate_coroutines(self):
1702
1703        async def coro(s):
1704            return s
1705
1706        async def try_iterator():
1707            result = []
1708            c = coro('ham')
1709            for f in asyncio.as_completed([c, c, coro('spam')]):
1710                result.append(await f)
1711            return result
1712
1713        async def try_async_iterator():
1714            result = []
1715            c = coro('ham')
1716            async for f in asyncio.as_completed([c, c, coro('spam')]):
1717                result.append(await f)
1718            return result
1719
1720        for runner in try_iterator, try_async_iterator:
1721            with self.subTest(method=runner.__name__):
1722                fut = self.new_task(self.loop, runner())
1723                self.loop.run_until_complete(fut)
1724                result = fut.result()
1725                self.assertEqual(set(result), {'ham', 'spam'})
1726                self.assertEqual(len(result), 2)
1727
1728    def test_as_completed_coroutine_without_loop(self):
1729        async def coro():
1730            return 42
1731
1732        a = coro()
1733        self.addCleanup(a.close)
1734
1735        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
1736            futs = asyncio.as_completed([a])
1737            list(futs)
1738
1739    def test_as_completed_coroutine_use_running_loop(self):
1740        loop = self.new_test_loop()
1741
1742        async def coro():
1743            return 42
1744
1745        async def test():
1746            futs = list(asyncio.as_completed([coro()]))
1747            self.assertEqual(len(futs), 1)
1748            self.assertEqual(await futs[0], 42)
1749
1750        loop.run_until_complete(test())
1751
1752    def test_sleep(self):
1753
1754        def gen():
1755            when = yield
1756            self.assertAlmostEqual(0.05, when)
1757            when = yield 0.05
1758            self.assertAlmostEqual(0.1, when)
1759            yield 0.05
1760
1761        loop = self.new_test_loop(gen)
1762
1763        async def sleeper(dt, arg):
1764            await asyncio.sleep(dt/2)
1765            res = await asyncio.sleep(dt/2, arg)
1766            return res
1767
1768        t = self.new_task(loop, sleeper(0.1, 'yeah'))
1769        loop.run_until_complete(t)
1770        self.assertTrue(t.done())
1771        self.assertEqual(t.result(), 'yeah')
1772        self.assertAlmostEqual(0.1, loop.time())
1773
1774    def test_sleep_when_delay_is_nan(self):
1775
1776        def gen():
1777            yield
1778
1779        loop = self.new_test_loop(gen)
1780
1781        async def sleeper():
1782            await asyncio.sleep(float("nan"))
1783
1784        t = self.new_task(loop, sleeper())
1785
1786        with self.assertRaises(ValueError):
1787            loop.run_until_complete(t)
1788
1789    def test_sleep_cancel(self):
1790
1791        def gen():
1792            when = yield
1793            self.assertAlmostEqual(10.0, when)
1794            yield 0
1795
1796        loop = self.new_test_loop(gen)
1797
1798        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
1799
1800        handle = None
1801        orig_call_later = loop.call_later
1802
1803        def call_later(delay, callback, *args):
1804            nonlocal handle
1805            handle = orig_call_later(delay, callback, *args)
1806            return handle
1807
1808        loop.call_later = call_later
1809        test_utils.run_briefly(loop)
1810
1811        self.assertFalse(handle._cancelled)
1812
1813        t.cancel()
1814        test_utils.run_briefly(loop)
1815        self.assertTrue(handle._cancelled)
1816
1817    def test_task_cancel_sleeping_task(self):
1818
1819        def gen():
1820            when = yield
1821            self.assertAlmostEqual(0.1, when)
1822            when = yield 0
1823            self.assertAlmostEqual(5000, when)
1824            yield 0.1
1825
1826        loop = self.new_test_loop(gen)
1827
1828        async def sleep(dt):
1829            await asyncio.sleep(dt)
1830
1831        async def doit():
1832            sleeper = self.new_task(loop, sleep(5000))
1833            loop.call_later(0.1, sleeper.cancel)
1834            try:
1835                await sleeper
1836            except asyncio.CancelledError:
1837                return 'cancelled'
1838            else:
1839                return 'slept in'
1840
1841        doer = doit()
1842        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
1843        self.assertAlmostEqual(0.1, loop.time())
1844
1845    def test_task_cancel_waiter_future(self):
1846        fut = self.new_future(self.loop)
1847
1848        async def coro():
1849            await fut
1850
1851        task = self.new_task(self.loop, coro())
1852        test_utils.run_briefly(self.loop)
1853        self.assertIs(task._fut_waiter, fut)
1854
1855        task.cancel()
1856        test_utils.run_briefly(self.loop)
1857        self.assertRaises(
1858            asyncio.CancelledError, self.loop.run_until_complete, task)
1859        self.assertIsNone(task._fut_waiter)
1860        self.assertTrue(fut.cancelled())
1861
1862    def test_task_set_methods(self):
1863        async def notmuch():
1864            return 'ko'
1865
1866        gen = notmuch()
1867        task = self.new_task(self.loop, gen)
1868
1869        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
1870            task.set_result('ok')
1871
1872        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
1873            task.set_exception(ValueError())
1874
1875        self.assertEqual(
1876            self.loop.run_until_complete(task),
1877            'ko')
1878
1879    def test_step_result_future(self):
1880        # If coroutine returns future, task waits on this future.
1881
1882        class Fut(asyncio.Future):
1883            def __init__(self, *args, **kwds):
1884                self.cb_added = False
1885                super().__init__(*args, **kwds)
1886
1887            def add_done_callback(self, *args, **kwargs):
1888                self.cb_added = True
1889                super().add_done_callback(*args, **kwargs)
1890
1891        fut = Fut(loop=self.loop)
1892        result = None
1893
1894        async def wait_for_future():
1895            nonlocal result
1896            result = await fut
1897
1898        t = self.new_task(self.loop, wait_for_future())
1899        test_utils.run_briefly(self.loop)
1900        self.assertTrue(fut.cb_added)
1901
1902        res = object()
1903        fut.set_result(res)
1904        test_utils.run_briefly(self.loop)
1905        self.assertIs(res, result)
1906        self.assertTrue(t.done())
1907        self.assertIsNone(t.result())
1908
1909    def test_baseexception_during_cancel(self):
1910
1911        def gen():
1912            when = yield
1913            self.assertAlmostEqual(10.0, when)
1914            yield 0
1915
1916        loop = self.new_test_loop(gen)
1917
1918        async def sleeper():
1919            await asyncio.sleep(10)
1920
1921        base_exc = SystemExit()
1922
1923        async def notmutch():
1924            try:
1925                await sleeper()
1926            except asyncio.CancelledError:
1927                raise base_exc
1928
1929        task = self.new_task(loop, notmutch())
1930        test_utils.run_briefly(loop)
1931
1932        task.cancel()
1933        self.assertFalse(task.done())
1934
1935        self.assertRaises(SystemExit, test_utils.run_briefly, loop)
1936
1937        self.assertTrue(task.done())
1938        self.assertFalse(task.cancelled())
1939        self.assertIs(task.exception(), base_exc)
1940
1941    def test_iscoroutinefunction(self):
1942        def fn():
1943            pass
1944
1945        self.assertFalse(asyncio.iscoroutinefunction(fn))
1946
1947        def fn1():
1948            yield
1949        self.assertFalse(asyncio.iscoroutinefunction(fn1))
1950
1951        async def fn2():
1952            pass
1953        self.assertTrue(asyncio.iscoroutinefunction(fn2))
1954
1955        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
1956        self.assertTrue(asyncio.iscoroutinefunction(mock.AsyncMock()))
1957
1958    def test_coroutine_non_gen_function(self):
1959        async def func():
1960            return 'test'
1961
1962        self.assertTrue(asyncio.iscoroutinefunction(func))
1963
1964        coro = func()
1965        self.assertTrue(asyncio.iscoroutine(coro))
1966
1967        res = self.loop.run_until_complete(coro)
1968        self.assertEqual(res, 'test')
1969
1970    def test_coroutine_non_gen_function_return_future(self):
1971        fut = self.new_future(self.loop)
1972
1973        async def func():
1974            return fut
1975
1976        async def coro():
1977            fut.set_result('test')
1978
1979        t1 = self.new_task(self.loop, func())
1980        t2 = self.new_task(self.loop, coro())
1981        res = self.loop.run_until_complete(t1)
1982        self.assertEqual(res, fut)
1983        self.assertIsNone(t2.result())
1984
1985    def test_current_task(self):
1986        self.assertIsNone(asyncio.current_task(loop=self.loop))
1987
1988        async def coro(loop):
1989            self.assertIs(asyncio.current_task(), task)
1990
1991            self.assertIs(asyncio.current_task(None), task)
1992            self.assertIs(asyncio.current_task(), task)
1993
1994        task = self.new_task(self.loop, coro(self.loop))
1995        self.loop.run_until_complete(task)
1996        self.assertIsNone(asyncio.current_task(loop=self.loop))
1997
1998    def test_current_task_with_interleaving_tasks(self):
1999        self.assertIsNone(asyncio.current_task(loop=self.loop))
2000
2001        fut1 = self.new_future(self.loop)
2002        fut2 = self.new_future(self.loop)
2003
2004        async def coro1(loop):
2005            self.assertTrue(asyncio.current_task() is task1)
2006            await fut1
2007            self.assertTrue(asyncio.current_task() is task1)
2008            fut2.set_result(True)
2009
2010        async def coro2(loop):
2011            self.assertTrue(asyncio.current_task() is task2)
2012            fut1.set_result(True)
2013            await fut2
2014            self.assertTrue(asyncio.current_task() is task2)
2015
2016        task1 = self.new_task(self.loop, coro1(self.loop))
2017        task2 = self.new_task(self.loop, coro2(self.loop))
2018
2019        self.loop.run_until_complete(asyncio.wait((task1, task2)))
2020        self.assertIsNone(asyncio.current_task(loop=self.loop))
2021
2022    # Some thorough tests for cancellation propagation through
2023    # coroutines, tasks and wait().
2024
2025    def test_yield_future_passes_cancel(self):
2026        # Cancelling outer() cancels inner() cancels waiter.
2027        proof = 0
2028        waiter = self.new_future(self.loop)
2029
2030        async def inner():
2031            nonlocal proof
2032            try:
2033                await waiter
2034            except asyncio.CancelledError:
2035                proof += 1
2036                raise
2037            else:
2038                self.fail('got past sleep() in inner()')
2039
2040        async def outer():
2041            nonlocal proof
2042            try:
2043                await inner()
2044            except asyncio.CancelledError:
2045                proof += 100  # Expect this path.
2046            else:
2047                proof += 10
2048
2049        f = asyncio.ensure_future(outer(), loop=self.loop)
2050        test_utils.run_briefly(self.loop)
2051        f.cancel()
2052        self.loop.run_until_complete(f)
2053        self.assertEqual(proof, 101)
2054        self.assertTrue(waiter.cancelled())
2055
2056    def test_yield_wait_does_not_shield_cancel(self):
2057        # Cancelling outer() makes wait() return early, leaves inner()
2058        # running.
2059        proof = 0
2060        waiter = self.new_future(self.loop)
2061
2062        async def inner():
2063            nonlocal proof
2064            await waiter
2065            proof += 1
2066
2067        async def outer():
2068            nonlocal proof
2069            with self.assertWarns(DeprecationWarning):
2070                d, p = await asyncio.wait([asyncio.create_task(inner())])
2071            proof += 100
2072
2073        f = asyncio.ensure_future(outer(), loop=self.loop)
2074        test_utils.run_briefly(self.loop)
2075        f.cancel()
2076        self.assertRaises(
2077            asyncio.CancelledError, self.loop.run_until_complete, f)
2078        waiter.set_result(None)
2079        test_utils.run_briefly(self.loop)
2080        self.assertEqual(proof, 1)
2081
2082    def test_shield_result(self):
2083        inner = self.new_future(self.loop)
2084        outer = asyncio.shield(inner)
2085        inner.set_result(42)
2086        res = self.loop.run_until_complete(outer)
2087        self.assertEqual(res, 42)
2088
2089    def test_shield_exception(self):
2090        inner = self.new_future(self.loop)
2091        outer = asyncio.shield(inner)
2092        test_utils.run_briefly(self.loop)
2093        exc = RuntimeError('expected')
2094        inner.set_exception(exc)
2095        test_utils.run_briefly(self.loop)
2096        self.assertIs(outer.exception(), exc)
2097
2098    def test_shield_cancel_inner(self):
2099        inner = self.new_future(self.loop)
2100        outer = asyncio.shield(inner)
2101        test_utils.run_briefly(self.loop)
2102        inner.cancel()
2103        test_utils.run_briefly(self.loop)
2104        self.assertTrue(outer.cancelled())
2105
2106    def test_shield_cancel_outer(self):
2107        inner = self.new_future(self.loop)
2108        outer = asyncio.shield(inner)
2109        test_utils.run_briefly(self.loop)
2110        outer.cancel()
2111        test_utils.run_briefly(self.loop)
2112        self.assertTrue(outer.cancelled())
2113        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
2114
2115    def test_shield_shortcut(self):
2116        fut = self.new_future(self.loop)
2117        fut.set_result(42)
2118        res = self.loop.run_until_complete(asyncio.shield(fut))
2119        self.assertEqual(res, 42)
2120
2121    def test_shield_effect(self):
2122        # Cancelling outer() does not affect inner().
2123        proof = 0
2124        waiter = self.new_future(self.loop)
2125
2126        async def inner():
2127            nonlocal proof
2128            await waiter
2129            proof += 1
2130
2131        async def outer():
2132            nonlocal proof
2133            await asyncio.shield(inner())
2134            proof += 100
2135
2136        f = asyncio.ensure_future(outer(), loop=self.loop)
2137        test_utils.run_briefly(self.loop)
2138        f.cancel()
2139        with self.assertRaises(asyncio.CancelledError):
2140            self.loop.run_until_complete(f)
2141        waiter.set_result(None)
2142        test_utils.run_briefly(self.loop)
2143        self.assertEqual(proof, 1)
2144
2145    def test_shield_gather(self):
2146        child1 = self.new_future(self.loop)
2147        child2 = self.new_future(self.loop)
2148        parent = asyncio.gather(child1, child2)
2149        outer = asyncio.shield(parent)
2150        test_utils.run_briefly(self.loop)
2151        outer.cancel()
2152        test_utils.run_briefly(self.loop)
2153        self.assertTrue(outer.cancelled())
2154        child1.set_result(1)
2155        child2.set_result(2)
2156        test_utils.run_briefly(self.loop)
2157        self.assertEqual(parent.result(), [1, 2])
2158
2159    def test_gather_shield(self):
2160        child1 = self.new_future(self.loop)
2161        child2 = self.new_future(self.loop)
2162        inner1 = asyncio.shield(child1)
2163        inner2 = asyncio.shield(child2)
2164        parent = asyncio.gather(inner1, inner2)
2165        test_utils.run_briefly(self.loop)
2166        parent.cancel()
2167        # This should cancel inner1 and inner2 but bot child1 and child2.
2168        test_utils.run_briefly(self.loop)
2169        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
2170        self.assertTrue(inner1.cancelled())
2171        self.assertTrue(inner2.cancelled())
2172        child1.set_result(1)
2173        child2.set_result(2)
2174        test_utils.run_briefly(self.loop)
2175
2176    def test_shield_coroutine_without_loop(self):
2177        async def coro():
2178            return 42
2179
2180        inner = coro()
2181        self.addCleanup(inner.close)
2182        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
2183            asyncio.shield(inner)
2184
2185    def test_shield_coroutine_use_running_loop(self):
2186        async def coro():
2187            return 42
2188
2189        async def test():
2190            return asyncio.shield(coro())
2191        outer = self.loop.run_until_complete(test())
2192        self.assertEqual(outer._loop, self.loop)
2193        res = self.loop.run_until_complete(outer)
2194        self.assertEqual(res, 42)
2195
2196    def test_shield_coroutine_use_global_loop(self):
2197        # Deprecated in 3.10, undeprecated in 3.12
2198        async def coro():
2199            return 42
2200
2201        asyncio.set_event_loop(self.loop)
2202        self.addCleanup(asyncio.set_event_loop, None)
2203        outer = asyncio.shield(coro())
2204        self.assertEqual(outer._loop, self.loop)
2205        res = self.loop.run_until_complete(outer)
2206        self.assertEqual(res, 42)
2207
2208    def test_as_completed_invalid_args(self):
2209        # as_completed() expects a list of futures, not a future instance
2210        # TypeError should be raised either on iterator construction or first
2211        # iteration
2212
2213        # Plain iterator
2214        fut = self.new_future(self.loop)
2215        with self.assertRaises(TypeError):
2216            iterator = asyncio.as_completed(fut)
2217            next(iterator)
2218        coro = coroutine_function()
2219        with self.assertRaises(TypeError):
2220            iterator = asyncio.as_completed(coro)
2221            next(iterator)
2222        coro.close()
2223
2224        # Async iterator
2225        async def try_async_iterator(aw):
2226            async for f in asyncio.as_completed(aw):
2227                break
2228
2229        fut = self.new_future(self.loop)
2230        with self.assertRaises(TypeError):
2231            self.loop.run_until_complete(try_async_iterator(fut))
2232        coro = coroutine_function()
2233        with self.assertRaises(TypeError):
2234            self.loop.run_until_complete(try_async_iterator(coro))
2235        coro.close()
2236
2237    def test_wait_invalid_args(self):
2238        fut = self.new_future(self.loop)
2239
2240        # wait() expects a list of futures, not a future instance
2241        self.assertRaises(TypeError, self.loop.run_until_complete,
2242            asyncio.wait(fut))
2243        coro = coroutine_function()
2244        self.assertRaises(TypeError, self.loop.run_until_complete,
2245            asyncio.wait(coro))
2246        coro.close()
2247
2248        # wait() expects at least a future
2249        self.assertRaises(ValueError, self.loop.run_until_complete,
2250            asyncio.wait([]))
2251
2252    def test_log_destroyed_pending_task(self):
2253        Task = self.__class__.Task
2254
2255        async def kill_me(loop):
2256            future = self.new_future(loop)
2257            await future
2258            # at this point, the only reference to kill_me() task is
2259            # the Task._wakeup() method in future._callbacks
2260            raise Exception("code never reached")
2261
2262        mock_handler = mock.Mock()
2263        self.loop.set_debug(True)
2264        self.loop.set_exception_handler(mock_handler)
2265
2266        # schedule the task
2267        coro = kill_me(self.loop)
2268        task = asyncio.ensure_future(coro, loop=self.loop)
2269
2270        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})
2271
2272        asyncio.set_event_loop(None)
2273
2274        # execute the task so it waits for future
2275        self.loop._run_once()
2276        self.assertEqual(len(self.loop._ready), 0)
2277
2278        coro = None
2279        source_traceback = task._source_traceback
2280        task = None
2281
2282        # no more reference to kill_me() task: the task is destroyed by the GC
2283        support.gc_collect()
2284
2285        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())
2286
2287        mock_handler.assert_called_with(self.loop, {
2288            'message': 'Task was destroyed but it is pending!',
2289            'task': mock.ANY,
2290            'source_traceback': source_traceback,
2291        })
2292        mock_handler.reset_mock()
2293
2294    @mock.patch('asyncio.base_events.logger')
2295    def test_tb_logger_not_called_after_cancel(self, m_log):
2296        loop = asyncio.new_event_loop()
2297        self.set_event_loop(loop)
2298
2299        async def coro():
2300            raise TypeError
2301
2302        async def runner():
2303            task = self.new_task(loop, coro())
2304            await asyncio.sleep(0.05)
2305            task.cancel()
2306            task = None
2307
2308        loop.run_until_complete(runner())
2309        self.assertFalse(m_log.error.called)
2310
2311    def test_task_source_traceback(self):
2312        self.loop.set_debug(True)
2313
2314        task = self.new_task(self.loop, coroutine_function())
2315        lineno = sys._getframe().f_lineno - 1
2316        self.assertIsInstance(task._source_traceback, list)
2317        self.assertEqual(task._source_traceback[-2][:3],
2318                         (__file__,
2319                          lineno,
2320                          'test_task_source_traceback'))
2321        self.loop.run_until_complete(task)
2322
2323    def test_cancel_gather_1(self):
2324        """Ensure that a gathering future refuses to be cancelled once all
2325        children are done"""
2326        loop = asyncio.new_event_loop()
2327        self.addCleanup(loop.close)
2328
2329        fut = self.new_future(loop)
2330        async def create():
2331            # The indirection fut->child_coro is needed since otherwise the
2332            # gathering task is done at the same time as the child future
2333            async def child_coro():
2334                return await fut
2335            gather_future = asyncio.gather(child_coro())
2336            return asyncio.ensure_future(gather_future)
2337        gather_task = loop.run_until_complete(create())
2338
2339        cancel_result = None
2340        def cancelling_callback(_):
2341            nonlocal cancel_result
2342            cancel_result = gather_task.cancel()
2343        fut.add_done_callback(cancelling_callback)
2344
2345        fut.set_result(42) # calls the cancelling_callback after fut is done()
2346
2347        # At this point the task should complete.
2348        loop.run_until_complete(gather_task)
2349
2350        # Python issue #26923: asyncio.gather drops cancellation
2351        self.assertEqual(cancel_result, False)
2352        self.assertFalse(gather_task.cancelled())
2353        self.assertEqual(gather_task.result(), [42])
2354
2355    def test_cancel_gather_2(self):
2356        cases = [
2357            ((), ()),
2358            ((None,), ()),
2359            (('my message',), ('my message',)),
2360            # Non-string values should roundtrip.
2361            ((5,), (5,)),
2362        ]
2363        for cancel_args, expected_args in cases:
2364            with self.subTest(cancel_args=cancel_args):
2365                loop = asyncio.new_event_loop()
2366                self.addCleanup(loop.close)
2367
2368                async def test():
2369                    time = 0
2370                    while True:
2371                        time += 0.05
2372                        await asyncio.gather(asyncio.sleep(0.05),
2373                                             return_exceptions=True)
2374                        if time > 1:
2375                            return
2376
2377                async def main():
2378                    qwe = self.new_task(loop, test())
2379                    await asyncio.sleep(0.2)
2380                    qwe.cancel(*cancel_args)
2381                    await qwe
2382
2383                try:
2384                    loop.run_until_complete(main())
2385                except asyncio.CancelledError as exc:
2386                    self.assertEqual(exc.args, expected_args)
2387                    actual = get_innermost_context(exc)
2388                    self.assertEqual(
2389                        actual,
2390                        (asyncio.CancelledError, expected_args, 0),
2391                    )
2392                else:
2393                    self.fail(
2394                        'gather() does not propagate CancelledError '
2395                        'raised by inner task to the gather() caller.'
2396                    )
2397
2398    def test_exception_traceback(self):
2399        # See http://bugs.python.org/issue28843
2400
2401        async def foo():
2402            1 / 0
2403
2404        async def main():
2405            task = self.new_task(self.loop, foo())
2406            await asyncio.sleep(0)  # skip one loop iteration
2407            self.assertIsNotNone(task.exception().__traceback__)
2408
2409        self.loop.run_until_complete(main())
2410
2411    @mock.patch('asyncio.base_events.logger')
2412    def test_error_in_call_soon(self, m_log):
2413        def call_soon(callback, *args, **kwargs):
2414            raise ValueError
2415        self.loop.call_soon = call_soon
2416
2417        async def coro():
2418            pass
2419
2420        self.assertFalse(m_log.error.called)
2421
2422        with self.assertRaises(ValueError):
2423            gen = coro()
2424            try:
2425                self.new_task(self.loop, gen)
2426            finally:
2427                gen.close()
2428        gc.collect()  # For PyPy or other GCs.
2429
2430        self.assertTrue(m_log.error.called)
2431        message = m_log.error.call_args[0][0]
2432        self.assertIn('Task was destroyed but it is pending', message)
2433
2434        self.assertEqual(asyncio.all_tasks(self.loop), set())
2435
2436    def test_create_task_with_noncoroutine(self):
2437        with self.assertRaisesRegex(TypeError,
2438                                    "a coroutine was expected, got 123"):
2439            self.new_task(self.loop, 123)
2440
2441        # test it for the second time to ensure that caching
2442        # in asyncio.iscoroutine() doesn't break things.
2443        with self.assertRaisesRegex(TypeError,
2444                                    "a coroutine was expected, got 123"):
2445            self.new_task(self.loop, 123)
2446
2447    def test_create_task_with_async_function(self):
2448
2449        async def coro():
2450            pass
2451
2452        task = self.new_task(self.loop, coro())
2453        self.assertIsInstance(task, self.Task)
2454        self.loop.run_until_complete(task)
2455
2456        # test it for the second time to ensure that caching
2457        # in asyncio.iscoroutine() doesn't break things.
2458        task = self.new_task(self.loop, coro())
2459        self.assertIsInstance(task, self.Task)
2460        self.loop.run_until_complete(task)
2461
2462    def test_create_task_with_asynclike_function(self):
2463        task = self.new_task(self.loop, CoroLikeObject())
2464        self.assertIsInstance(task, self.Task)
2465        self.assertEqual(self.loop.run_until_complete(task), 42)
2466
2467        # test it for the second time to ensure that caching
2468        # in asyncio.iscoroutine() doesn't break things.
2469        task = self.new_task(self.loop, CoroLikeObject())
2470        self.assertIsInstance(task, self.Task)
2471        self.assertEqual(self.loop.run_until_complete(task), 42)
2472
2473    def test_bare_create_task(self):
2474
2475        async def inner():
2476            return 1
2477
2478        async def coro():
2479            task = asyncio.create_task(inner())
2480            self.assertIsInstance(task, self.Task)
2481            ret = await task
2482            self.assertEqual(1, ret)
2483
2484        self.loop.run_until_complete(coro())
2485
2486    def test_bare_create_named_task(self):
2487
2488        async def coro_noop():
2489            pass
2490
2491        async def coro():
2492            task = asyncio.create_task(coro_noop(), name='No-op')
2493            self.assertEqual(task.get_name(), 'No-op')
2494            await task
2495
2496        self.loop.run_until_complete(coro())
2497
2498    def test_context_1(self):
2499        cvar = contextvars.ContextVar('cvar', default='nope')
2500
2501        async def sub():
2502            await asyncio.sleep(0.01)
2503            self.assertEqual(cvar.get(), 'nope')
2504            cvar.set('something else')
2505
2506        async def main():
2507            self.assertEqual(cvar.get(), 'nope')
2508            subtask = self.new_task(loop, sub())
2509            cvar.set('yes')
2510            self.assertEqual(cvar.get(), 'yes')
2511            await subtask
2512            self.assertEqual(cvar.get(), 'yes')
2513
2514        loop = asyncio.new_event_loop()
2515        try:
2516            task = self.new_task(loop, main())
2517            loop.run_until_complete(task)
2518        finally:
2519            loop.close()
2520
2521    def test_context_2(self):
2522        cvar = contextvars.ContextVar('cvar', default='nope')
2523
2524        async def main():
2525            def fut_on_done(fut):
2526                # This change must not pollute the context
2527                # of the "main()" task.
2528                cvar.set('something else')
2529
2530            self.assertEqual(cvar.get(), 'nope')
2531
2532            for j in range(2):
2533                fut = self.new_future(loop)
2534                fut.add_done_callback(fut_on_done)
2535                cvar.set(f'yes{j}')
2536                loop.call_soon(fut.set_result, None)
2537                await fut
2538                self.assertEqual(cvar.get(), f'yes{j}')
2539
2540                for i in range(3):
2541                    # Test that task passed its context to add_done_callback:
2542                    cvar.set(f'yes{i}-{j}')
2543                    await asyncio.sleep(0.001)
2544                    self.assertEqual(cvar.get(), f'yes{i}-{j}')
2545
2546        loop = asyncio.new_event_loop()
2547        try:
2548            task = self.new_task(loop, main())
2549            loop.run_until_complete(task)
2550        finally:
2551            loop.close()
2552
2553        self.assertEqual(cvar.get(), 'nope')
2554
2555    def test_context_3(self):
2556        # Run 100 Tasks in parallel, each modifying cvar.
2557
2558        cvar = contextvars.ContextVar('cvar', default=-1)
2559
2560        async def sub(num):
2561            for i in range(10):
2562                cvar.set(num + i)
2563                await asyncio.sleep(random.uniform(0.001, 0.05))
2564                self.assertEqual(cvar.get(), num + i)
2565
2566        async def main():
2567            tasks = []
2568            for i in range(100):
2569                task = loop.create_task(sub(random.randint(0, 10)))
2570                tasks.append(task)
2571
2572            await asyncio.gather(*tasks)
2573
2574        loop = asyncio.new_event_loop()
2575        try:
2576            loop.run_until_complete(main())
2577        finally:
2578            loop.close()
2579
2580        self.assertEqual(cvar.get(), -1)
2581
2582    def test_context_4(self):
2583        cvar = contextvars.ContextVar('cvar')
2584
2585        async def coro(val):
2586            await asyncio.sleep(0)
2587            cvar.set(val)
2588
2589        async def main():
2590            ret = []
2591            ctx = contextvars.copy_context()
2592            ret.append(ctx.get(cvar))
2593            t1 = self.new_task(loop, coro(1), context=ctx)
2594            await t1
2595            ret.append(ctx.get(cvar))
2596            t2 = self.new_task(loop, coro(2), context=ctx)
2597            await t2
2598            ret.append(ctx.get(cvar))
2599            return ret
2600
2601        loop = asyncio.new_event_loop()
2602        try:
2603            task = self.new_task(loop, main())
2604            ret = loop.run_until_complete(task)
2605        finally:
2606            loop.close()
2607
2608        self.assertEqual([None, 1, 2], ret)
2609
2610    def test_context_5(self):
2611        cvar = contextvars.ContextVar('cvar')
2612
2613        async def coro(val):
2614            await asyncio.sleep(0)
2615            cvar.set(val)
2616
2617        async def main():
2618            ret = []
2619            ctx = contextvars.copy_context()
2620            ret.append(ctx.get(cvar))
2621            t1 = asyncio.create_task(coro(1), context=ctx)
2622            await t1
2623            ret.append(ctx.get(cvar))
2624            t2 = asyncio.create_task(coro(2), context=ctx)
2625            await t2
2626            ret.append(ctx.get(cvar))
2627            return ret
2628
2629        loop = asyncio.new_event_loop()
2630        try:
2631            task = self.new_task(loop, main())
2632            ret = loop.run_until_complete(task)
2633        finally:
2634            loop.close()
2635
2636        self.assertEqual([None, 1, 2], ret)
2637
2638    def test_context_6(self):
2639        cvar = contextvars.ContextVar('cvar')
2640
2641        async def coro(val):
2642            await asyncio.sleep(0)
2643            cvar.set(val)
2644
2645        async def main():
2646            ret = []
2647            ctx = contextvars.copy_context()
2648            ret.append(ctx.get(cvar))
2649            t1 = loop.create_task(coro(1), context=ctx)
2650            await t1
2651            ret.append(ctx.get(cvar))
2652            t2 = loop.create_task(coro(2), context=ctx)
2653            await t2
2654            ret.append(ctx.get(cvar))
2655            return ret
2656
2657        loop = asyncio.new_event_loop()
2658        try:
2659            task = loop.create_task(main())
2660            ret = loop.run_until_complete(task)
2661        finally:
2662            loop.close()
2663
2664        self.assertEqual([None, 1, 2], ret)
2665
2666    def test_get_coro(self):
2667        loop = asyncio.new_event_loop()
2668        coro = coroutine_function()
2669        try:
2670            task = self.new_task(loop, coro)
2671            loop.run_until_complete(task)
2672            self.assertIs(task.get_coro(), coro)
2673        finally:
2674            loop.close()
2675
2676    def test_get_context(self):
2677        loop = asyncio.new_event_loop()
2678        coro = coroutine_function()
2679        context = contextvars.copy_context()
2680        try:
2681            task = self.new_task(loop, coro, context=context)
2682            loop.run_until_complete(task)
2683            self.assertIs(task.get_context(), context)
2684        finally:
2685            loop.close()
2686
2687    def test_proper_refcounts(self):
2688        # see: https://github.com/python/cpython/issues/126083
2689        class Break:
2690            def __str__(self):
2691                raise RuntimeError("break")
2692
2693        obj = object()
2694        initial_refcount = sys.getrefcount(obj)
2695
2696        coro = coroutine_function()
2697        loop = asyncio.new_event_loop()
2698        task = asyncio.Task.__new__(asyncio.Task)
2699
2700        for _ in range(5):
2701            with self.assertRaisesRegex(RuntimeError, 'break'):
2702                task.__init__(coro, loop=loop, context=obj, name=Break())
2703
2704        coro.close()
2705        del task
2706
2707        self.assertEqual(sys.getrefcount(obj), initial_refcount)
2708
2709
2710def add_subclass_tests(cls):
2711    BaseTask = cls.Task
2712    BaseFuture = cls.Future
2713
2714    if BaseTask is None or BaseFuture is None:
2715        return cls
2716
2717    class CommonFuture:
2718        def __init__(self, *args, **kwargs):
2719            self.calls = collections.defaultdict(lambda: 0)
2720            super().__init__(*args, **kwargs)
2721
2722        def add_done_callback(self, *args, **kwargs):
2723            self.calls['add_done_callback'] += 1
2724            return super().add_done_callback(*args, **kwargs)
2725
2726    class Task(CommonFuture, BaseTask):
2727        pass
2728
2729    class Future(CommonFuture, BaseFuture):
2730        pass
2731
2732    def test_subclasses_ctask_cfuture(self):
2733        fut = self.Future(loop=self.loop)
2734
2735        async def func():
2736            self.loop.call_soon(lambda: fut.set_result('spam'))
2737            return await fut
2738
2739        task = self.Task(func(), loop=self.loop)
2740
2741        result = self.loop.run_until_complete(task)
2742
2743        self.assertEqual(result, 'spam')
2744
2745        self.assertEqual(
2746            dict(task.calls),
2747            {'add_done_callback': 1})
2748
2749        self.assertEqual(
2750            dict(fut.calls),
2751            {'add_done_callback': 1})
2752
2753    # Add patched Task & Future back to the test case
2754    cls.Task = Task
2755    cls.Future = Future
2756
2757    # Add an extra unit-test
2758    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture
2759
2760    # Disable the "test_task_source_traceback" test
2761    # (the test is hardcoded for a particular call stack, which
2762    # is slightly different for Task subclasses)
2763    cls.test_task_source_traceback = None
2764
2765    return cls
2766
2767
2768class SetMethodsTest:
2769
2770    def test_set_result_causes_invalid_state(self):
2771        Future = type(self).Future
2772        self.loop.call_exception_handler = exc_handler = mock.Mock()
2773
2774        async def foo():
2775            await asyncio.sleep(0.1)
2776            return 10
2777
2778        coro = foo()
2779        task = self.new_task(self.loop, coro)
2780        Future.set_result(task, 'spam')
2781
2782        self.assertEqual(
2783            self.loop.run_until_complete(task),
2784            'spam')
2785
2786        exc_handler.assert_called_once()
2787        exc = exc_handler.call_args[0][0]['exception']
2788        with self.assertRaisesRegex(asyncio.InvalidStateError,
2789                                    r'step\(\): already done'):
2790            raise exc
2791
2792        coro.close()
2793
2794    def test_set_exception_causes_invalid_state(self):
2795        class MyExc(Exception):
2796            pass
2797
2798        Future = type(self).Future
2799        self.loop.call_exception_handler = exc_handler = mock.Mock()
2800
2801        async def foo():
2802            await asyncio.sleep(0.1)
2803            return 10
2804
2805        coro = foo()
2806        task = self.new_task(self.loop, coro)
2807        Future.set_exception(task, MyExc())
2808
2809        with self.assertRaises(MyExc):
2810            self.loop.run_until_complete(task)
2811
2812        exc_handler.assert_called_once()
2813        exc = exc_handler.call_args[0][0]['exception']
2814        with self.assertRaisesRegex(asyncio.InvalidStateError,
2815                                    r'step\(\): already done'):
2816            raise exc
2817
2818        coro.close()
2819
2820
2821@unittest.skipUnless(hasattr(futures, '_CFuture') and
2822                     hasattr(tasks, '_CTask'),
2823                     'requires the C _asyncio module')
2824class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
2825                          test_utils.TestCase):
2826
2827    Task = getattr(tasks, '_CTask', None)
2828    Future = getattr(futures, '_CFuture', None)
2829
2830    @support.refcount_test
2831    def test_refleaks_in_task___init__(self):
2832        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
2833        async def coro():
2834            pass
2835        task = self.new_task(self.loop, coro())
2836        self.loop.run_until_complete(task)
2837        refs_before = gettotalrefcount()
2838        for i in range(100):
2839            task.__init__(coro(), loop=self.loop)
2840            self.loop.run_until_complete(task)
2841        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)
2842
2843    def test_del__log_destroy_pending_segfault(self):
2844        async def coro():
2845            pass
2846        task = self.new_task(self.loop, coro())
2847        self.loop.run_until_complete(task)
2848        with self.assertRaises(AttributeError):
2849            del task._log_destroy_pending
2850
2851
2852@unittest.skipUnless(hasattr(futures, '_CFuture') and
2853                     hasattr(tasks, '_CTask'),
2854                     'requires the C _asyncio module')
2855@add_subclass_tests
2856class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
2857
2858    Task = getattr(tasks, '_CTask', None)
2859    Future = getattr(futures, '_CFuture', None)
2860
2861
2862@unittest.skipUnless(hasattr(tasks, '_CTask'),
2863                     'requires the C _asyncio module')
2864@add_subclass_tests
2865class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
2866
2867    Task = getattr(tasks, '_CTask', None)
2868    Future = futures._PyFuture
2869
2870
2871@unittest.skipUnless(hasattr(futures, '_CFuture'),
2872                     'requires the C _asyncio module')
2873@add_subclass_tests
2874class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
2875
2876    Future = getattr(futures, '_CFuture', None)
2877    Task = tasks._PyTask
2878
2879
2880@unittest.skipUnless(hasattr(tasks, '_CTask'),
2881                     'requires the C _asyncio module')
2882class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
2883
2884    Task = getattr(tasks, '_CTask', None)
2885    Future = futures._PyFuture
2886
2887
2888@unittest.skipUnless(hasattr(futures, '_CFuture'),
2889                     'requires the C _asyncio module')
2890class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
2891
2892    Task = tasks._PyTask
2893    Future = getattr(futures, '_CFuture', None)
2894
2895
2896class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
2897                            test_utils.TestCase):
2898
2899    Task = tasks._PyTask
2900    Future = futures._PyFuture
2901
2902
2903@add_subclass_tests
2904class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
2905    Task = tasks._PyTask
2906    Future = futures._PyFuture
2907
2908
2909@unittest.skipUnless(hasattr(tasks, '_CTask'),
2910                     'requires the C _asyncio module')
2911class CTask_Future_Tests(test_utils.TestCase):
2912
2913    def test_foobar(self):
2914        class Fut(asyncio.Future):
2915            @property
2916            def get_loop(self):
2917                raise AttributeError
2918
2919        async def coro():
2920            await fut
2921            return 'spam'
2922
2923        self.loop = asyncio.new_event_loop()
2924        try:
2925            fut = Fut(loop=self.loop)
2926            self.loop.call_later(0.1, fut.set_result, 1)
2927            task = self.loop.create_task(coro())
2928            res = self.loop.run_until_complete(task)
2929        finally:
2930            self.loop.close()
2931
2932        self.assertEqual(res, 'spam')
2933
2934
2935class BaseTaskIntrospectionTests:
2936    _register_task = None
2937    _unregister_task = None
2938    _enter_task = None
2939    _leave_task = None
2940
2941    def test__register_task_1(self):
2942        class TaskLike:
2943            @property
2944            def _loop(self):
2945                return loop
2946
2947            def done(self):
2948                return False
2949
2950        task = TaskLike()
2951        loop = mock.Mock()
2952
2953        self.assertEqual(asyncio.all_tasks(loop), set())
2954        self._register_task(task)
2955        self.assertEqual(asyncio.all_tasks(loop), {task})
2956        self._unregister_task(task)
2957
2958    def test__register_task_2(self):
2959        class TaskLike:
2960            def get_loop(self):
2961                return loop
2962
2963            def done(self):
2964                return False
2965
2966        task = TaskLike()
2967        loop = mock.Mock()
2968
2969        self.assertEqual(asyncio.all_tasks(loop), set())
2970        self._register_task(task)
2971        self.assertEqual(asyncio.all_tasks(loop), {task})
2972        self._unregister_task(task)
2973
2974    def test__register_task_3(self):
2975        class TaskLike:
2976            def get_loop(self):
2977                return loop
2978
2979            def done(self):
2980                return True
2981
2982        task = TaskLike()
2983        loop = mock.Mock()
2984
2985        self.assertEqual(asyncio.all_tasks(loop), set())
2986        self._register_task(task)
2987        self.assertEqual(asyncio.all_tasks(loop), set())
2988        self._unregister_task(task)
2989
2990    def test__enter_task(self):
2991        task = mock.Mock()
2992        loop = mock.Mock()
2993        self.assertIsNone(asyncio.current_task(loop))
2994        self._enter_task(loop, task)
2995        self.assertIs(asyncio.current_task(loop), task)
2996        self._leave_task(loop, task)
2997
2998    def test__enter_task_failure(self):
2999        task1 = mock.Mock()
3000        task2 = mock.Mock()
3001        loop = mock.Mock()
3002        self._enter_task(loop, task1)
3003        with self.assertRaises(RuntimeError):
3004            self._enter_task(loop, task2)
3005        self.assertIs(asyncio.current_task(loop), task1)
3006        self._leave_task(loop, task1)
3007
3008    def test__leave_task(self):
3009        task = mock.Mock()
3010        loop = mock.Mock()
3011        self._enter_task(loop, task)
3012        self._leave_task(loop, task)
3013        self.assertIsNone(asyncio.current_task(loop))
3014
3015    def test__leave_task_failure1(self):
3016        task1 = mock.Mock()
3017        task2 = mock.Mock()
3018        loop = mock.Mock()
3019        self._enter_task(loop, task1)
3020        with self.assertRaises(RuntimeError):
3021            self._leave_task(loop, task2)
3022        self.assertIs(asyncio.current_task(loop), task1)
3023        self._leave_task(loop, task1)
3024
3025    def test__leave_task_failure2(self):
3026        task = mock.Mock()
3027        loop = mock.Mock()
3028        with self.assertRaises(RuntimeError):
3029            self._leave_task(loop, task)
3030        self.assertIsNone(asyncio.current_task(loop))
3031
3032    def test__unregister_task(self):
3033        task = mock.Mock()
3034        loop = mock.Mock()
3035        task.get_loop = lambda: loop
3036        self._register_task(task)
3037        self._unregister_task(task)
3038        self.assertEqual(asyncio.all_tasks(loop), set())
3039
3040    def test__unregister_task_not_registered(self):
3041        task = mock.Mock()
3042        loop = mock.Mock()
3043        self._unregister_task(task)
3044        self.assertEqual(asyncio.all_tasks(loop), set())
3045
3046
3047class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
3048    _register_task = staticmethod(tasks._py_register_task)
3049    _unregister_task = staticmethod(tasks._py_unregister_task)
3050    _enter_task = staticmethod(tasks._py_enter_task)
3051    _leave_task = staticmethod(tasks._py_leave_task)
3052
3053
3054@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
3055                     'requires the C _asyncio module')
3056class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
3057    if hasattr(tasks, '_c_register_task'):
3058        _register_task = staticmethod(tasks._c_register_task)
3059        _unregister_task = staticmethod(tasks._c_unregister_task)
3060        _enter_task = staticmethod(tasks._c_enter_task)
3061        _leave_task = staticmethod(tasks._c_leave_task)
3062    else:
3063        _register_task = _unregister_task = _enter_task = _leave_task = None
3064
3065
3066class BaseCurrentLoopTests:
3067    current_task = None
3068
3069    def setUp(self):
3070        super().setUp()
3071        self.loop = asyncio.new_event_loop()
3072        self.set_event_loop(self.loop)
3073
3074    def new_task(self, coro):
3075        raise NotImplementedError
3076
3077    def test_current_task_no_running_loop(self):
3078        self.assertIsNone(self.current_task(loop=self.loop))
3079
3080    def test_current_task_no_running_loop_implicit(self):
3081        with self.assertRaisesRegex(RuntimeError, 'no running event loop'):
3082            self.current_task()
3083
3084    def test_current_task_with_implicit_loop(self):
3085        async def coro():
3086            self.assertIs(self.current_task(loop=self.loop), task)
3087
3088            self.assertIs(self.current_task(None), task)
3089            self.assertIs(self.current_task(), task)
3090
3091        task = self.new_task(coro())
3092        self.loop.run_until_complete(task)
3093        self.assertIsNone(self.current_task(loop=self.loop))
3094
3095
3096class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
3097    current_task = staticmethod(tasks._py_current_task)
3098
3099    def new_task(self, coro):
3100        return tasks._PyTask(coro, loop=self.loop)
3101
3102
3103@unittest.skipUnless(hasattr(tasks, '_CTask') and
3104                     hasattr(tasks, '_c_current_task'),
3105                     'requires the C _asyncio module')
3106class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
3107    if hasattr(tasks, '_c_current_task'):
3108        current_task = staticmethod(tasks._c_current_task)
3109    else:
3110        current_task = None
3111
3112    def new_task(self, coro):
3113        return getattr(tasks, '_CTask')(coro, loop=self.loop)
3114
3115
3116class GenericTaskTests(test_utils.TestCase):
3117
3118    def test_future_subclass(self):
3119        self.assertTrue(issubclass(asyncio.Task, asyncio.Future))
3120
3121    @support.cpython_only
3122    def test_asyncio_module_compiled(self):
3123        # Because of circular imports it's easy to make _asyncio
3124        # module non-importable.  This is a simple test that will
3125        # fail on systems where C modules were successfully compiled
3126        # (hence the test for _functools etc), but _asyncio somehow didn't.
3127        try:
3128            import _functools
3129            import _json
3130            import _pickle
3131        except ImportError:
3132            self.skipTest('C modules are not available')
3133        else:
3134            try:
3135                import _asyncio
3136            except ImportError:
3137                self.fail('_asyncio module is missing')
3138
3139
3140class GatherTestsBase:
3141
3142    def setUp(self):
3143        super().setUp()
3144        self.one_loop = self.new_test_loop()
3145        self.other_loop = self.new_test_loop()
3146        self.set_event_loop(self.one_loop, cleanup=False)
3147
3148    def _run_loop(self, loop):
3149        while loop._ready:
3150            test_utils.run_briefly(loop)
3151
3152    def _check_success(self, **kwargs):
3153        a, b, c = [self.one_loop.create_future() for i in range(3)]
3154        fut = self._gather(*self.wrap_futures(a, b, c), **kwargs)
3155        cb = test_utils.MockCallback()
3156        fut.add_done_callback(cb)
3157        b.set_result(1)
3158        a.set_result(2)
3159        self._run_loop(self.one_loop)
3160        self.assertEqual(cb.called, False)
3161        self.assertFalse(fut.done())
3162        c.set_result(3)
3163        self._run_loop(self.one_loop)
3164        cb.assert_called_once_with(fut)
3165        self.assertEqual(fut.result(), [2, 1, 3])
3166
3167    def test_success(self):
3168        self._check_success()
3169        self._check_success(return_exceptions=False)
3170
3171    def test_result_exception_success(self):
3172        self._check_success(return_exceptions=True)
3173
3174    def test_one_exception(self):
3175        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
3176        fut = self._gather(*self.wrap_futures(a, b, c, d, e))
3177        cb = test_utils.MockCallback()
3178        fut.add_done_callback(cb)
3179        exc = ZeroDivisionError()
3180        a.set_result(1)
3181        b.set_exception(exc)
3182        self._run_loop(self.one_loop)
3183        self.assertTrue(fut.done())
3184        cb.assert_called_once_with(fut)
3185        self.assertIs(fut.exception(), exc)
3186        # Does nothing
3187        c.set_result(3)
3188        d.cancel()
3189        e.set_exception(RuntimeError())
3190        e.exception()
3191
3192    def test_return_exceptions(self):
3193        a, b, c, d = [self.one_loop.create_future() for i in range(4)]
3194        fut = self._gather(*self.wrap_futures(a, b, c, d),
3195                           return_exceptions=True)
3196        cb = test_utils.MockCallback()
3197        fut.add_done_callback(cb)
3198        exc = ZeroDivisionError()
3199        exc2 = RuntimeError()
3200        b.set_result(1)
3201        c.set_exception(exc)
3202        a.set_result(3)
3203        self._run_loop(self.one_loop)
3204        self.assertFalse(fut.done())
3205        d.set_exception(exc2)
3206        self._run_loop(self.one_loop)
3207        self.assertTrue(fut.done())
3208        cb.assert_called_once_with(fut)
3209        self.assertEqual(fut.result(), [3, 1, exc, exc2])
3210
3211    def test_env_var_debug(self):
3212        code = '\n'.join((
3213            'import asyncio.coroutines',
3214            'print(asyncio.coroutines._is_debug_mode())'))
3215
3216        # Test with -E to not fail if the unit test was run with
3217        # PYTHONASYNCIODEBUG set to a non-empty string
3218        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
3219        self.assertEqual(stdout.rstrip(), b'False')
3220
3221        sts, stdout, stderr = assert_python_ok('-c', code,
3222                                               PYTHONASYNCIODEBUG='',
3223                                               PYTHONDEVMODE='')
3224        self.assertEqual(stdout.rstrip(), b'False')
3225
3226        sts, stdout, stderr = assert_python_ok('-c', code,
3227                                               PYTHONASYNCIODEBUG='1',
3228                                               PYTHONDEVMODE='')
3229        self.assertEqual(stdout.rstrip(), b'True')
3230
3231        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
3232                                               PYTHONASYNCIODEBUG='1',
3233                                               PYTHONDEVMODE='')
3234        self.assertEqual(stdout.rstrip(), b'False')
3235
3236        # -X dev
3237        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
3238                                               '-c', code)
3239        self.assertEqual(stdout.rstrip(), b'True')
3240
3241
3242class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
3243
3244    def wrap_futures(self, *futures):
3245        return futures
3246
3247    def _gather(self, *args, **kwargs):
3248        return asyncio.gather(*args, **kwargs)
3249
3250    def test_constructor_empty_sequence_without_loop(self):
3251        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
3252            asyncio.gather()
3253
3254    def test_constructor_empty_sequence_use_running_loop(self):
3255        async def gather():
3256            return asyncio.gather()
3257        fut = self.one_loop.run_until_complete(gather())
3258        self.assertIsInstance(fut, asyncio.Future)
3259        self.assertIs(fut._loop, self.one_loop)
3260        self._run_loop(self.one_loop)
3261        self.assertTrue(fut.done())
3262        self.assertEqual(fut.result(), [])
3263
3264    def test_constructor_empty_sequence_use_global_loop(self):
3265        # Deprecated in 3.10, undeprecated in 3.12
3266        asyncio.set_event_loop(self.one_loop)
3267        self.addCleanup(asyncio.set_event_loop, None)
3268        fut = asyncio.gather()
3269        self.assertIsInstance(fut, asyncio.Future)
3270        self.assertIs(fut._loop, self.one_loop)
3271        self._run_loop(self.one_loop)
3272        self.assertTrue(fut.done())
3273        self.assertEqual(fut.result(), [])
3274
3275    def test_constructor_heterogenous_futures(self):
3276        fut1 = self.one_loop.create_future()
3277        fut2 = self.other_loop.create_future()
3278        with self.assertRaises(ValueError):
3279            asyncio.gather(fut1, fut2)
3280
3281    def test_constructor_homogenous_futures(self):
3282        children = [self.other_loop.create_future() for i in range(3)]
3283        fut = asyncio.gather(*children)
3284        self.assertIs(fut._loop, self.other_loop)
3285        self._run_loop(self.other_loop)
3286        self.assertFalse(fut.done())
3287        fut = asyncio.gather(*children)
3288        self.assertIs(fut._loop, self.other_loop)
3289        self._run_loop(self.other_loop)
3290        self.assertFalse(fut.done())
3291
3292    def test_one_cancellation(self):
3293        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
3294        fut = asyncio.gather(a, b, c, d, e)
3295        cb = test_utils.MockCallback()
3296        fut.add_done_callback(cb)
3297        a.set_result(1)
3298        b.cancel()
3299        self._run_loop(self.one_loop)
3300        self.assertTrue(fut.done())
3301        cb.assert_called_once_with(fut)
3302        self.assertFalse(fut.cancelled())
3303        self.assertIsInstance(fut.exception(), asyncio.CancelledError)
3304        # Does nothing
3305        c.set_result(3)
3306        d.cancel()
3307        e.set_exception(RuntimeError())
3308        e.exception()
3309
3310    def test_result_exception_one_cancellation(self):
3311        a, b, c, d, e, f = [self.one_loop.create_future()
3312                            for i in range(6)]
3313        fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
3314        cb = test_utils.MockCallback()
3315        fut.add_done_callback(cb)
3316        a.set_result(1)
3317        zde = ZeroDivisionError()
3318        b.set_exception(zde)
3319        c.cancel()
3320        self._run_loop(self.one_loop)
3321        self.assertFalse(fut.done())
3322        d.set_result(3)
3323        e.cancel()
3324        rte = RuntimeError()
3325        f.set_exception(rte)
3326        res = self.one_loop.run_until_complete(fut)
3327        self.assertIsInstance(res[2], asyncio.CancelledError)
3328        self.assertIsInstance(res[4], asyncio.CancelledError)
3329        res[2] = res[4] = None
3330        self.assertEqual(res, [1, zde, None, 3, None, rte])
3331        cb.assert_called_once_with(fut)
3332
3333
3334class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
3335
3336    def wrap_futures(self, *futures):
3337        coros = []
3338        for fut in futures:
3339            async def coro(fut=fut):
3340                return await fut
3341            coros.append(coro())
3342        return coros
3343
3344    def _gather(self, *args, **kwargs):
3345        async def coro():
3346            return asyncio.gather(*args, **kwargs)
3347        return self.one_loop.run_until_complete(coro())
3348
3349    def test_constructor_without_loop(self):
3350        async def coro():
3351            return 'abc'
3352        gen1 = coro()
3353        self.addCleanup(gen1.close)
3354        gen2 = coro()
3355        self.addCleanup(gen2.close)
3356        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
3357            asyncio.gather(gen1, gen2)
3358
3359    def test_constructor_use_running_loop(self):
3360        async def coro():
3361            return 'abc'
3362        gen1 = coro()
3363        gen2 = coro()
3364        async def gather():
3365            return asyncio.gather(gen1, gen2)
3366        fut = self.one_loop.run_until_complete(gather())
3367        self.assertIs(fut._loop, self.one_loop)
3368        self.one_loop.run_until_complete(fut)
3369
3370    def test_constructor_use_global_loop(self):
3371        # Deprecated in 3.10, undeprecated in 3.12
3372        async def coro():
3373            return 'abc'
3374        asyncio.set_event_loop(self.other_loop)
3375        self.addCleanup(asyncio.set_event_loop, None)
3376        gen1 = coro()
3377        gen2 = coro()
3378        fut = asyncio.gather(gen1, gen2)
3379        self.assertIs(fut._loop, self.other_loop)
3380        self.other_loop.run_until_complete(fut)
3381
3382    def test_duplicate_coroutines(self):
3383        async def coro(s):
3384            return s
3385        c = coro('abc')
3386        fut = self._gather(c, c, coro('def'), c)
3387        self._run_loop(self.one_loop)
3388        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])
3389
3390    def test_cancellation_broadcast(self):
3391        # Cancelling outer() cancels all children.
3392        proof = 0
3393        waiter = self.one_loop.create_future()
3394
3395        async def inner():
3396            nonlocal proof
3397            await waiter
3398            proof += 1
3399
3400        child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
3401        child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
3402        gatherer = None
3403
3404        async def outer():
3405            nonlocal proof, gatherer
3406            gatherer = asyncio.gather(child1, child2)
3407            await gatherer
3408            proof += 100
3409
3410        f = asyncio.ensure_future(outer(), loop=self.one_loop)
3411        test_utils.run_briefly(self.one_loop)
3412        self.assertTrue(f.cancel())
3413        with self.assertRaises(asyncio.CancelledError):
3414            self.one_loop.run_until_complete(f)
3415        self.assertFalse(gatherer.cancel())
3416        self.assertTrue(waiter.cancelled())
3417        self.assertTrue(child1.cancelled())
3418        self.assertTrue(child2.cancelled())
3419        test_utils.run_briefly(self.one_loop)
3420        self.assertEqual(proof, 0)
3421
3422    def test_exception_marking(self):
3423        # Test for the first line marked "Mark exception retrieved."
3424
3425        async def inner(f):
3426            await f
3427            raise RuntimeError('should not be ignored')
3428
3429        a = self.one_loop.create_future()
3430        b = self.one_loop.create_future()
3431
3432        async def outer():
3433            await asyncio.gather(inner(a), inner(b))
3434
3435        f = asyncio.ensure_future(outer(), loop=self.one_loop)
3436        test_utils.run_briefly(self.one_loop)
3437        a.set_result(None)
3438        test_utils.run_briefly(self.one_loop)
3439        b.set_result(None)
3440        test_utils.run_briefly(self.one_loop)
3441        self.assertIsInstance(f.exception(), RuntimeError)
3442
3443    def test_issue46672(self):
3444        with mock.patch(
3445            'asyncio.base_events.BaseEventLoop.call_exception_handler',
3446        ):
3447            async def coro(s):
3448                return s
3449            c = coro('abc')
3450
3451            with self.assertRaises(TypeError):
3452                self._gather(c, {})
3453            self._run_loop(self.one_loop)
3454            # NameError should not happen:
3455            self.one_loop.call_exception_handler.assert_not_called()
3456
3457
3458class RunCoroutineThreadsafeTests(test_utils.TestCase):
3459    """Test case for asyncio.run_coroutine_threadsafe."""
3460
3461    def setUp(self):
3462        super().setUp()
3463        self.loop = asyncio.new_event_loop()
3464        self.set_event_loop(self.loop) # Will cleanup properly
3465
3466    async def add(self, a, b, fail=False, cancel=False):
3467        """Wait 0.05 second and return a + b."""
3468        await asyncio.sleep(0.05)
3469        if fail:
3470            raise RuntimeError("Fail!")
3471        if cancel:
3472            asyncio.current_task(self.loop).cancel()
3473            await asyncio.sleep(0)
3474        return a + b
3475
3476    def target(self, fail=False, cancel=False, timeout=None,
3477               advance_coro=False):
3478        """Run add coroutine in the event loop."""
3479        coro = self.add(1, 2, fail=fail, cancel=cancel)
3480        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
3481        if advance_coro:
3482            # this is for test_run_coroutine_threadsafe_task_factory_exception;
3483            # otherwise it spills errors and breaks **other** unittests, since
3484            # 'target' is interacting with threads.
3485
3486            # With this call, `coro` will be advanced.
3487            self.loop.call_soon_threadsafe(coro.send, None)
3488        try:
3489            return future.result(timeout)
3490        finally:
3491            future.done() or future.cancel()
3492
3493    def test_run_coroutine_threadsafe(self):
3494        """Test coroutine submission from a thread to an event loop."""
3495        future = self.loop.run_in_executor(None, self.target)
3496        result = self.loop.run_until_complete(future)
3497        self.assertEqual(result, 3)
3498
3499    def test_run_coroutine_threadsafe_with_exception(self):
3500        """Test coroutine submission from a thread to an event loop
3501        when an exception is raised."""
3502        future = self.loop.run_in_executor(None, self.target, True)
3503        with self.assertRaises(RuntimeError) as exc_context:
3504            self.loop.run_until_complete(future)
3505        self.assertIn("Fail!", exc_context.exception.args)
3506
3507    def test_run_coroutine_threadsafe_with_timeout(self):
3508        """Test coroutine submission from a thread to an event loop
3509        when a timeout is raised."""
3510        callback = lambda: self.target(timeout=0)
3511        future = self.loop.run_in_executor(None, callback)
3512        with self.assertRaises(asyncio.TimeoutError):
3513            self.loop.run_until_complete(future)
3514        test_utils.run_briefly(self.loop)
3515        # Check that there's no pending task (add has been cancelled)
3516        for task in asyncio.all_tasks(self.loop):
3517            self.assertTrue(task.done())
3518
3519    def test_run_coroutine_threadsafe_task_cancelled(self):
3520        """Test coroutine submission from a thread to an event loop
3521        when the task is cancelled."""
3522        callback = lambda: self.target(cancel=True)
3523        future = self.loop.run_in_executor(None, callback)
3524        with self.assertRaises(asyncio.CancelledError):
3525            self.loop.run_until_complete(future)
3526
3527    def test_run_coroutine_threadsafe_task_factory_exception(self):
3528        """Test coroutine submission from a thread to an event loop
3529        when the task factory raise an exception."""
3530
3531        def task_factory(loop, coro):
3532            raise NameError
3533
3534        run = self.loop.run_in_executor(
3535            None, lambda: self.target(advance_coro=True))
3536
3537        # Set exception handler
3538        callback = test_utils.MockCallback()
3539        self.loop.set_exception_handler(callback)
3540
3541        # Set corrupted task factory
3542        self.addCleanup(self.loop.set_task_factory,
3543                        self.loop.get_task_factory())
3544        self.loop.set_task_factory(task_factory)
3545
3546        # Run event loop
3547        with self.assertRaises(NameError) as exc_context:
3548            self.loop.run_until_complete(run)
3549
3550        # Check exceptions
3551        self.assertEqual(len(callback.call_args_list), 1)
3552        (loop, context), kwargs = callback.call_args
3553        self.assertEqual(context['exception'], exc_context.exception)
3554
3555
3556class SleepTests(test_utils.TestCase):
3557    def setUp(self):
3558        super().setUp()
3559        self.loop = asyncio.new_event_loop()
3560        self.set_event_loop(self.loop)
3561
3562    def tearDown(self):
3563        self.loop.close()
3564        self.loop = None
3565        super().tearDown()
3566
3567    def test_sleep_zero(self):
3568        result = 0
3569
3570        def inc_result(num):
3571            nonlocal result
3572            result += num
3573
3574        async def coro():
3575            self.loop.call_soon(inc_result, 1)
3576            self.assertEqual(result, 0)
3577            num = await asyncio.sleep(0, result=10)
3578            self.assertEqual(result, 1) # inc'ed by call_soon
3579            inc_result(num) # num should be 11
3580
3581        self.loop.run_until_complete(coro())
3582        self.assertEqual(result, 11)
3583
3584
3585class CompatibilityTests(test_utils.TestCase):
3586    # Tests for checking a bridge between old-styled coroutines
3587    # and async/await syntax
3588
3589    def setUp(self):
3590        super().setUp()
3591        self.loop = asyncio.new_event_loop()
3592        self.set_event_loop(self.loop)
3593
3594    def tearDown(self):
3595        self.loop.close()
3596        self.loop = None
3597        super().tearDown()
3598
3599
3600if __name__ == '__main__':
3601    unittest.main()
3602