• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for tasks.py."""
2
3import collections
4import contextlib
5import contextvars
6import functools
7import gc
8import io
9import random
10import re
11import sys
12import textwrap
13import traceback
14import types
15import unittest
16import weakref
17from unittest import mock
18
19import asyncio
20from asyncio import coroutines
21from asyncio import futures
22from asyncio import tasks
23from test.test_asyncio import utils as test_utils
24from test import support
25from test.support.script_helper import assert_python_ok
26
27
def tearDownModule():
    """Module-level teardown: pass None to asyncio.set_event_loop_policy()."""
    asyncio.set_event_loop_policy(None)
30
31
async def coroutine_function():
    """Trivial module-level coroutine function; its body does nothing."""
    pass
34
35
@contextlib.contextmanager
def set_coroutine_debug(enabled):
    """Temporarily set ``asyncio.coroutines._DEBUG`` to *enabled*.

    The value found on entry is put back when the ``with`` block is left,
    whether it finishes normally or raises.
    """
    module = asyncio.coroutines
    saved = module._DEBUG
    try:
        module._DEBUG = enabled
        yield
    finally:
        module._DEBUG = saved
46
47
def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` fragment expected inside a Task repr.

    *state* gets a ``, defined`` suffix unless *generator* is true.  When
    *source_traceback* is not None, the first two items of its last entry
    are appended as ``created at <file>:<line>``.
    """
    state = ('%s' if generator else '%s, defined') % state
    if source_traceback is None:
        return 'coro=<%s() %s at %s>' % (qualname, state, src)

    last_frame = source_traceback[-1]
    return ('coro=<%s() %s at %s> created at %s:%s'
            % (qualname, state, src, last_frame[0], last_frame[1]))
59
60
def get_innermost_context(exc):
    """
    Follow ``__context__`` links from *exc* down to the last exception.

    Return a ``(type, args, depth)`` tuple describing that exception, where
    depth is the number of links followed (0 if *exc* has no context).
    """
    innermost = exc
    depth = 0
    while innermost.__context__ is not None:
        innermost = innermost.__context__
        depth += 1

    return (type(innermost), innermost.args, depth)
75
76
class Dummy:
    """Callable placeholder with a fixed repr, used as a done callback."""

    def __call__(self, *args):
        # Accept any positional arguments and do nothing.
        return None

    def __repr__(self):
        return '<Dummy>'
84
85
class CoroLikeObject:
    """Hand-written object exposing send/throw/close/__await__.

    send() always raises StopIteration(42), throw() and close() do nothing,
    and __await__() returns the object itself.
    """

    def __await__(self):
        return self

    def send(self, v):
        raise StopIteration(42)

    def throw(self, *exc):
        return None

    def close(self):
        return None
98
99
# The following value can be used as a very small timeout:
# it passes the "timeout > 0" check, but has almost
# no effect on test performance.
_EPSILON = 0.0001
104
105
106class BaseTaskTests:
107
108    Task = None
109    Future = None
110
111    def new_task(self, loop, coro, name='TestTask'):
112        return self.__class__.Task(coro, loop=loop, name=name)
113
114    def new_future(self, loop):
115        return self.__class__.Future(loop=loop)
116
117    def setUp(self):
118        super().setUp()
119        self.loop = self.new_test_loop()
120        self.loop.set_task_factory(self.new_task)
121        self.loop.create_future = lambda: self.new_future(self.loop)
122
123    def test_task_cancel_message_getter(self):
124        async def coro():
125            pass
126        t = self.new_task(self.loop, coro())
127        self.assertTrue(hasattr(t, '_cancel_message'))
128        self.assertEqual(t._cancel_message, None)
129
130        t.cancel('my message')
131        self.assertEqual(t._cancel_message, 'my message')
132
133        with self.assertRaises(asyncio.CancelledError):
134            self.loop.run_until_complete(t)
135
136    def test_task_cancel_message_setter(self):
137        async def coro():
138            pass
139        t = self.new_task(self.loop, coro())
140        t.cancel('my message')
141        t._cancel_message = 'my new message'
142        self.assertEqual(t._cancel_message, 'my new message')
143
144        with self.assertRaises(asyncio.CancelledError):
145            self.loop.run_until_complete(t)
146
    def test_task_del_collect(self):
        # Run 100 tasks whose results call gc.collect() from __del__.
        class Evil:
            def __del__(self):
                # Force a collection while this object is being finalized.
                gc.collect()

        async def run():
            return Evil()

        # The task list is a temporary: nothing in this method keeps a
        # reference to the tasks once run_until_complete() returns.
        self.loop.run_until_complete(
            asyncio.gather(*[
                self.new_task(self.loop, run()) for _ in range(100)
            ]))
159
160    def test_other_loop_future(self):
161        other_loop = asyncio.new_event_loop()
162        fut = self.new_future(other_loop)
163
164        async def run(fut):
165            await fut
166
167        try:
168            with self.assertRaisesRegex(RuntimeError,
169                                        r'Task .* got Future .* attached'):
170                self.loop.run_until_complete(run(fut))
171        finally:
172            other_loop.close()
173
174    def test_task_awaits_on_itself(self):
175
176        async def test():
177            await task
178
179        task = asyncio.ensure_future(test(), loop=self.loop)
180
181        with self.assertRaisesRegex(RuntimeError,
182                                    'Task cannot await on itself'):
183            self.loop.run_until_complete(task)
184
185    def test_task_class(self):
186        async def notmuch():
187            return 'ok'
188        t = self.new_task(self.loop, notmuch())
189        self.loop.run_until_complete(t)
190        self.assertTrue(t.done())
191        self.assertEqual(t.result(), 'ok')
192        self.assertIs(t._loop, self.loop)
193        self.assertIs(t.get_loop(), self.loop)
194
195        loop = asyncio.new_event_loop()
196        self.set_event_loop(loop)
197        t = self.new_task(loop, notmuch())
198        self.assertIs(t._loop, loop)
199        loop.run_until_complete(t)
200        loop.close()
201
    def test_ensure_future_coroutine(self):
        # Exercise ensure_future() on a native coroutine in four setups:
        # explicit loop, no current loop, inside a running loop, and a loop
        # installed with asyncio.set_event_loop().
        async def notmuch():
            return 'ok'
        # 1. Explicit loop argument.
        t = asyncio.ensure_future(notmuch(), loop=self.loop)
        self.assertIs(t._loop, self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')

        # 2. No loop argument and no current loop: a DeprecationWarning is
        # expected together with a RuntimeError.
        a = notmuch()
        # The coroutine is never scheduled, so close it at cleanup time.
        self.addCleanup(a.close)
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
                asyncio.ensure_future(a)
        # The warning must be attributed to this file.
        self.assertEqual(cm.warnings[0].filename, __file__)

        # 3. Called from a coroutine running on self.loop.
        async def test():
            return asyncio.ensure_future(notmuch())
        t = self.loop.run_until_complete(test())
        self.assertIs(t._loop, self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')

        # 4. Loop installed with set_event_loop() but not running.
        # Deprecated in 3.10
        asyncio.set_event_loop(self.loop)
        self.addCleanup(asyncio.set_event_loop, None)
        with self.assertWarns(DeprecationWarning) as cm:
            t = asyncio.ensure_future(notmuch())
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertIs(t._loop, self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
236
    def test_ensure_future_coroutine_2(self):
        # Same scenarios as test_ensure_future_coroutine, but the coroutine
        # is built with the @asyncio.coroutine decorator.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def notmuch():
                return 'ok'
        # 1. Explicit loop argument.
        t = asyncio.ensure_future(notmuch(), loop=self.loop)
        self.assertIs(t._loop, self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')

        # 2. No loop argument and no current loop: a DeprecationWarning is
        # expected together with a RuntimeError.
        a = notmuch()
        # The coroutine is never scheduled, so close it at cleanup time.
        self.addCleanup(a.close)
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
                asyncio.ensure_future(a)
        # The warning must be attributed to this file.
        self.assertEqual(cm.warnings[0].filename, __file__)

        # 3. Called from a coroutine running on self.loop.
        async def test():
            return asyncio.ensure_future(notmuch())
        t = self.loop.run_until_complete(test())
        self.assertIs(t._loop, self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')

        # 4. Loop installed with set_event_loop() but not running.
        # Deprecated in 3.10
        asyncio.set_event_loop(self.loop)
        self.addCleanup(asyncio.set_event_loop, None)
        with self.assertWarns(DeprecationWarning) as cm:
            t = asyncio.ensure_future(notmuch())
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertIs(t._loop, self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
273
274    def test_ensure_future_future(self):
275        f_orig = self.new_future(self.loop)
276        f_orig.set_result('ko')
277
278        f = asyncio.ensure_future(f_orig)
279        self.loop.run_until_complete(f)
280        self.assertTrue(f.done())
281        self.assertEqual(f.result(), 'ko')
282        self.assertIs(f, f_orig)
283
284        loop = asyncio.new_event_loop()
285        self.set_event_loop(loop)
286
287        with self.assertRaises(ValueError):
288            f = asyncio.ensure_future(f_orig, loop=loop)
289
290        loop.close()
291
292        f = asyncio.ensure_future(f_orig, loop=self.loop)
293        self.assertIs(f, f_orig)
294
295    def test_ensure_future_task(self):
296        async def notmuch():
297            return 'ok'
298        t_orig = self.new_task(self.loop, notmuch())
299        t = asyncio.ensure_future(t_orig)
300        self.loop.run_until_complete(t)
301        self.assertTrue(t.done())
302        self.assertEqual(t.result(), 'ok')
303        self.assertIs(t, t_orig)
304
305        loop = asyncio.new_event_loop()
306        self.set_event_loop(loop)
307
308        with self.assertRaises(ValueError):
309            t = asyncio.ensure_future(t_orig, loop=loop)
310
311        loop.close()
312
313        t = asyncio.ensure_future(t_orig, loop=self.loop)
314        self.assertIs(t, t_orig)
315
316    def test_ensure_future_awaitable(self):
317        class Aw:
318            def __init__(self, coro):
319                self.coro = coro
320            def __await__(self):
321                return (yield from self.coro)
322
323        with self.assertWarns(DeprecationWarning):
324            @asyncio.coroutine
325            def coro():
326                return 'ok'
327
328        loop = asyncio.new_event_loop()
329        self.set_event_loop(loop)
330        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
331        loop.run_until_complete(fut)
332        self.assertEqual(fut.result(), 'ok')
333
334    def test_ensure_future_neither(self):
335        with self.assertRaises(TypeError):
336            asyncio.ensure_future('ok')
337
338    def test_ensure_future_error_msg(self):
339        loop = asyncio.new_event_loop()
340        f = self.new_future(self.loop)
341        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
342                                    'different loop than the one specified as '
343                                    'the loop argument'):
344            asyncio.ensure_future(f, loop=loop)
345        loop.close()
346
    def test_get_stack(self):
        # Check Task.get_stack() and Task.print_stack() from inside the
        # running task's own coroutine chain (foo() awaiting bar()).
        T = None  # set by runner() before foo()/bar() execute

        async def foo():
            await bar()

        async def bar():
            # test get_stack()
            f = T.get_stack(limit=1)
            try:
                # With limit=1 the single reported frame must be foo's.
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the local reference to the frame list.
                f = None

            # test print_stack()
            file = io.StringIO()
            T.print_stack(limit=1, file=file)
            file.seek(0)
            tb = file.read()
            self.assertRegex(tb, r'foo\(\) running')

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())
374
375    def test_task_repr(self):
376        self.loop.set_debug(False)
377
378        async def notmuch():
379            return 'abc'
380
381        # test coroutine function
382        self.assertEqual(notmuch.__name__, 'notmuch')
383        self.assertRegex(notmuch.__qualname__,
384                         r'\w+.test_task_repr.<locals>.notmuch')
385        self.assertEqual(notmuch.__module__, __name__)
386
387        filename, lineno = test_utils.get_function_source(notmuch)
388        src = "%s:%s" % (filename, lineno)
389
390        # test coroutine object
391        gen = notmuch()
392        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
393        self.assertEqual(gen.__name__, 'notmuch')
394        self.assertEqual(gen.__qualname__, coro_qualname)
395
396        # test pending Task
397        t = self.new_task(self.loop, gen)
398        t.add_done_callback(Dummy())
399
400        coro = format_coroutine(coro_qualname, 'running', src,
401                                t._source_traceback, generator=True)
402        self.assertEqual(repr(t),
403                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
404
405        # test cancelling Task
406        t.cancel()  # Does not take immediate effect!
407        self.assertEqual(repr(t),
408                         "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
409
410        # test cancelled Task
411        self.assertRaises(asyncio.CancelledError,
412                          self.loop.run_until_complete, t)
413        coro = format_coroutine(coro_qualname, 'done', src,
414                                t._source_traceback)
415        self.assertEqual(repr(t),
416                         "<Task cancelled name='TestTask' %s>" % coro)
417
418        # test finished Task
419        t = self.new_task(self.loop, notmuch())
420        self.loop.run_until_complete(t)
421        coro = format_coroutine(coro_qualname, 'done', src,
422                                t._source_traceback)
423        self.assertEqual(repr(t),
424                         "<Task finished name='TestTask' %s result='abc'>" % coro)
425
426    def test_task_repr_autogenerated(self):
427        async def notmuch():
428            return 123
429
430        t1 = self.new_task(self.loop, notmuch(), None)
431        t2 = self.new_task(self.loop, notmuch(), None)
432        self.assertNotEqual(repr(t1), repr(t2))
433
434        match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
435        self.assertIsNotNone(match1)
436        match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
437        self.assertIsNotNone(match2)
438
439        # Autogenerated task names should have monotonically increasing numbers
440        self.assertLess(int(match1.group(1)), int(match2.group(1)))
441        self.loop.run_until_complete(t1)
442        self.loop.run_until_complete(t2)
443
444    def test_task_repr_name_not_str(self):
445        async def notmuch():
446            return 123
447
448        t = self.new_task(self.loop, notmuch())
449        t.set_name({6})
450        self.assertEqual(t.get_name(), '{6}')
451        self.loop.run_until_complete(t)
452
453    def test_task_repr_coro_decorator(self):
454        self.loop.set_debug(False)
455
456        with self.assertWarns(DeprecationWarning):
457            @asyncio.coroutine
458            def notmuch():
459                # notmuch() function doesn't use yield from: it will be wrapped by
460                # @coroutine decorator
461                return 123
462
463        # test coroutine function
464        self.assertEqual(notmuch.__name__, 'notmuch')
465        self.assertRegex(notmuch.__qualname__,
466                         r'\w+.test_task_repr_coro_decorator'
467                         r'\.<locals>\.notmuch')
468        self.assertEqual(notmuch.__module__, __name__)
469
470        # test coroutine object
471        gen = notmuch()
472        # On Python >= 3.5, generators now inherit the name of the
473        # function, as expected, and have a qualified name (__qualname__
474        # attribute).
475        coro_name = 'notmuch'
476        coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
477                         '.<locals>.notmuch')
478        self.assertEqual(gen.__name__, coro_name)
479        self.assertEqual(gen.__qualname__, coro_qualname)
480
481        # test repr(CoroWrapper)
482        if coroutines._DEBUG:
483            # format the coroutine object
484            if coroutines._DEBUG:
485                filename, lineno = test_utils.get_function_source(notmuch)
486                frame = gen._source_traceback[-1]
487                coro = ('%s() running, defined at %s:%s, created at %s:%s'
488                        % (coro_qualname, filename, lineno,
489                           frame[0], frame[1]))
490            else:
491                code = gen.gi_code
492                coro = ('%s() running at %s:%s'
493                        % (coro_qualname, code.co_filename,
494                           code.co_firstlineno))
495
496            self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
497
498        # test pending Task
499        t = self.new_task(self.loop, gen)
500        t.add_done_callback(Dummy())
501
502        # format the coroutine object
503        if coroutines._DEBUG:
504            src = '%s:%s' % test_utils.get_function_source(notmuch)
505        else:
506            code = gen.gi_code
507            src = '%s:%s' % (code.co_filename, code.co_firstlineno)
508        coro = format_coroutine(coro_qualname, 'running', src,
509                                t._source_traceback,
510                                generator=not coroutines._DEBUG)
511        self.assertEqual(repr(t),
512                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
513        self.loop.run_until_complete(t)
514
515    def test_task_repr_wait_for(self):
516        self.loop.set_debug(False)
517
518        async def wait_for(fut):
519            return await fut
520
521        fut = self.new_future(self.loop)
522        task = self.new_task(self.loop, wait_for(fut))
523        test_utils.run_briefly(self.loop)
524        self.assertRegex(repr(task),
525                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))
526
527        fut.set_result(None)
528        self.loop.run_until_complete(task)
529
    def test_task_repr_partial_corowrapper(self):
        # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
        # coroutine is a partial function
        with set_coroutine_debug(True):
            self.loop.set_debug(True)

            async def func(x, y):
                await asyncio.sleep(0)

            # Wrapping with asyncio.coroutine() is expected to warn.
            with self.assertWarns(DeprecationWarning):
                partial_func = asyncio.coroutine(functools.partial(func, 1))
            task = self.loop.create_task(partial_func(2))

            # make warnings quiet
            task._log_destroy_pending = False
            # The task is never run, so close its coroutine at cleanup time.
            self.addCleanup(task._coro.close)

        # The repr is taken after the coroutine debug flag was restored.
        coro_repr = repr(task._coro)
        # The pattern relies on the inner function being named 'func'.
        expected = (
            r'<coroutine object \w+\.test_task_repr_partial_corowrapper'
            r'\.<locals>\.func at'
        )
        self.assertRegex(coro_repr, expected)
553
554    def test_task_basics(self):
555
556        async def outer():
557            a = await inner1()
558            b = await inner2()
559            return a+b
560
561        async def inner1():
562            return 42
563
564        async def inner2():
565            return 1000
566
567        t = outer()
568        self.assertEqual(self.loop.run_until_complete(t), 1042)
569
570    def test_exception_chaining_after_await(self):
571        # Test that when awaiting on a task when an exception is already
572        # active, if the task raises an exception it will be chained
573        # with the original.
574        loop = asyncio.new_event_loop()
575        self.set_event_loop(loop)
576
577        async def raise_error():
578            raise ValueError
579
580        async def run():
581            try:
582                raise KeyError(3)
583            except Exception as exc:
584                task = self.new_task(loop, raise_error())
585                try:
586                    await task
587                except Exception as exc:
588                    self.assertEqual(type(exc), ValueError)
589                    chained = exc.__context__
590                    self.assertEqual((type(chained), chained.args),
591                        (KeyError, (3,)))
592
593        try:
594            task = self.new_task(loop, run())
595            loop.run_until_complete(task)
596        finally:
597            loop.close()
598
    def test_exception_chaining_after_await_with_context_cycle(self):
        # Check trying to create an exception context cycle:
        # https://bugs.python.org/issue40696
        has_cycle = None  # stays None if the inner except block never runs
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def process_exc(exc):
            # Re-raise the exception object handed over by run().
            raise exc

        async def run():
            nonlocal has_cycle
            try:
                raise KeyError('a')
            except Exception as exc:
                # The task raises the same exception that is being handled.
                task = self.new_task(loop, process_exc(exc))
                try:
                    await task
                except BaseException as exc:
                    has_cycle = (exc is exc.__context__)
                    # Prevent a hang if has_cycle is True.
                    exc.__context__ = None

        try:
            task = self.new_task(loop, run())
            loop.run_until_complete(task)
        finally:
            loop.close()
        # This also distinguishes from the initial has_cycle=None.
        self.assertEqual(has_cycle, False)
629
630    def test_cancel(self):
631
632        def gen():
633            when = yield
634            self.assertAlmostEqual(10.0, when)
635            yield 0
636
637        loop = self.new_test_loop(gen)
638
639        async def task():
640            await asyncio.sleep(10.0)
641            return 12
642
643        t = self.new_task(loop, task())
644        loop.call_soon(t.cancel)
645        with self.assertRaises(asyncio.CancelledError):
646            loop.run_until_complete(t)
647        self.assertTrue(t.done())
648        self.assertTrue(t.cancelled())
649        self.assertFalse(t.cancel())
650
    def test_cancel_with_message_then_future_result(self):
        # Test Future.result() after calling cancel() with a message.
        # Each case pairs the arguments given to cancel() with the args
        # expected on the innermost CancelledError of the context chain.
        cases = [
            ((), ()),
            ((None,), ()),
            (('my message',), ('my message',)),
            # Non-string values should roundtrip.
            ((5,), (5,)),
        ]
        for cancel_args, expected_args in cases:
            with self.subTest(cancel_args=cancel_args):
                loop = asyncio.new_event_loop()
                self.set_event_loop(loop)

                async def sleep():
                    await asyncio.sleep(10)

                async def coro():
                    task = self.new_task(loop, sleep())
                    # Yield once before cancelling the inner task.
                    await asyncio.sleep(0)
                    task.cancel(*cancel_args)
                    done, pending = await asyncio.wait([task])
                    task.result()

                task = self.new_task(loop, coro())
                with self.assertRaises(asyncio.CancelledError) as cm:
                    loop.run_until_complete(task)
                exc = cm.exception
                # The outermost exception carries no arguments.
                self.assertEqual(exc.args, ())

                # The cancel arguments are found two context links down.
                actual = get_innermost_context(exc)
                self.assertEqual(actual,
                    (asyncio.CancelledError, expected_args, 2))
684
    def test_cancel_with_message_then_future_exception(self):
        # Test Future.exception() after calling cancel() with a message.
        # Each case pairs the arguments given to cancel() with the args
        # expected on the innermost CancelledError of the context chain.
        cases = [
            ((), ()),
            ((None,), ()),
            (('my message',), ('my message',)),
            # Non-string values should roundtrip.
            ((5,), (5,)),
        ]
        for cancel_args, expected_args in cases:
            with self.subTest(cancel_args=cancel_args):
                loop = asyncio.new_event_loop()
                self.set_event_loop(loop)

                async def sleep():
                    await asyncio.sleep(10)

                async def coro():
                    task = self.new_task(loop, sleep())
                    # Yield once before cancelling the inner task.
                    await asyncio.sleep(0)
                    task.cancel(*cancel_args)
                    done, pending = await asyncio.wait([task])
                    task.exception()

                task = self.new_task(loop, coro())
                with self.assertRaises(asyncio.CancelledError) as cm:
                    loop.run_until_complete(task)
                exc = cm.exception
                # The outermost exception carries no arguments.
                self.assertEqual(exc.args, ())

                # The cancel arguments are found two context links down.
                actual = get_innermost_context(exc)
                self.assertEqual(actual,
                    (asyncio.CancelledError, expected_args, 2))
718
    def test_cancel_with_message_before_starting_task(self):
        # Cancel the inner task with a message right after creating it,
        # without yielding to the loop first, then call Future.exception().
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def sleep():
            await asyncio.sleep(10)

        async def coro():
            task = self.new_task(loop, sleep())
            # We deliberately leave out the sleep here.
            task.cancel('my message')
            done, pending = await asyncio.wait([task])
            task.exception()

        task = self.new_task(loop, coro())
        with self.assertRaises(asyncio.CancelledError) as cm:
            loop.run_until_complete(task)
        exc = cm.exception
        # The outermost exception carries no arguments.
        self.assertEqual(exc.args, ())

        # The message is found two context links down.
        actual = get_innermost_context(exc)
        self.assertEqual(actual,
            (asyncio.CancelledError, ('my message',), 2))
742
743    def test_cancel_yield(self):
744        with self.assertWarns(DeprecationWarning):
745            @asyncio.coroutine
746            def task():
747                yield
748                yield
749                return 12
750
751        t = self.new_task(self.loop, task())
752        test_utils.run_briefly(self.loop)  # start coro
753        t.cancel()
754        self.assertRaises(
755            asyncio.CancelledError, self.loop.run_until_complete, t)
756        self.assertTrue(t.done())
757        self.assertTrue(t.cancelled())
758        self.assertFalse(t.cancel())
759
760    def test_cancel_inner_future(self):
761        f = self.new_future(self.loop)
762
763        async def task():
764            await f
765            return 12
766
767        t = self.new_task(self.loop, task())
768        test_utils.run_briefly(self.loop)  # start task
769        f.cancel()
770        with self.assertRaises(asyncio.CancelledError):
771            self.loop.run_until_complete(t)
772        self.assertTrue(f.cancelled())
773        self.assertTrue(t.cancelled())
774
775    def test_cancel_both_task_and_inner_future(self):
776        f = self.new_future(self.loop)
777
778        async def task():
779            await f
780            return 12
781
782        t = self.new_task(self.loop, task())
783        test_utils.run_briefly(self.loop)
784
785        f.cancel()
786        t.cancel()
787
788        with self.assertRaises(asyncio.CancelledError):
789            self.loop.run_until_complete(t)
790
791        self.assertTrue(t.done())
792        self.assertTrue(f.cancelled())
793        self.assertTrue(t.cancelled())
794
795    def test_cancel_task_catching(self):
796        fut1 = self.new_future(self.loop)
797        fut2 = self.new_future(self.loop)
798
799        async def task():
800            await fut1
801            try:
802                await fut2
803            except asyncio.CancelledError:
804                return 42
805
806        t = self.new_task(self.loop, task())
807        test_utils.run_briefly(self.loop)
808        self.assertIs(t._fut_waiter, fut1)  # White-box test.
809        fut1.set_result(None)
810        test_utils.run_briefly(self.loop)
811        self.assertIs(t._fut_waiter, fut2)  # White-box test.
812        t.cancel()
813        self.assertTrue(fut2.cancelled())
814        res = self.loop.run_until_complete(t)
815        self.assertEqual(res, 42)
816        self.assertFalse(t.cancelled())
817
818    def test_cancel_task_ignoring(self):
819        fut1 = self.new_future(self.loop)
820        fut2 = self.new_future(self.loop)
821        fut3 = self.new_future(self.loop)
822
823        async def task():
824            await fut1
825            try:
826                await fut2
827            except asyncio.CancelledError:
828                pass
829            res = await fut3
830            return res
831
832        t = self.new_task(self.loop, task())
833        test_utils.run_briefly(self.loop)
834        self.assertIs(t._fut_waiter, fut1)  # White-box test.
835        fut1.set_result(None)
836        test_utils.run_briefly(self.loop)
837        self.assertIs(t._fut_waiter, fut2)  # White-box test.
838        t.cancel()
839        self.assertTrue(fut2.cancelled())
840        test_utils.run_briefly(self.loop)
841        self.assertIs(t._fut_waiter, fut3)  # White-box test.
842        fut3.set_result(42)
843        res = self.loop.run_until_complete(t)
844        self.assertEqual(res, 42)
845        self.assertFalse(fut3.cancelled())
846        self.assertFalse(t.cancelled())
847
848    def test_cancel_current_task(self):
849        loop = asyncio.new_event_loop()
850        self.set_event_loop(loop)
851
852        async def task():
853            t.cancel()
854            self.assertTrue(t._must_cancel)  # White-box test.
855            # The sleep should be cancelled immediately.
856            await asyncio.sleep(100)
857            return 12
858
859        t = self.new_task(loop, task())
860        self.assertFalse(t.cancelled())
861        self.assertRaises(
862            asyncio.CancelledError, loop.run_until_complete, t)
863        self.assertTrue(t.done())
864        self.assertTrue(t.cancelled())
865        self.assertFalse(t._must_cancel)  # White-box test.
866        self.assertFalse(t.cancel())
867
868    def test_cancel_at_end(self):
869        """coroutine end right after task is cancelled"""
870        loop = asyncio.new_event_loop()
871        self.set_event_loop(loop)
872
873        async def task():
874            t.cancel()
875            self.assertTrue(t._must_cancel)  # White-box test.
876            return 12
877
878        t = self.new_task(loop, task())
879        self.assertFalse(t.cancelled())
880        self.assertRaises(
881            asyncio.CancelledError, loop.run_until_complete, t)
882        self.assertTrue(t.done())
883        self.assertTrue(t.cancelled())
884        self.assertFalse(t._must_cancel)  # White-box test.
885        self.assertFalse(t.cancel())
886
887    def test_cancel_awaited_task(self):
888        # This tests for a relatively rare condition when
889        # a task cancellation is requested for a task which is not
890        # currently blocked, such as a task cancelling itself.
891        # In this situation we must ensure that whatever next future
892        # or task the cancelled task blocks on is cancelled correctly
893        # as well.  See also bpo-34872.
894        loop = asyncio.new_event_loop()
895        self.addCleanup(lambda: loop.close())
896
897        task = nested_task = None
898        fut = self.new_future(loop)
899
900        async def nested():
901            await fut
902
903        async def coro():
904            nonlocal nested_task
905            # Create a sub-task and wait for it to run.
906            nested_task = self.new_task(loop, nested())
907            await asyncio.sleep(0)
908
909            # Request the current task to be cancelled.
910            task.cancel()
911            # Block on the nested task, which should be immediately
912            # cancelled.
913            await nested_task
914
915        task = self.new_task(loop, coro())
916        with self.assertRaises(asyncio.CancelledError):
917            loop.run_until_complete(task)
918
919        self.assertTrue(task.cancelled())
920        self.assertTrue(nested_task.cancelled())
921        self.assertTrue(fut.cancelled())
922
923    def assert_text_contains(self, text, substr):
924        if substr not in text:
925            raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<')
926
    def test_cancel_traceback_for_future_result(self):
        # When calling Future.result() on a cancelled task, check that the
        # line of code that was interrupted is included in the traceback.
        #
        # NOTE: the assertions below search the formatted traceback for the
        # literal source text of lines in this test, so those lines (and the
        # "# search target" marker) must not be reworded or renamed.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def nested():
            # This will get cancelled immediately.
            await asyncio.sleep(10)

        async def coro():
            # Start the nested task, yield to the loop once, cancel the
            # nested task and then await it.
            task = self.new_task(loop, nested())
            await asyncio.sleep(0)
            task.cancel()
            await task  # search target

        task = self.new_task(loop, coro())
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            # Inspect the traceback of the CancelledError being handled.
            tb = traceback.format_exc()
            self.assert_text_contains(tb, "await asyncio.sleep(10)")
            # The intermediate await should also be included.
            self.assert_text_contains(tb, "await task  # search target")
        else:
            self.fail('CancelledError did not occur')
953
    def test_cancel_traceback_for_future_exception(self):
        # When calling Future.exception() on a cancelled task, check that the
        # line of code that was interrupted is included in the traceback.
        #
        # NOTE: the assertions below search the formatted traceback for the
        # literal source text of lines in this test, so those lines (and the
        # "# search target" marker) must not be reworded or renamed.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def nested():
            # This will get cancelled immediately.
            await asyncio.sleep(10)

        async def coro():
            # Start the nested task, yield to the loop once, cancel the
            # nested task, wait for it to finish and then query its
            # exception.
            task = self.new_task(loop, nested())
            await asyncio.sleep(0)
            task.cancel()
            done, pending = await asyncio.wait([task])
            task.exception()  # search target

        task = self.new_task(loop, coro())
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            # Inspect the traceback of the CancelledError being handled.
            tb = traceback.format_exc()
            self.assert_text_contains(tb, "await asyncio.sleep(10)")
            # The line calling task.exception() should also be included.
            self.assert_text_contains(tb,
                "task.exception()  # search target")
        else:
            self.fail('CancelledError did not occur')
982
983    def test_stop_while_run_in_complete(self):
984
985        def gen():
986            when = yield
987            self.assertAlmostEqual(0.1, when)
988            when = yield 0.1
989            self.assertAlmostEqual(0.2, when)
990            when = yield 0.1
991            self.assertAlmostEqual(0.3, when)
992            yield 0.1
993
994        loop = self.new_test_loop(gen)
995
996        x = 0
997
998        async def task():
999            nonlocal x
1000            while x < 10:
1001                await asyncio.sleep(0.1)
1002                x += 1
1003                if x == 2:
1004                    loop.stop()
1005
1006        t = self.new_task(loop, task())
1007        with self.assertRaises(RuntimeError) as cm:
1008            loop.run_until_complete(t)
1009        self.assertEqual(str(cm.exception),
1010                         'Event loop stopped before Future completed.')
1011        self.assertFalse(t.done())
1012        self.assertEqual(x, 2)
1013        self.assertAlmostEqual(0.3, loop.time())
1014
1015        t.cancel()
1016        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
1017
1018    def test_log_traceback(self):
1019        async def coro():
1020            pass
1021
1022        task = self.new_task(self.loop, coro())
1023        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
1024            task._log_traceback = True
1025        self.loop.run_until_complete(task)
1026
1027    def test_wait_for_timeout_less_then_0_or_0_future_done(self):
1028        def gen():
1029            when = yield
1030            self.assertAlmostEqual(0, when)
1031
1032        loop = self.new_test_loop(gen)
1033
1034        fut = self.new_future(loop)
1035        fut.set_result('done')
1036
1037        ret = loop.run_until_complete(asyncio.wait_for(fut, 0))
1038
1039        self.assertEqual(ret, 'done')
1040        self.assertTrue(fut.done())
1041        self.assertAlmostEqual(0, loop.time())
1042
1043    def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
1044        def gen():
1045            when = yield
1046            self.assertAlmostEqual(0, when)
1047
1048        loop = self.new_test_loop(gen)
1049
1050        foo_started = False
1051
1052        async def foo():
1053            nonlocal foo_started
1054            foo_started = True
1055
1056        with self.assertRaises(asyncio.TimeoutError):
1057            loop.run_until_complete(asyncio.wait_for(foo(), 0))
1058
1059        self.assertAlmostEqual(0, loop.time())
1060        self.assertEqual(foo_started, False)
1061
1062    def test_wait_for_timeout_less_then_0_or_0(self):
1063        def gen():
1064            when = yield
1065            self.assertAlmostEqual(0.2, when)
1066            when = yield 0
1067            self.assertAlmostEqual(0, when)
1068
1069        for timeout in [0, -1]:
1070            with self.subTest(timeout=timeout):
1071                loop = self.new_test_loop(gen)
1072
1073                foo_running = None
1074
1075                async def foo():
1076                    nonlocal foo_running
1077                    foo_running = True
1078                    try:
1079                        await asyncio.sleep(0.2)
1080                    finally:
1081                        foo_running = False
1082                    return 'done'
1083
1084                fut = self.new_task(loop, foo())
1085
1086                with self.assertRaises(asyncio.TimeoutError):
1087                    loop.run_until_complete(asyncio.wait_for(fut, timeout))
1088                self.assertTrue(fut.done())
1089                # it should have been cancelled due to the timeout
1090                self.assertTrue(fut.cancelled())
1091                self.assertAlmostEqual(0, loop.time())
1092                self.assertEqual(foo_running, False)
1093
1094    def test_wait_for(self):
1095
1096        def gen():
1097            when = yield
1098            self.assertAlmostEqual(0.2, when)
1099            when = yield 0
1100            self.assertAlmostEqual(0.1, when)
1101            when = yield 0.1
1102
1103        loop = self.new_test_loop(gen)
1104
1105        foo_running = None
1106
1107        async def foo():
1108            nonlocal foo_running
1109            foo_running = True
1110            try:
1111                await asyncio.sleep(0.2)
1112            finally:
1113                foo_running = False
1114            return 'done'
1115
1116        fut = self.new_task(loop, foo())
1117
1118        with self.assertRaises(asyncio.TimeoutError):
1119            loop.run_until_complete(asyncio.wait_for(fut, 0.1))
1120        self.assertTrue(fut.done())
1121        # it should have been cancelled due to the timeout
1122        self.assertTrue(fut.cancelled())
1123        self.assertAlmostEqual(0.1, loop.time())
1124        self.assertEqual(foo_running, False)
1125
1126    def test_wait_for_blocking(self):
1127        loop = self.new_test_loop()
1128
1129        async def coro():
1130            return 'done'
1131
1132        res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None))
1133        self.assertEqual(res, 'done')
1134
1135    def test_wait_for_race_condition(self):
1136
1137        def gen():
1138            yield 0.1
1139            yield 0.1
1140            yield 0.1
1141
1142        loop = self.new_test_loop(gen)
1143
1144        fut = self.new_future(loop)
1145        task = asyncio.wait_for(fut, timeout=0.2)
1146        loop.call_later(0.1, fut.set_result, "ok")
1147        res = loop.run_until_complete(task)
1148        self.assertEqual(res, "ok")
1149
1150    def test_wait_for_cancellation_race_condition(self):
1151        async def inner():
1152            with contextlib.suppress(asyncio.CancelledError):
1153                await asyncio.sleep(1)
1154            return 1
1155
1156        async def main():
1157            result = await asyncio.wait_for(inner(), timeout=.01)
1158            self.assertEqual(result, 1)
1159
1160        asyncio.run(main())
1161
1162    def test_wait_for_waits_for_task_cancellation(self):
1163        loop = asyncio.new_event_loop()
1164        self.addCleanup(loop.close)
1165
1166        task_done = False
1167
1168        async def foo():
1169            async def inner():
1170                nonlocal task_done
1171                try:
1172                    await asyncio.sleep(0.2)
1173                except asyncio.CancelledError:
1174                    await asyncio.sleep(_EPSILON)
1175                    raise
1176                finally:
1177                    task_done = True
1178
1179            inner_task = self.new_task(loop, inner())
1180
1181            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1182
1183        with self.assertRaises(asyncio.TimeoutError) as cm:
1184            loop.run_until_complete(foo())
1185
1186        self.assertTrue(task_done)
1187        chained = cm.exception.__context__
1188        self.assertEqual(type(chained), asyncio.CancelledError)
1189
1190    def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
1191        loop = asyncio.new_event_loop()
1192        self.addCleanup(loop.close)
1193
1194        task_done = False
1195
1196        async def foo():
1197            async def inner():
1198                nonlocal task_done
1199                try:
1200                    await asyncio.sleep(10)
1201                except asyncio.CancelledError:
1202                    await asyncio.sleep(_EPSILON)
1203                    raise
1204                finally:
1205                    task_done = True
1206
1207            inner_task = self.new_task(loop, inner())
1208            await asyncio.sleep(_EPSILON)
1209            await asyncio.wait_for(inner_task, timeout=0)
1210
1211        with self.assertRaises(asyncio.TimeoutError) as cm:
1212            loop.run_until_complete(foo())
1213
1214        self.assertTrue(task_done)
1215        chained = cm.exception.__context__
1216        self.assertEqual(type(chained), asyncio.CancelledError)
1217
1218    def test_wait_for_reraises_exception_during_cancellation(self):
1219        loop = asyncio.new_event_loop()
1220        self.addCleanup(loop.close)
1221
1222        class FooException(Exception):
1223            pass
1224
1225        async def foo():
1226            async def inner():
1227                try:
1228                    await asyncio.sleep(0.2)
1229                finally:
1230                    raise FooException
1231
1232            inner_task = self.new_task(loop, inner())
1233
1234            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1235
1236        with self.assertRaises(FooException):
1237            loop.run_until_complete(foo())
1238
1239    def test_wait_for_self_cancellation(self):
1240        loop = asyncio.new_event_loop()
1241        self.addCleanup(loop.close)
1242
1243        async def foo():
1244            async def inner():
1245                try:
1246                    await asyncio.sleep(0.3)
1247                except asyncio.CancelledError:
1248                    try:
1249                        await asyncio.sleep(0.3)
1250                    except asyncio.CancelledError:
1251                        await asyncio.sleep(0.3)
1252
1253                return 42
1254
1255            inner_task = self.new_task(loop, inner())
1256
1257            wait = asyncio.wait_for(inner_task, timeout=0.1)
1258
1259            # Test that wait_for itself is properly cancellable
1260            # even when the initial task holds up the initial cancellation.
1261            task = self.new_task(loop, wait)
1262            await asyncio.sleep(0.2)
1263            task.cancel()
1264
1265            with self.assertRaises(asyncio.CancelledError):
1266                await task
1267
1268            self.assertEqual(await inner_task, 42)
1269
1270        loop.run_until_complete(foo())
1271
1272    def test_wait(self):
1273
1274        def gen():
1275            when = yield
1276            self.assertAlmostEqual(0.1, when)
1277            when = yield 0
1278            self.assertAlmostEqual(0.15, when)
1279            yield 0.15
1280
1281        loop = self.new_test_loop(gen)
1282
1283        a = self.new_task(loop, asyncio.sleep(0.1))
1284        b = self.new_task(loop, asyncio.sleep(0.15))
1285
1286        async def foo():
1287            done, pending = await asyncio.wait([b, a])
1288            self.assertEqual(done, set([a, b]))
1289            self.assertEqual(pending, set())
1290            return 42
1291
1292        res = loop.run_until_complete(self.new_task(loop, foo()))
1293        self.assertEqual(res, 42)
1294        self.assertAlmostEqual(0.15, loop.time())
1295
1296        # Doing it again should take no time and exercise a different path.
1297        res = loop.run_until_complete(self.new_task(loop, foo()))
1298        self.assertAlmostEqual(0.15, loop.time())
1299        self.assertEqual(res, 42)
1300
1301    def test_wait_duplicate_coroutines(self):
1302
1303        with self.assertWarns(DeprecationWarning):
1304            @asyncio.coroutine
1305            def coro(s):
1306                return s
1307        c = coro('test')
1308        task = self.new_task(
1309            self.loop,
1310            asyncio.wait([c, c, coro('spam')]))
1311
1312        with self.assertWarns(DeprecationWarning):
1313            done, pending = self.loop.run_until_complete(task)
1314
1315        self.assertFalse(pending)
1316        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
1317
1318    def test_wait_errors(self):
1319        self.assertRaises(
1320            ValueError, self.loop.run_until_complete,
1321            asyncio.wait(set()))
1322
1323        # -1 is an invalid return_when value
1324        sleep_coro = asyncio.sleep(10.0)
1325        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
1326        self.assertRaises(ValueError,
1327                          self.loop.run_until_complete, wait_coro)
1328
1329        sleep_coro.close()
1330
1331    def test_wait_first_completed(self):
1332
1333        def gen():
1334            when = yield
1335            self.assertAlmostEqual(10.0, when)
1336            when = yield 0
1337            self.assertAlmostEqual(0.1, when)
1338            yield 0.1
1339
1340        loop = self.new_test_loop(gen)
1341
1342        a = self.new_task(loop, asyncio.sleep(10.0))
1343        b = self.new_task(loop, asyncio.sleep(0.1))
1344        task = self.new_task(
1345            loop,
1346            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1347
1348        done, pending = loop.run_until_complete(task)
1349        self.assertEqual({b}, done)
1350        self.assertEqual({a}, pending)
1351        self.assertFalse(a.done())
1352        self.assertTrue(b.done())
1353        self.assertIsNone(b.result())
1354        self.assertAlmostEqual(0.1, loop.time())
1355
1356        # move forward to close generator
1357        loop.advance_time(10)
1358        loop.run_until_complete(asyncio.wait([a, b]))
1359
1360    def test_wait_really_done(self):
1361        # there is possibility that some tasks in the pending list
1362        # became done but their callbacks haven't all been called yet
1363
1364        async def coro1():
1365            await asyncio.sleep(0)
1366
1367        async def coro2():
1368            await asyncio.sleep(0)
1369            await asyncio.sleep(0)
1370
1371        a = self.new_task(self.loop, coro1())
1372        b = self.new_task(self.loop, coro2())
1373        task = self.new_task(
1374            self.loop,
1375            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1376
1377        done, pending = self.loop.run_until_complete(task)
1378        self.assertEqual({a, b}, done)
1379        self.assertTrue(a.done())
1380        self.assertIsNone(a.result())
1381        self.assertTrue(b.done())
1382        self.assertIsNone(b.result())
1383
1384    def test_wait_first_exception(self):
1385
1386        def gen():
1387            when = yield
1388            self.assertAlmostEqual(10.0, when)
1389            yield 0
1390
1391        loop = self.new_test_loop(gen)
1392
1393        # first_exception, task already has exception
1394        a = self.new_task(loop, asyncio.sleep(10.0))
1395
1396        async def exc():
1397            raise ZeroDivisionError('err')
1398
1399        b = self.new_task(loop, exc())
1400        task = self.new_task(
1401            loop,
1402            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
1403
1404        done, pending = loop.run_until_complete(task)
1405        self.assertEqual({b}, done)
1406        self.assertEqual({a}, pending)
1407        self.assertAlmostEqual(0, loop.time())
1408
1409        # move forward to close generator
1410        loop.advance_time(10)
1411        loop.run_until_complete(asyncio.wait([a, b]))
1412
1413    def test_wait_first_exception_in_wait(self):
1414
1415        def gen():
1416            when = yield
1417            self.assertAlmostEqual(10.0, when)
1418            when = yield 0
1419            self.assertAlmostEqual(0.01, when)
1420            yield 0.01
1421
1422        loop = self.new_test_loop(gen)
1423
1424        # first_exception, exception during waiting
1425        a = self.new_task(loop, asyncio.sleep(10.0))
1426
1427        async def exc():
1428            await asyncio.sleep(0.01)
1429            raise ZeroDivisionError('err')
1430
1431        b = self.new_task(loop, exc())
1432        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
1433
1434        done, pending = loop.run_until_complete(task)
1435        self.assertEqual({b}, done)
1436        self.assertEqual({a}, pending)
1437        self.assertAlmostEqual(0.01, loop.time())
1438
1439        # move forward to close generator
1440        loop.advance_time(10)
1441        loop.run_until_complete(asyncio.wait([a, b]))
1442
1443    def test_wait_with_exception(self):
1444
1445        def gen():
1446            when = yield
1447            self.assertAlmostEqual(0.1, when)
1448            when = yield 0
1449            self.assertAlmostEqual(0.15, when)
1450            yield 0.15
1451
1452        loop = self.new_test_loop(gen)
1453
1454        a = self.new_task(loop, asyncio.sleep(0.1))
1455
1456        async def sleeper():
1457            await asyncio.sleep(0.15)
1458            raise ZeroDivisionError('really')
1459
1460        b = self.new_task(loop, sleeper())
1461
1462        async def foo():
1463            done, pending = await asyncio.wait([b, a])
1464            self.assertEqual(len(done), 2)
1465            self.assertEqual(pending, set())
1466            errors = set(f for f in done if f.exception() is not None)
1467            self.assertEqual(len(errors), 1)
1468
1469        loop.run_until_complete(self.new_task(loop, foo()))
1470        self.assertAlmostEqual(0.15, loop.time())
1471
1472        loop.run_until_complete(self.new_task(loop, foo()))
1473        self.assertAlmostEqual(0.15, loop.time())
1474
1475    def test_wait_with_timeout(self):
1476
1477        def gen():
1478            when = yield
1479            self.assertAlmostEqual(0.1, when)
1480            when = yield 0
1481            self.assertAlmostEqual(0.15, when)
1482            when = yield 0
1483            self.assertAlmostEqual(0.11, when)
1484            yield 0.11
1485
1486        loop = self.new_test_loop(gen)
1487
1488        a = self.new_task(loop, asyncio.sleep(0.1))
1489        b = self.new_task(loop, asyncio.sleep(0.15))
1490
1491        async def foo():
1492            done, pending = await asyncio.wait([b, a], timeout=0.11)
1493            self.assertEqual(done, set([a]))
1494            self.assertEqual(pending, set([b]))
1495
1496        loop.run_until_complete(self.new_task(loop, foo()))
1497        self.assertAlmostEqual(0.11, loop.time())
1498
1499        # move forward to close generator
1500        loop.advance_time(10)
1501        loop.run_until_complete(asyncio.wait([a, b]))
1502
1503    def test_wait_concurrent_complete(self):
1504
1505        def gen():
1506            when = yield
1507            self.assertAlmostEqual(0.1, when)
1508            when = yield 0
1509            self.assertAlmostEqual(0.15, when)
1510            when = yield 0
1511            self.assertAlmostEqual(0.1, when)
1512            yield 0.1
1513
1514        loop = self.new_test_loop(gen)
1515
1516        a = self.new_task(loop, asyncio.sleep(0.1))
1517        b = self.new_task(loop, asyncio.sleep(0.15))
1518
1519        done, pending = loop.run_until_complete(
1520            asyncio.wait([b, a], timeout=0.1))
1521
1522        self.assertEqual(done, set([a]))
1523        self.assertEqual(pending, set([b]))
1524        self.assertAlmostEqual(0.1, loop.time())
1525
1526        # move forward to close generator
1527        loop.advance_time(10)
1528        loop.run_until_complete(asyncio.wait([a, b]))
1529
1530    def test_wait_with_iterator_of_tasks(self):
1531
1532        def gen():
1533            when = yield
1534            self.assertAlmostEqual(0.1, when)
1535            when = yield 0
1536            self.assertAlmostEqual(0.15, when)
1537            yield 0.15
1538
1539        loop = self.new_test_loop(gen)
1540
1541        a = self.new_task(loop, asyncio.sleep(0.1))
1542        b = self.new_task(loop, asyncio.sleep(0.15))
1543
1544        async def foo():
1545            done, pending = await asyncio.wait(iter([b, a]))
1546            self.assertEqual(done, set([a, b]))
1547            self.assertEqual(pending, set())
1548            return 42
1549
1550        res = loop.run_until_complete(self.new_task(loop, foo()))
1551        self.assertEqual(res, 42)
1552        self.assertAlmostEqual(0.15, loop.time())
1553
1554    def test_as_completed(self):
1555
1556        def gen():
1557            yield 0
1558            yield 0
1559            yield 0.01
1560            yield 0
1561
1562        loop = self.new_test_loop(gen)
1563        # disable "slow callback" warning
1564        loop.slow_callback_duration = 1.0
1565        completed = set()
1566        time_shifted = False
1567
1568        with self.assertWarns(DeprecationWarning):
1569            @asyncio.coroutine
1570            def sleeper(dt, x):
1571                nonlocal time_shifted
1572                yield from  asyncio.sleep(dt)
1573                completed.add(x)
1574                if not time_shifted and 'a' in completed and 'b' in completed:
1575                    time_shifted = True
1576                    loop.advance_time(0.14)
1577                return x
1578
1579        a = sleeper(0.01, 'a')
1580        b = sleeper(0.01, 'b')
1581        c = sleeper(0.15, 'c')
1582
1583        async def foo():
1584            values = []
1585            for f in asyncio.as_completed([b, c, a]):
1586                values.append(await f)
1587            return values
1588
1589        res = loop.run_until_complete(self.new_task(loop, foo()))
1590        self.assertAlmostEqual(0.15, loop.time())
1591        self.assertTrue('a' in res[:2])
1592        self.assertTrue('b' in res[:2])
1593        self.assertEqual(res[2], 'c')
1594
1595        # Doing it again should take no time and exercise a different path.
1596        res = loop.run_until_complete(self.new_task(loop, foo()))
1597        self.assertAlmostEqual(0.15, loop.time())
1598
1599    def test_as_completed_with_timeout(self):
1600
1601        def gen():
1602            yield
1603            yield 0
1604            yield 0
1605            yield 0.1
1606
1607        loop = self.new_test_loop(gen)
1608
1609        a = loop.create_task(asyncio.sleep(0.1, 'a'))
1610        b = loop.create_task(asyncio.sleep(0.15, 'b'))
1611
1612        async def foo():
1613            values = []
1614            for f in asyncio.as_completed([a, b], timeout=0.12):
1615                if values:
1616                    loop.advance_time(0.02)
1617                try:
1618                    v = await f
1619                    values.append((1, v))
1620                except asyncio.TimeoutError as exc:
1621                    values.append((2, exc))
1622            return values
1623
1624        res = loop.run_until_complete(self.new_task(loop, foo()))
1625        self.assertEqual(len(res), 2, res)
1626        self.assertEqual(res[0], (1, 'a'))
1627        self.assertEqual(res[1][0], 2)
1628        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
1629        self.assertAlmostEqual(0.12, loop.time())
1630
1631        # move forward to close generator
1632        loop.advance_time(10)
1633        loop.run_until_complete(asyncio.wait([a, b]))
1634
1635    def test_as_completed_with_unused_timeout(self):
1636
1637        def gen():
1638            yield
1639            yield 0
1640            yield 0.01
1641
1642        loop = self.new_test_loop(gen)
1643
1644        a = asyncio.sleep(0.01, 'a')
1645
1646        async def foo():
1647            for f in asyncio.as_completed([a], timeout=1):
1648                v = await f
1649                self.assertEqual(v, 'a')
1650
1651        loop.run_until_complete(self.new_task(loop, foo()))
1652
1653    def test_as_completed_reverse_wait(self):
1654
1655        def gen():
1656            yield 0
1657            yield 0.05
1658            yield 0
1659
1660        loop = self.new_test_loop(gen)
1661
1662        a = asyncio.sleep(0.05, 'a')
1663        b = asyncio.sleep(0.10, 'b')
1664        fs = {a, b}
1665
1666        async def test():
1667            futs = list(asyncio.as_completed(fs))
1668            self.assertEqual(len(futs), 2)
1669
1670            x = await futs[1]
1671            self.assertEqual(x, 'a')
1672            self.assertAlmostEqual(0.05, loop.time())
1673            loop.advance_time(0.05)
1674            y = await futs[0]
1675            self.assertEqual(y, 'b')
1676            self.assertAlmostEqual(0.10, loop.time())
1677
1678        loop.run_until_complete(test())
1679
    def test_as_completed_concurrent(self):
        # The awaitables produced by as_completed() are all handed to a
        # single asyncio.wait() call; both results are expected, together
        # with a DeprecationWarning attributed to this file.

        def gen():
            when = yield
            self.assertAlmostEqual(0.05, when)
            when = yield 0
            self.assertAlmostEqual(0.05, when)
            yield 0.05

        a = asyncio.sleep(0.05, 'a')
        b = asyncio.sleep(0.05, 'b')
        fs = {a, b}

        async def test():
            futs = list(asyncio.as_completed(fs))
            self.assertEqual(len(futs), 2)
            waiter = asyncio.wait(futs)
            # Deprecation from passing coros in futs to asyncio.wait()
            with self.assertWarns(DeprecationWarning) as cm:
                done, pending = await waiter
            # The warning must point at this file.
            self.assertEqual(cm.warnings[0].filename, __file__)
            self.assertEqual(set(f.result() for f in done), {'a', 'b'})

        loop = self.new_test_loop(gen)
        loop.run_until_complete(test())
1705
1706    def test_as_completed_duplicate_coroutines(self):
1707
1708        with self.assertWarns(DeprecationWarning):
1709            @asyncio.coroutine
1710            def coro(s):
1711                return s
1712
1713        with self.assertWarns(DeprecationWarning):
1714            @asyncio.coroutine
1715            def runner():
1716                result = []
1717                c = coro('ham')
1718                for f in asyncio.as_completed([c, c, coro('spam')]):
1719                    result.append((yield from f))
1720                return result
1721
1722        fut = self.new_task(self.loop, runner())
1723        self.loop.run_until_complete(fut)
1724        result = fut.result()
1725        self.assertEqual(set(result), {'ham', 'spam'})
1726        self.assertEqual(len(result), 2)
1727
    def test_as_completed_coroutine_without_loop(self):
        # Iterating as_completed() over a bare coroutine is expected to
        # emit a DeprecationWarning attributed to this file and to raise
        # RuntimeError('There is no current event loop').
        async def coro():
            return 42

        a = coro()
        # The coroutine is never awaited here; close it at cleanup.
        self.addCleanup(a.close)

        futs = asyncio.as_completed([a])
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
                list(futs)
        # The warning must point at this file.
        self.assertEqual(cm.warnings[0].filename, __file__)
1740
1741    def test_as_completed_coroutine_use_running_loop(self):
1742        loop = self.new_test_loop()
1743
1744        async def coro():
1745            return 42
1746
1747        async def test():
1748            futs = list(asyncio.as_completed([coro()]))
1749            self.assertEqual(len(futs), 1)
1750            self.assertEqual(await futs[0], 42)
1751
1752        loop.run_until_complete(test())
1753
    def test_as_completed_coroutine_use_global_loop(self):
        # Deprecated in 3.10
        # as_completed() is iterated outside of any running coroutine,
        # after a loop was installed with set_event_loop(); a
        # DeprecationWarning attributed to this file is expected.
        async def coro():
            return 42

        loop = self.new_test_loop()
        asyncio.set_event_loop(loop)
        # Reset the current event loop once the test is over.
        self.addCleanup(asyncio.set_event_loop, None)
        futs = asyncio.as_completed([coro()])
        with self.assertWarns(DeprecationWarning) as cm:
            futs = list(futs)
        # The warning must point at this file.
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertEqual(len(futs), 1)
        self.assertEqual(loop.run_until_complete(futs[0]), 42)
1768
1769    def test_sleep(self):
1770
1771        def gen():
1772            when = yield
1773            self.assertAlmostEqual(0.05, when)
1774            when = yield 0.05
1775            self.assertAlmostEqual(0.1, when)
1776            yield 0.05
1777
1778        loop = self.new_test_loop(gen)
1779
1780        async def sleeper(dt, arg):
1781            await asyncio.sleep(dt/2)
1782            res = await asyncio.sleep(dt/2, arg)
1783            return res
1784
1785        t = self.new_task(loop, sleeper(0.1, 'yeah'))
1786        loop.run_until_complete(t)
1787        self.assertTrue(t.done())
1788        self.assertEqual(t.result(), 'yeah')
1789        self.assertAlmostEqual(0.1, loop.time())
1790
1791    def test_sleep_cancel(self):
1792
1793        def gen():
1794            when = yield
1795            self.assertAlmostEqual(10.0, when)
1796            yield 0
1797
1798        loop = self.new_test_loop(gen)
1799
1800        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
1801
1802        handle = None
1803        orig_call_later = loop.call_later
1804
1805        def call_later(delay, callback, *args):
1806            nonlocal handle
1807            handle = orig_call_later(delay, callback, *args)
1808            return handle
1809
1810        loop.call_later = call_later
1811        test_utils.run_briefly(loop)
1812
1813        self.assertFalse(handle._cancelled)
1814
1815        t.cancel()
1816        test_utils.run_briefly(loop)
1817        self.assertTrue(handle._cancelled)
1818
1819    def test_task_cancel_sleeping_task(self):
1820
1821        def gen():
1822            when = yield
1823            self.assertAlmostEqual(0.1, when)
1824            when = yield 0
1825            self.assertAlmostEqual(5000, when)
1826            yield 0.1
1827
1828        loop = self.new_test_loop(gen)
1829
1830        async def sleep(dt):
1831            await asyncio.sleep(dt)
1832
1833        async def doit():
1834            sleeper = self.new_task(loop, sleep(5000))
1835            loop.call_later(0.1, sleeper.cancel)
1836            try:
1837                await sleeper
1838            except asyncio.CancelledError:
1839                return 'cancelled'
1840            else:
1841                return 'slept in'
1842
1843        doer = doit()
1844        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
1845        self.assertAlmostEqual(0.1, loop.time())
1846
1847    def test_task_cancel_waiter_future(self):
1848        fut = self.new_future(self.loop)
1849
1850        async def coro():
1851            await fut
1852
1853        task = self.new_task(self.loop, coro())
1854        test_utils.run_briefly(self.loop)
1855        self.assertIs(task._fut_waiter, fut)
1856
1857        task.cancel()
1858        test_utils.run_briefly(self.loop)
1859        self.assertRaises(
1860            asyncio.CancelledError, self.loop.run_until_complete, task)
1861        self.assertIsNone(task._fut_waiter)
1862        self.assertTrue(fut.cancelled())
1863
1864    def test_task_set_methods(self):
1865        async def notmuch():
1866            return 'ko'
1867
1868        gen = notmuch()
1869        task = self.new_task(self.loop, gen)
1870
1871        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
1872            task.set_result('ok')
1873
1874        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
1875            task.set_exception(ValueError())
1876
1877        self.assertEqual(
1878            self.loop.run_until_complete(task),
1879            'ko')
1880
1881    def test_step_result(self):
1882        with self.assertWarns(DeprecationWarning):
1883            @asyncio.coroutine
1884            def notmuch():
1885                yield None
1886                yield 1
1887                return 'ko'
1888
1889        self.assertRaises(
1890            RuntimeError, self.loop.run_until_complete, notmuch())
1891
1892    def test_step_result_future(self):
1893        # If coroutine returns future, task waits on this future.
1894
1895        class Fut(asyncio.Future):
1896            def __init__(self, *args, **kwds):
1897                self.cb_added = False
1898                super().__init__(*args, **kwds)
1899
1900            def add_done_callback(self, *args, **kwargs):
1901                self.cb_added = True
1902                super().add_done_callback(*args, **kwargs)
1903
1904        fut = Fut(loop=self.loop)
1905        result = None
1906
1907        async def wait_for_future():
1908            nonlocal result
1909            result = await fut
1910
1911        t = self.new_task(self.loop, wait_for_future())
1912        test_utils.run_briefly(self.loop)
1913        self.assertTrue(fut.cb_added)
1914
1915        res = object()
1916        fut.set_result(res)
1917        test_utils.run_briefly(self.loop)
1918        self.assertIs(res, result)
1919        self.assertTrue(t.done())
1920        self.assertIsNone(t.result())
1921
1922    def test_baseexception_during_cancel(self):
1923
1924        def gen():
1925            when = yield
1926            self.assertAlmostEqual(10.0, when)
1927            yield 0
1928
1929        loop = self.new_test_loop(gen)
1930
1931        async def sleeper():
1932            await asyncio.sleep(10)
1933
1934        base_exc = SystemExit()
1935
1936        async def notmutch():
1937            try:
1938                await sleeper()
1939            except asyncio.CancelledError:
1940                raise base_exc
1941
1942        task = self.new_task(loop, notmutch())
1943        test_utils.run_briefly(loop)
1944
1945        task.cancel()
1946        self.assertFalse(task.done())
1947
1948        self.assertRaises(SystemExit, test_utils.run_briefly, loop)
1949
1950        self.assertTrue(task.done())
1951        self.assertFalse(task.cancelled())
1952        self.assertIs(task.exception(), base_exc)
1953
1954    def test_iscoroutinefunction(self):
1955        def fn():
1956            pass
1957
1958        self.assertFalse(asyncio.iscoroutinefunction(fn))
1959
1960        def fn1():
1961            yield
1962        self.assertFalse(asyncio.iscoroutinefunction(fn1))
1963
1964        with self.assertWarns(DeprecationWarning):
1965            @asyncio.coroutine
1966            def fn2():
1967                yield
1968        self.assertTrue(asyncio.iscoroutinefunction(fn2))
1969
1970        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
1971
1972    def test_yield_vs_yield_from(self):
1973        fut = self.new_future(self.loop)
1974
1975        with self.assertWarns(DeprecationWarning):
1976            @asyncio.coroutine
1977            def wait_for_future():
1978                yield fut
1979
1980        task = wait_for_future()
1981        with self.assertRaises(RuntimeError):
1982            self.loop.run_until_complete(task)
1983
1984        self.assertFalse(fut.done())
1985
1986    def test_yield_vs_yield_from_generator(self):
1987        with self.assertWarns(DeprecationWarning):
1988            @asyncio.coroutine
1989            def coro():
1990                yield
1991
1992        with self.assertWarns(DeprecationWarning):
1993            @asyncio.coroutine
1994            def wait_for_future():
1995                gen = coro()
1996                try:
1997                    yield gen
1998                finally:
1999                    gen.close()
2000
2001        task = wait_for_future()
2002        self.assertRaises(
2003            RuntimeError,
2004            self.loop.run_until_complete, task)
2005
2006    def test_coroutine_non_gen_function(self):
2007        with self.assertWarns(DeprecationWarning):
2008            @asyncio.coroutine
2009            def func():
2010                return 'test'
2011
2012        self.assertTrue(asyncio.iscoroutinefunction(func))
2013
2014        coro = func()
2015        self.assertTrue(asyncio.iscoroutine(coro))
2016
2017        res = self.loop.run_until_complete(coro)
2018        self.assertEqual(res, 'test')
2019
2020    def test_coroutine_non_gen_function_return_future(self):
2021        fut = self.new_future(self.loop)
2022
2023        with self.assertWarns(DeprecationWarning):
2024            @asyncio.coroutine
2025            def func():
2026                return fut
2027
2028        async def coro():
2029            fut.set_result('test')
2030
2031        t1 = self.new_task(self.loop, func())
2032        t2 = self.new_task(self.loop, coro())
2033        res = self.loop.run_until_complete(t1)
2034        self.assertEqual(res, 'test')
2035        self.assertIsNone(t2.result())
2036
2037    def test_current_task(self):
2038        self.assertIsNone(asyncio.current_task(loop=self.loop))
2039
2040        async def coro(loop):
2041            self.assertIs(asyncio.current_task(), task)
2042
2043            self.assertIs(asyncio.current_task(None), task)
2044            self.assertIs(asyncio.current_task(), task)
2045
2046        task = self.new_task(self.loop, coro(self.loop))
2047        self.loop.run_until_complete(task)
2048        self.assertIsNone(asyncio.current_task(loop=self.loop))
2049
2050    def test_current_task_with_interleaving_tasks(self):
2051        self.assertIsNone(asyncio.current_task(loop=self.loop))
2052
2053        fut1 = self.new_future(self.loop)
2054        fut2 = self.new_future(self.loop)
2055
2056        async def coro1(loop):
2057            self.assertTrue(asyncio.current_task() is task1)
2058            await fut1
2059            self.assertTrue(asyncio.current_task() is task1)
2060            fut2.set_result(True)
2061
2062        async def coro2(loop):
2063            self.assertTrue(asyncio.current_task() is task2)
2064            fut1.set_result(True)
2065            await fut2
2066            self.assertTrue(asyncio.current_task() is task2)
2067
2068        task1 = self.new_task(self.loop, coro1(self.loop))
2069        task2 = self.new_task(self.loop, coro2(self.loop))
2070
2071        self.loop.run_until_complete(asyncio.wait((task1, task2)))
2072        self.assertIsNone(asyncio.current_task(loop=self.loop))
2073
2074    # Some thorough tests for cancellation propagation through
2075    # coroutines, tasks and wait().
2076
2077    def test_yield_future_passes_cancel(self):
2078        # Cancelling outer() cancels inner() cancels waiter.
2079        proof = 0
2080        waiter = self.new_future(self.loop)
2081
2082        async def inner():
2083            nonlocal proof
2084            try:
2085                await waiter
2086            except asyncio.CancelledError:
2087                proof += 1
2088                raise
2089            else:
2090                self.fail('got past sleep() in inner()')
2091
2092        async def outer():
2093            nonlocal proof
2094            try:
2095                await inner()
2096            except asyncio.CancelledError:
2097                proof += 100  # Expect this path.
2098            else:
2099                proof += 10
2100
2101        f = asyncio.ensure_future(outer(), loop=self.loop)
2102        test_utils.run_briefly(self.loop)
2103        f.cancel()
2104        self.loop.run_until_complete(f)
2105        self.assertEqual(proof, 101)
2106        self.assertTrue(waiter.cancelled())
2107
2108    def test_yield_wait_does_not_shield_cancel(self):
2109        # Cancelling outer() makes wait() return early, leaves inner()
2110        # running.
2111        proof = 0
2112        waiter = self.new_future(self.loop)
2113
2114        async def inner():
2115            nonlocal proof
2116            await waiter
2117            proof += 1
2118
2119        async def outer():
2120            nonlocal proof
2121            with self.assertWarns(DeprecationWarning):
2122                d, p = await asyncio.wait([inner()])
2123            proof += 100
2124
2125        f = asyncio.ensure_future(outer(), loop=self.loop)
2126        test_utils.run_briefly(self.loop)
2127        f.cancel()
2128        self.assertRaises(
2129            asyncio.CancelledError, self.loop.run_until_complete, f)
2130        waiter.set_result(None)
2131        test_utils.run_briefly(self.loop)
2132        self.assertEqual(proof, 1)
2133
2134    def test_shield_result(self):
2135        inner = self.new_future(self.loop)
2136        outer = asyncio.shield(inner)
2137        inner.set_result(42)
2138        res = self.loop.run_until_complete(outer)
2139        self.assertEqual(res, 42)
2140
2141    def test_shield_exception(self):
2142        inner = self.new_future(self.loop)
2143        outer = asyncio.shield(inner)
2144        test_utils.run_briefly(self.loop)
2145        exc = RuntimeError('expected')
2146        inner.set_exception(exc)
2147        test_utils.run_briefly(self.loop)
2148        self.assertIs(outer.exception(), exc)
2149
2150    def test_shield_cancel_inner(self):
2151        inner = self.new_future(self.loop)
2152        outer = asyncio.shield(inner)
2153        test_utils.run_briefly(self.loop)
2154        inner.cancel()
2155        test_utils.run_briefly(self.loop)
2156        self.assertTrue(outer.cancelled())
2157
2158    def test_shield_cancel_outer(self):
2159        inner = self.new_future(self.loop)
2160        outer = asyncio.shield(inner)
2161        test_utils.run_briefly(self.loop)
2162        outer.cancel()
2163        test_utils.run_briefly(self.loop)
2164        self.assertTrue(outer.cancelled())
2165        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
2166
2167    def test_shield_shortcut(self):
2168        fut = self.new_future(self.loop)
2169        fut.set_result(42)
2170        res = self.loop.run_until_complete(asyncio.shield(fut))
2171        self.assertEqual(res, 42)
2172
2173    def test_shield_effect(self):
2174        # Cancelling outer() does not affect inner().
2175        proof = 0
2176        waiter = self.new_future(self.loop)
2177
2178        async def inner():
2179            nonlocal proof
2180            await waiter
2181            proof += 1
2182
2183        async def outer():
2184            nonlocal proof
2185            await asyncio.shield(inner())
2186            proof += 100
2187
2188        f = asyncio.ensure_future(outer(), loop=self.loop)
2189        test_utils.run_briefly(self.loop)
2190        f.cancel()
2191        with self.assertRaises(asyncio.CancelledError):
2192            self.loop.run_until_complete(f)
2193        waiter.set_result(None)
2194        test_utils.run_briefly(self.loop)
2195        self.assertEqual(proof, 1)
2196
2197    def test_shield_gather(self):
2198        child1 = self.new_future(self.loop)
2199        child2 = self.new_future(self.loop)
2200        parent = asyncio.gather(child1, child2)
2201        outer = asyncio.shield(parent)
2202        test_utils.run_briefly(self.loop)
2203        outer.cancel()
2204        test_utils.run_briefly(self.loop)
2205        self.assertTrue(outer.cancelled())
2206        child1.set_result(1)
2207        child2.set_result(2)
2208        test_utils.run_briefly(self.loop)
2209        self.assertEqual(parent.result(), [1, 2])
2210
2211    def test_gather_shield(self):
2212        child1 = self.new_future(self.loop)
2213        child2 = self.new_future(self.loop)
2214        inner1 = asyncio.shield(child1)
2215        inner2 = asyncio.shield(child2)
2216        parent = asyncio.gather(inner1, inner2)
2217        test_utils.run_briefly(self.loop)
2218        parent.cancel()
2219        # This should cancel inner1 and inner2 but bot child1 and child2.
2220        test_utils.run_briefly(self.loop)
2221        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
2222        self.assertTrue(inner1.cancelled())
2223        self.assertTrue(inner2.cancelled())
2224        child1.set_result(1)
2225        child2.set_result(2)
2226        test_utils.run_briefly(self.loop)
2227
2228    def test_shield_coroutine_without_loop(self):
2229        async def coro():
2230            return 42
2231
2232        inner = coro()
2233        self.addCleanup(inner.close)
2234        with self.assertWarns(DeprecationWarning) as cm:
2235            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
2236                asyncio.shield(inner)
2237        self.assertEqual(cm.warnings[0].filename, __file__)
2238
2239    def test_shield_coroutine_use_running_loop(self):
2240        async def coro():
2241            return 42
2242
2243        async def test():
2244            return asyncio.shield(coro())
2245        outer = self.loop.run_until_complete(test())
2246        self.assertEqual(outer._loop, self.loop)
2247        res = self.loop.run_until_complete(outer)
2248        self.assertEqual(res, 42)
2249
2250    def test_shield_coroutine_use_global_loop(self):
2251        # Deprecated in 3.10
2252        async def coro():
2253            return 42
2254
2255        asyncio.set_event_loop(self.loop)
2256        self.addCleanup(asyncio.set_event_loop, None)
2257        with self.assertWarns(DeprecationWarning) as cm:
2258            outer = asyncio.shield(coro())
2259        self.assertEqual(cm.warnings[0].filename, __file__)
2260        self.assertEqual(outer._loop, self.loop)
2261        res = self.loop.run_until_complete(outer)
2262        self.assertEqual(res, 42)
2263
2264    def test_as_completed_invalid_args(self):
2265        fut = self.new_future(self.loop)
2266
2267        # as_completed() expects a list of futures, not a future instance
2268        self.assertRaises(TypeError, self.loop.run_until_complete,
2269            asyncio.as_completed(fut))
2270        coro = coroutine_function()
2271        self.assertRaises(TypeError, self.loop.run_until_complete,
2272            asyncio.as_completed(coro))
2273        coro.close()
2274
2275    def test_wait_invalid_args(self):
2276        fut = self.new_future(self.loop)
2277
2278        # wait() expects a list of futures, not a future instance
2279        self.assertRaises(TypeError, self.loop.run_until_complete,
2280            asyncio.wait(fut))
2281        coro = coroutine_function()
2282        self.assertRaises(TypeError, self.loop.run_until_complete,
2283            asyncio.wait(coro))
2284        coro.close()
2285
2286        # wait() expects at least a future
2287        self.assertRaises(ValueError, self.loop.run_until_complete,
2288            asyncio.wait([]))
2289
    def test_corowrapper_mocks_generator(self):
        # The object returned by an @asyncio.coroutine function must expose
        # the generator attributes gi_frame, gi_running and gi_code, both
        # with coroutine debug mode off and on.

        def check():
            # A function that asserts various things.
            # Called twice, with different debug flag values.

            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro():
                    # The actual coroutine.
                    # "gen" and "fut" are closure variables assigned further
                    # down in check(), before the coroutine actually runs.
                    self.assertTrue(gen.gi_running)
                    yield from fut

            # A completed Future used to run the coroutine.
            fut = self.new_future(self.loop)
            fut.set_result(None)

            # Call the coroutine.
            gen = coro()

            # Check some properties.
            self.assertTrue(asyncio.iscoroutine(gen))
            self.assertIsInstance(gen.gi_frame, types.FrameType)
            self.assertFalse(gen.gi_running)
            self.assertIsInstance(gen.gi_code, types.CodeType)

            # Run it.
            self.loop.run_until_complete(gen)

            # The frame should have changed.
            self.assertIsNone(gen.gi_frame)

        # Test with debug flag cleared.
        with set_coroutine_debug(False):
            check()

        # Test with debug flag set.
        with set_coroutine_debug(True):
            check()
2329
2330    def test_yield_from_corowrapper(self):
2331        with set_coroutine_debug(True):
2332            with self.assertWarns(DeprecationWarning):
2333                @asyncio.coroutine
2334                def t1():
2335                    return (yield from t2())
2336
2337            with self.assertWarns(DeprecationWarning):
2338                @asyncio.coroutine
2339                def t2():
2340                    f = self.new_future(self.loop)
2341                    self.new_task(self.loop, t3(f))
2342                    return (yield from f)
2343
2344            with self.assertWarns(DeprecationWarning):
2345                @asyncio.coroutine
2346                def t3(f):
2347                    f.set_result((1, 2, 3))
2348
2349            task = self.new_task(self.loop, t1())
2350            val = self.loop.run_until_complete(task)
2351            self.assertEqual(val, (1, 2, 3))
2352
2353    def test_yield_from_corowrapper_send(self):
2354        def foo():
2355            a = yield
2356            return a
2357
2358        def call(arg):
2359            cw = asyncio.coroutines.CoroWrapper(foo())
2360            cw.send(None)
2361            try:
2362                cw.send(arg)
2363            except StopIteration as ex:
2364                return ex.args[0]
2365            else:
2366                raise AssertionError('StopIteration was expected')
2367
2368        self.assertEqual(call((1, 2)), (1, 2))
2369        self.assertEqual(call('spam'), 'spam')
2370
2371    def test_corowrapper_weakref(self):
2372        wd = weakref.WeakValueDictionary()
2373        def foo(): yield from []
2374        cw = asyncio.coroutines.CoroWrapper(foo())
2375        wd['cw'] = cw  # Would fail without __weakref__ slot.
2376        cw.gen = None  # Suppress warning from __del__.
2377
2378    def test_corowrapper_throw(self):
2379        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
2380        def foo():
2381            value = None
2382            while True:
2383                try:
2384                    value = yield value
2385                except Exception as e:
2386                    value = e
2387
2388        exception = Exception("foo")
2389        cw = asyncio.coroutines.CoroWrapper(foo())
2390        cw.send(None)
2391        self.assertIs(exception, cw.throw(exception))
2392
2393        cw = asyncio.coroutines.CoroWrapper(foo())
2394        cw.send(None)
2395        self.assertIs(exception, cw.throw(Exception, exception))
2396
2397        cw = asyncio.coroutines.CoroWrapper(foo())
2398        cw.send(None)
2399        exception = cw.throw(Exception, "foo")
2400        self.assertIsInstance(exception, Exception)
2401        self.assertEqual(exception.args, ("foo", ))
2402
2403        cw = asyncio.coroutines.CoroWrapper(foo())
2404        cw.send(None)
2405        exception = cw.throw(Exception, "foo", None)
2406        self.assertIsInstance(exception, Exception)
2407        self.assertEqual(exception.args, ("foo", ))
2408
    def test_log_destroyed_pending_task(self):
        # A pending task that gets garbage-collected must be reported to the
        # loop's exception handler with the message
        # 'Task was destroyed but it is pending!'.
        # NOTE(review): this test is sensitive to object lifetimes; the
        # order of the reference-dropping statements below matters.
        Task = self.__class__.Task

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def kill_me(loop):
                future = self.new_future(loop)
                yield from future
                # at this point, the only reference to kill_me() task is
                # the Task._wakeup() method in future._callbacks
                raise Exception("code never reached")

        # Route loop error reports to a mock so the call can be inspected.
        mock_handler = mock.Mock()
        self.loop.set_debug(True)
        self.loop.set_exception_handler(mock_handler)

        # schedule the task
        coro = kill_me(self.loop)
        task = asyncio.ensure_future(coro, loop=self.loop)

        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})

        asyncio.set_event_loop(None)

        # execute the task so it waits for future
        self.loop._run_once()
        self.assertEqual(len(self.loop._ready), 0)

        # remove the future used in kill_me(), and references to the task
        del coro.gi_frame.f_locals['future']
        coro = None
        # Keep the traceback for the final assertion before the task
        # reference is dropped.
        source_traceback = task._source_traceback
        task = None

        # no more reference to kill_me() task: the task is destroyed by the GC
        support.gc_collect()

        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())

        mock_handler.assert_called_with(self.loop, {
            'message': 'Task was destroyed but it is pending!',
            'task': mock.ANY,
            'source_traceback': source_traceback,
        })
        mock_handler.reset_mock()
2454
2455    @mock.patch('asyncio.base_events.logger')
2456    def test_tb_logger_not_called_after_cancel(self, m_log):
2457        loop = asyncio.new_event_loop()
2458        self.set_event_loop(loop)
2459
2460        async def coro():
2461            raise TypeError
2462
2463        async def runner():
2464            task = self.new_task(loop, coro())
2465            await asyncio.sleep(0.05)
2466            task.cancel()
2467            task = None
2468
2469        loop.run_until_complete(runner())
2470        self.assertFalse(m_log.error.called)
2471
    @mock.patch('asyncio.coroutines.logger')
    def test_coroutine_never_yielded(self, m_log):
        # A coroutine object created from a function decorated in debug mode
        # and then discarded must be reported through logger.error() with a
        # "was never yielded from" message that includes the creation site.
        with set_coroutine_debug(True):
            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro_noop():
                    pass

        # NOTE: tb_lineno is computed relative to the current line; the
        # "+ 2" only holds while the coro_noop() call stays exactly two
        # lines below the f_lineno expression. Do not insert lines there.
        tb_filename = __file__
        tb_lineno = sys._getframe().f_lineno + 2
        # create a coroutine object but don't use it
        coro_noop()
        support.gc_collect()

        self.assertTrue(m_log.error.called)
        message = m_log.error.call_args[0][0]
        func_filename, func_lineno = test_utils.get_function_source(coro_noop)

        # Expected shape of the logged message: the wrapper repr with the
        # function's source location, then the creation traceback whose last
        # entry is the coro_noop() call in this test.
        regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
                    r'was never yielded from\n'
                 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
                 r'.*\n'
                 r'  File "%s", line %s, in test_coroutine_never_yielded\n'
                 r'    coro_noop\(\)$'
                 % (re.escape(coro_noop.__qualname__),
                    re.escape(func_filename), func_lineno,
                    re.escape(tb_filename), tb_lineno))

        # DOTALL lets '.*' span the multi-line traceback portion.
        self.assertRegex(message, re.compile(regex, re.DOTALL))
2501
2502    def test_return_coroutine_from_coroutine(self):
2503        """Return of @asyncio.coroutine()-wrapped function generator object
2504        from @asyncio.coroutine()-wrapped function should have same effect as
2505        returning generator object or Future."""
2506        def check():
2507            with self.assertWarns(DeprecationWarning):
2508                @asyncio.coroutine
2509                def outer_coro():
2510                    with self.assertWarns(DeprecationWarning):
2511                        @asyncio.coroutine
2512                        def inner_coro():
2513                            return 1
2514
2515                    return inner_coro()
2516
2517            result = self.loop.run_until_complete(outer_coro())
2518            self.assertEqual(result, 1)
2519
2520        # Test with debug flag cleared.
2521        with set_coroutine_debug(False):
2522            check()
2523
2524        # Test with debug flag set.
2525        with set_coroutine_debug(True):
2526            check()
2527
    def test_task_source_traceback(self):
        # In loop debug mode a task records a source traceback whose
        # second-to-last entry is compared against the line that created it.
        self.loop.set_debug(True)

        # NOTE: the next two statements must stay on adjacent lines;
        # "f_lineno - 1" refers to the line of the new_task() call.
        task = self.new_task(self.loop, coroutine_function())
        lineno = sys._getframe().f_lineno - 1
        self.assertIsInstance(task._source_traceback, list)
        # Compare (filename, line number, function name) of that entry.
        self.assertEqual(task._source_traceback[-2][:3],
                         (__file__,
                          lineno,
                          'test_task_source_traceback'))
        self.loop.run_until_complete(task)
2539
2540    def _test_cancel_wait_for(self, timeout):
2541        loop = asyncio.new_event_loop()
2542        self.addCleanup(loop.close)
2543
2544        async def blocking_coroutine():
2545            fut = self.new_future(loop)
2546            # Block: fut result is never set
2547            await fut
2548
2549        task = loop.create_task(blocking_coroutine())
2550
2551        wait = loop.create_task(asyncio.wait_for(task, timeout))
2552        loop.call_soon(wait.cancel)
2553
2554        self.assertRaises(asyncio.CancelledError,
2555                          loop.run_until_complete, wait)
2556
2557        # Python issue #23219: cancelling the wait must also cancel the task
2558        self.assertTrue(task.cancelled())
2559
2560    def test_cancel_blocking_wait_for(self):
2561        self._test_cancel_wait_for(None)
2562
2563    def test_cancel_wait_for(self):
2564        self._test_cancel_wait_for(60.0)
2565
    def test_cancel_gather_1(self):
        """Ensure that a gathering future refuses to be cancelled once all
        children are done"""
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        fut = self.new_future(loop)
        async def create():
            # The indirection fut->child_coro is needed since otherwise the
            # gathering task is done at the same time as the child future
            def child_coro():
                return (yield from fut)
            gather_future = asyncio.gather(child_coro())
            return asyncio.ensure_future(gather_future)
        # create() runs inside the loop and hands back the gathering future.
        gather_task = loop.run_until_complete(create())

        # Records what gather_task.cancel() returned when invoked from the
        # done-callback of fut.
        cancel_result = None
        def cancelling_callback(_):
            nonlocal cancel_result
            cancel_result = gather_task.cancel()
        fut.add_done_callback(cancelling_callback)

        fut.set_result(42) # calls the cancelling_callback after fut is done()

        # At this point the task should complete.
        loop.run_until_complete(gather_task)

        # Python issue #26923: asyncio.gather drops cancellation
        self.assertEqual(cancel_result, False)
        self.assertFalse(gather_task.cancelled())
        self.assertEqual(gather_task.result(), [42])
2597
    def test_cancel_gather_2(self):
        # Cancelling a task that loops over gather(..., return_exceptions=True)
        # must propagate CancelledError, and the arguments given to cancel()
        # must be found on the innermost exception in the context chain.
        # Each case is (arguments passed to cancel(), expected exc.args).
        cases = [
            ((), ()),
            ((None,), ()),
            (('my message',), ('my message',)),
            # Non-string values should roundtrip.
            ((5,), (5,)),
        ]
        for cancel_args, expected_args in cases:
            with self.subTest(cancel_args=cancel_args):
                # A fresh loop per case, closed during test cleanup.
                loop = asyncio.new_event_loop()
                self.addCleanup(loop.close)

                async def test():
                    # Repeatedly gather a short sleep; returns on its own
                    # only once the accumulated time exceeds 1.
                    time = 0
                    while True:
                        time += 0.05
                        await asyncio.gather(asyncio.sleep(0.05),
                                             return_exceptions=True)
                        if time > 1:
                            return

                async def main():
                    qwe = self.new_task(loop, test())
                    await asyncio.sleep(0.2)
                    qwe.cancel(*cancel_args)
                    await qwe

                try:
                    loop.run_until_complete(main())
                except asyncio.CancelledError as exc:
                    # The outermost exception carries no arguments ...
                    self.assertEqual(exc.args, ())
                    # ... the innermost one in the __context__ chain does.
                    exc_type, exc_args, depth = get_innermost_context(exc)
                    self.assertEqual((exc_type, exc_args),
                        (asyncio.CancelledError, expected_args))
                    # The exact traceback seems to vary in CI.
                    self.assertIn(depth, (2, 3))
                else:
                    self.fail('gather did not propagate the cancellation '
                              'request')
2638
2639    def test_exception_traceback(self):
2640        # See http://bugs.python.org/issue28843
2641
2642        async def foo():
2643            1 / 0
2644
2645        async def main():
2646            task = self.new_task(self.loop, foo())
2647            await asyncio.sleep(0)  # skip one loop iteration
2648            self.assertIsNotNone(task.exception().__traceback__)
2649
2650        self.loop.run_until_complete(main())
2651
    @mock.patch('asyncio.base_events.logger')
    def test_error_in_call_soon(self, m_log):
        # When loop.call_soon() raises while a task is being created, the
        # error propagates, the destroyed task is logged as pending, and it
        # is not left registered in all_tasks().
        def call_soon(callback, *args, **kwargs):
            raise ValueError
        # Replace the loop's scheduler with one that always fails.
        self.loop.call_soon = call_soon

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                pass

        self.assertFalse(m_log.error.called)

        with self.assertRaises(ValueError):
            gen = coro()
            try:
                self.new_task(self.loop, gen)
            finally:
                # Close the coroutine object whether or not task creation
                # succeeded.
                gen.close()
        gc.collect()  # For PyPy or other GCs.

        self.assertTrue(m_log.error.called)
        message = m_log.error.call_args[0][0]
        self.assertIn('Task was destroyed but it is pending', message)

        self.assertEqual(asyncio.all_tasks(self.loop), set())
2678
2679    def test_create_task_with_noncoroutine(self):
2680        with self.assertRaisesRegex(TypeError,
2681                                    "a coroutine was expected, got 123"):
2682            self.new_task(self.loop, 123)
2683
2684        # test it for the second time to ensure that caching
2685        # in asyncio.iscoroutine() doesn't break things.
2686        with self.assertRaisesRegex(TypeError,
2687                                    "a coroutine was expected, got 123"):
2688            self.new_task(self.loop, 123)
2689
2690    def test_create_task_with_oldstyle_coroutine(self):
2691
2692        with self.assertWarns(DeprecationWarning):
2693            @asyncio.coroutine
2694            def coro():
2695                pass
2696
2697        task = self.new_task(self.loop, coro())
2698        self.assertIsInstance(task, self.Task)
2699        self.loop.run_until_complete(task)
2700
2701        # test it for the second time to ensure that caching
2702        # in asyncio.iscoroutine() doesn't break things.
2703        task = self.new_task(self.loop, coro())
2704        self.assertIsInstance(task, self.Task)
2705        self.loop.run_until_complete(task)
2706
2707    def test_create_task_with_async_function(self):
2708
2709        async def coro():
2710            pass
2711
2712        task = self.new_task(self.loop, coro())
2713        self.assertIsInstance(task, self.Task)
2714        self.loop.run_until_complete(task)
2715
2716        # test it for the second time to ensure that caching
2717        # in asyncio.iscoroutine() doesn't break things.
2718        task = self.new_task(self.loop, coro())
2719        self.assertIsInstance(task, self.Task)
2720        self.loop.run_until_complete(task)
2721
2722    def test_create_task_with_asynclike_function(self):
2723        task = self.new_task(self.loop, CoroLikeObject())
2724        self.assertIsInstance(task, self.Task)
2725        self.assertEqual(self.loop.run_until_complete(task), 42)
2726
2727        # test it for the second time to ensure that caching
2728        # in asyncio.iscoroutine() doesn't break things.
2729        task = self.new_task(self.loop, CoroLikeObject())
2730        self.assertIsInstance(task, self.Task)
2731        self.assertEqual(self.loop.run_until_complete(task), 42)
2732
2733    def test_bare_create_task(self):
2734
2735        async def inner():
2736            return 1
2737
2738        async def coro():
2739            task = asyncio.create_task(inner())
2740            self.assertIsInstance(task, self.Task)
2741            ret = await task
2742            self.assertEqual(1, ret)
2743
2744        self.loop.run_until_complete(coro())
2745
2746    def test_bare_create_named_task(self):
2747
2748        async def coro_noop():
2749            pass
2750
2751        async def coro():
2752            task = asyncio.create_task(coro_noop(), name='No-op')
2753            self.assertEqual(task.get_name(), 'No-op')
2754            await task
2755
2756        self.loop.run_until_complete(coro())
2757
2758    def test_context_1(self):
2759        cvar = contextvars.ContextVar('cvar', default='nope')
2760
2761        async def sub():
2762            await asyncio.sleep(0.01)
2763            self.assertEqual(cvar.get(), 'nope')
2764            cvar.set('something else')
2765
2766        async def main():
2767            self.assertEqual(cvar.get(), 'nope')
2768            subtask = self.new_task(loop, sub())
2769            cvar.set('yes')
2770            self.assertEqual(cvar.get(), 'yes')
2771            await subtask
2772            self.assertEqual(cvar.get(), 'yes')
2773
2774        loop = asyncio.new_event_loop()
2775        try:
2776            task = self.new_task(loop, main())
2777            loop.run_until_complete(task)
2778        finally:
2779            loop.close()
2780
    def test_context_2(self):
        # Context changes made inside a future's done callback must not
        # leak into the task awaiting that future, nor into the caller.
        cvar = contextvars.ContextVar('cvar', default='nope')

        async def main():
            def fut_on_done(fut):
                # This change must not pollute the context
                # of the "main()" task.
                cvar.set('something else')

            self.assertEqual(cvar.get(), 'nope')

            for j in range(2):
                fut = self.new_future(loop)
                fut.add_done_callback(fut_on_done)
                cvar.set(f'yes{j}')
                # Complete the future through the loop, not inline.
                loop.call_soon(fut.set_result, None)
                await fut
                # fut_on_done() has run by now; our value must survive.
                self.assertEqual(cvar.get(), f'yes{j}')

                for i in range(3):
                    # Test that task passed its context to add_done_callback:
                    cvar.set(f'yes{i}-{j}')
                    await asyncio.sleep(0.001)
                    self.assertEqual(cvar.get(), f'yes{i}-{j}')

        loop = asyncio.new_event_loop()
        try:
            task = self.new_task(loop, main())
            loop.run_until_complete(task)
        finally:
            loop.close()

        # The test's own context still holds the default value.
        self.assertEqual(cvar.get(), 'nope')
2814
2815    def test_context_3(self):
2816        # Run 100 Tasks in parallel, each modifying cvar.
2817
2818        cvar = contextvars.ContextVar('cvar', default=-1)
2819
2820        async def sub(num):
2821            for i in range(10):
2822                cvar.set(num + i)
2823                await asyncio.sleep(random.uniform(0.001, 0.05))
2824                self.assertEqual(cvar.get(), num + i)
2825
2826        async def main():
2827            tasks = []
2828            for i in range(100):
2829                task = loop.create_task(sub(random.randint(0, 10)))
2830                tasks.append(task)
2831
2832            await asyncio.gather(*tasks)
2833
2834        loop = asyncio.new_event_loop()
2835        try:
2836            loop.run_until_complete(main())
2837        finally:
2838            loop.close()
2839
2840        self.assertEqual(cvar.get(), -1)
2841
2842    def test_get_coro(self):
2843        loop = asyncio.new_event_loop()
2844        coro = coroutine_function()
2845        try:
2846            task = self.new_task(loop, coro)
2847            loop.run_until_complete(task)
2848            self.assertIs(task.get_coro(), coro)
2849        finally:
2850            loop.close()
2851
2852
def add_subclass_tests(cls):
    """Class decorator that re-targets a test case at Task/Future subclasses.

    ``cls.Task`` and ``cls.Future`` are replaced with subclasses that count
    ``add_done_callback()`` calls, an extra test using those counters is
    attached, and ``test_task_source_traceback`` is disabled.  When either
    base class is ``None`` the test case is returned unchanged.
    """
    task_base = cls.Task
    future_base = cls.Future

    if task_base is None or future_base is None:
        return cls

    class CommonFuture:
        # Mixin that records how often add_done_callback() is invoked.
        def __init__(self, *args, **kwargs):
            self.calls = collections.defaultdict(int)
            super().__init__(*args, **kwargs)

        def add_done_callback(self, *args, **kwargs):
            self.calls['add_done_callback'] += 1
            return super().add_done_callback(*args, **kwargs)

    class Task(CommonFuture, task_base):
        pass

    class Future(CommonFuture, future_base):
        pass

    def test_subclasses_ctask_cfuture(self):
        fut = self.Future(loop=self.loop)

        async def func():
            self.loop.call_soon(lambda: fut.set_result('spam'))
            return await fut

        task = self.Task(func(), loop=self.loop)
        result = self.loop.run_until_complete(task)
        self.assertEqual(result, 'spam')

        # Each object must have had exactly one done callback added.
        expected_calls = {'add_done_callback': 1}
        self.assertEqual(dict(task.calls), expected_calls)
        self.assertEqual(dict(fut.calls), expected_calls)

    # Add patched Task & Future back to the test case, together with
    # the extra unit-test.
    cls.Task = Task
    cls.Future = Future
    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture

    # Disable the "test_task_source_traceback" test
    # (the test is hardcoded for a particular call stack, which
    # is slightly different for Task subclasses)
    cls.test_task_source_traceback = None

    return cls
2909
2910
class SetMethodsTest:
    # Mixin for suites providing ``Future``, ``loop`` and ``new_task()``.
    # Completing a task through the Future-level setters, behind the back
    # of its coroutine, must surface an InvalidStateError through the
    # loop's exception handler.

    def test_set_result_causes_invalid_state(self):
        future_cls = type(self).Future
        handler = mock.Mock()
        self.loop.call_exception_handler = handler

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        pending_coro = foo()
        task = self.new_task(self.loop, pending_coro)
        # Force a result onto the task without running its coroutine.
        future_cls.set_result(task, 'spam')

        outcome = self.loop.run_until_complete(task)
        self.assertEqual(outcome, 'spam')

        handler.assert_called_once()
        # The handler's first positional argument is the context dict.
        context = handler.call_args[0][0]
        reported = context['exception']
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise reported

        pending_coro.close()

    def test_set_exception_causes_invalid_state(self):
        class MyExc(Exception):
            pass

        future_cls = type(self).Future
        handler = mock.Mock()
        self.loop.call_exception_handler = handler

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        pending_coro = foo()
        task = self.new_task(self.loop, pending_coro)
        # Force an exception onto the task without running its coroutine.
        future_cls.set_exception(task, MyExc())

        with self.assertRaises(MyExc):
            self.loop.run_until_complete(task)

        handler.assert_called_once()
        # The handler's first positional argument is the context dict.
        context = handler.call_args[0][0]
        reported = context['exception']
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise reported

        pending_coro.close()
2962
2963
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
                          test_utils.TestCase):
    # Runs the inherited suites with tasks._CTask and futures._CFuture,
    # plus the two extra tests defined below.

    # getattr() with a default keeps the class body importable when the
    # attributes are missing; the skip decorator then skips the class.
    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)

    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        # Calling __init__ again on an already completed task, 100 times
        # over, must leave the total reference count within 10 of where
        # it started.
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        refs_before = gettotalrefcount()
        for i in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        # Deleting the _log_destroy_pending attribute of a task must
        # raise AttributeError.
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        with self.assertRaises(AttributeError):
            del task._log_destroy_pending
2993
2994
@unittest.skipUnless(
    hasattr(tasks, '_CTask') and hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Base classes handed to add_subclass_tests(): tasks._CTask and
    # futures._CFuture (None when unavailable, in which case the class
    # is skipped).
    Future = getattr(futures, '_CFuture', None)
    Task = getattr(tasks, '_CTask', None)
3003
3004
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Base classes handed to add_subclass_tests(): tasks._CTask (None
    # when unavailable) and futures._PyFuture.
    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
3012
3013
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
    # Base classes handed to add_subclass_tests(): tasks._PyTask and
    # futures._CFuture (None when unavailable).
    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)
3021
3022
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Inherited suite run with tasks._CTask (None when unavailable)
    # and futures._PyFuture.
    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
3029
3030
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Inherited suite run with tasks._PyTask and futures._CFuture
    # (None when unavailable).
    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask
3037
3038
class PyTask_PyFuture_Tests(
        BaseTaskTests, SetMethodsTest, test_utils.TestCase):
    # Inherited suites run with tasks._PyTask and futures._PyFuture.
    Future = futures._PyFuture
    Task = tasks._PyTask
3044
3045
@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Base classes handed to add_subclass_tests(): tasks._PyTask and
    # futures._PyFuture.
    Future = futures._PyFuture
    Task = tasks._PyTask
3050
3051
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):

    def test_foobar(self):
        # A task must be able to await a Future subclass whose
        # ``get_loop`` attribute raises AttributeError on access.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await fut
            return 'spam'

        loop = self.loop = asyncio.new_event_loop()
        try:
            fut = Fut(loop=loop)
            # Resolve the future a little later so the task really waits.
            loop.call_later(0.1, fut.set_result, 1)
            res = loop.run_until_complete(loop.create_task(coro()))
        finally:
            loop.close()

        self.assertEqual(res, 'spam')
3076
3077
class BaseTaskIntrospectionTests:
    # Mixin exercising the low-level task bookkeeping functions.
    # Subclasses supply the four functions under test through the
    # attributes below; their effect is observed via asyncio.all_tasks()
    # and asyncio.current_task().
    _register_task = None
    _unregister_task = None
    _enter_task = None
    _leave_task = None

    def test__register_task_1(self):
        # A task-like object that exposes its loop only through a
        # ``_loop`` property is reported by all_tasks() once registered.
        class TaskLike:
            @property
            def _loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        # Undo the registration made by this test.
        self._unregister_task(task)

    def test__register_task_2(self):
        # Same as above, but the loop is exposed through get_loop().
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_3(self):
        # A registered task-like object whose done() returns True is
        # not reported by all_tasks().
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return True

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
        self._unregister_task(task)

    def test__enter_task(self):
        # After _enter_task(), current_task() for that loop is the task.
        task = mock.Mock()
        loop = mock.Mock()
        self.assertIsNone(asyncio.current_task(loop))
        self._enter_task(loop, task)
        self.assertIs(asyncio.current_task(loop), task)
        # Undo the entry made by this test.
        self._leave_task(loop, task)

    def test__enter_task_failure(self):
        # Entering a second task on the same loop raises RuntimeError
        # and leaves the first task as the current one.
        task1 = mock.Mock()
        task2 = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task1)
        with self.assertRaises(RuntimeError):
            self._enter_task(loop, task2)
        self.assertIs(asyncio.current_task(loop), task1)
        self._leave_task(loop, task1)

    def test__leave_task(self):
        # Leaving the entered task clears current_task() for the loop.
        task = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task)
        self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__leave_task_failure1(self):
        # Leaving a task other than the entered one raises RuntimeError
        # and the entered task stays current.
        task1 = mock.Mock()
        task2 = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task1)
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task2)
        self.assertIs(asyncio.current_task(loop), task1)
        self._leave_task(loop, task1)

    def test__leave_task_failure2(self):
        # Leaving when no task was entered raises RuntimeError.
        task = mock.Mock()
        loop = mock.Mock()
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__unregister_task(self):
        # A task that was registered and then unregistered is no longer
        # reported by all_tasks().
        task = mock.Mock()
        loop = mock.Mock()
        task.get_loop = lambda: loop
        self._register_task(task)
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())

    def test__unregister_task_not_registered(self):
        # Unregistering a task that was never registered must not raise.
        task = mock.Mock()
        loop = mock.Mock()
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
3188
3189
class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # Run the shared introspection tests against the ``_py_*`` functions
    # exposed by asyncio.tasks.  staticmethod() keeps them from being
    # bound to the test instance.
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
3195
3196
@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
                     'requires the C _asyncio module')
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # Run the shared introspection tests against the ``_c_*`` functions.
    # The class body is executed even when the class ends up skipped, so
    # it must not touch attributes that may be missing.
    if not hasattr(tasks, '_c_register_task'):
        _register_task = _unregister_task = _enter_task = _leave_task = None
    else:
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
3207
3208
class BaseCurrentLoopTests:
    # Mixin checking asyncio.current_task() with and without a running
    # loop.  Subclasses override new_task() to pick the Task class.

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def new_task(self, coro):
        # Must be provided by the concrete test case.
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        current = asyncio.current_task(loop=self.loop)
        self.assertIsNone(current)

    def test_current_task_no_running_loop_implicit(self):
        with self.assertRaises(RuntimeError):
            asyncio.current_task()

    def test_current_task_with_implicit_loop(self):
        async def body():
            # Explicit loop, explicit None and no argument at all must
            # all resolve to the task running this coroutine.
            explicit = asyncio.current_task(loop=self.loop)
            self.assertIs(explicit, running)

            with_none = asyncio.current_task(None)
            self.assertIs(with_none, running)
            implicit = asyncio.current_task()
            self.assertIs(implicit, running)

        running = self.new_task(body())
        self.loop.run_until_complete(running)
        # Once the task has finished there is no current task any more.
        self.assertIsNone(asyncio.current_task(loop=self.loop))
3236
3237
class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):

    def new_task(self, coro):
        # Build the task with tasks._PyTask, bound to the test loop.
        task_factory = tasks._PyTask
        return task_factory(coro, loop=self.loop)
3242
3243
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):

    def new_task(self, coro):
        # Build the task with tasks._CTask, bound to the test loop.  The
        # attribute is only looked up when a test actually runs.
        return tasks._CTask(coro, loop=self.loop)
3250
3251
class GenericTaskTests(test_utils.TestCase):

    def test_future_subclass(self):
        is_future_subclass = issubclass(asyncio.Task, asyncio.Future)
        self.assertTrue(is_future_subclass)

    @support.cpython_only
    def test_asyncio_module_compiled(self):
        # Because of circular imports it's easy to make _asyncio
        # module non-importable.  This is a simple test that will
        # fail on systems where C modules were successfully compiled
        # (hence the test for _functools etc), but _asyncio somehow didn't.
        try:
            import _functools
            import _json
            import _pickle
        except ImportError:
            # skipTest() raises, so nothing below runs in this case.
            self.skipTest('C modules are not available')

        try:
            import _asyncio
        except ImportError:
            self.fail('_asyncio module is missing')
3274
3275
class GatherTestsBase:
    # Tests shared by the gather() suites.  Concrete subclasses provide
    # wrap_futures() (how child futures are handed to gather) and
    # _gather() (how gather itself is invoked).

    def setUp(self):
        super().setUp()
        # Two separate test loops; only one_loop is installed as the
        # current event loop.
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        # Keep stepping the loop for as long as it has ready callbacks.
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        # gather() finishes only after every child has a result, and
        # reports results in argument order rather than completion order.
        a, b, c = [self.one_loop.create_future() for i in range(3)]
        fut = self._gather(*self.wrap_futures(a, b, c), **kwargs)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        b.set_result(1)
        a.set_result(2)
        self._run_loop(self.one_loop)
        # c is still pending, so the outer future must not be done yet.
        self.assertEqual(cb.called, False)
        self.assertFalse(fut.done())
        c.set_result(3)
        self._run_loop(self.one_loop)
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [2, 1, 3])

    def test_success(self):
        # Default behaviour and an explicit return_exceptions=False.
        self._check_success()
        self._check_success(return_exceptions=False)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        # Without return_exceptions, the first child exception completes
        # the outer future with that same exception object.
        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
        fut = self._gather(*self.wrap_futures(a, b, c, d, e))
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        a.set_result(1)
        b.set_exception(exc)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertIs(fut.exception(), exc)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_return_exceptions(self):
        # With return_exceptions=True, exceptions are collected into the
        # result list and the outer future waits for every child.
        a, b, c, d = [self.one_loop.create_future() for i in range(4)]
        fut = self._gather(*self.wrap_futures(a, b, c, d),
                           return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        exc2 = RuntimeError()
        b.set_result(1)
        c.set_exception(exc)
        a.set_result(3)
        self._run_loop(self.one_loop)
        # d is still pending.
        self.assertFalse(fut.done())
        d.set_exception(exc2)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [3, 1, exc, exc2])

    def test_env_var_debug(self):
        # Run a child interpreter that prints asyncio.coroutines._DEBUG
        # under several combinations of options and environment variables.
        code = '\n'.join((
            'import asyncio.coroutines',
            'print(asyncio.coroutines._DEBUG)'))

        # Test with -E to not fail if the unit test was run with
        # PYTHONASYNCIODEBUG set to a non-empty string
        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
        self.assertEqual(stdout.rstrip(), b'False')

        # Empty variables: debug stays off.
        sts, stdout, stderr = assert_python_ok('-c', code,
                                               PYTHONASYNCIODEBUG='',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'False')

        # PYTHONASYNCIODEBUG=1 without -E: debug is on.
        sts, stdout, stderr = assert_python_ok('-c', code,
                                               PYTHONASYNCIODEBUG='1',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'True')

        # The same variable together with -E is expected to have no effect.
        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
                                               PYTHONASYNCIODEBUG='1',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'False')

        # -X dev
        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
                                               '-c', code)
        self.assertEqual(stdout.rstrip(), b'True')
3376
3377
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
    # gather() suite in which the children are passed as plain futures.

    def wrap_futures(self, *futures):
        # Futures are handed to gather() unchanged.
        return futures

    def _gather(self, *args, **kwargs):
        return asyncio.gather(*args, **kwargs)

    def test_constructor_empty_sequence_without_loop(self):
        # gather() with no arguments is expected to emit a
        # DeprecationWarning attributed to this file and then raise
        # RuntimeError.
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaises(RuntimeError):
                asyncio.gather()
        self.assertEqual(cm.warnings[0].filename, __file__)

    def test_constructor_empty_sequence_use_running_loop(self):
        # Called from inside a coroutine, an empty gather() is bound to
        # the loop running that coroutine and resolves to [].
        async def gather():
            return asyncio.gather()
        fut = self.one_loop.run_until_complete(gather())
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])

    def test_constructor_empty_sequence_use_global_loop(self):
        # Deprecated in 3.10
        asyncio.set_event_loop(self.one_loop)
        # Remove the global loop again once the test is over.
        self.addCleanup(asyncio.set_event_loop, None)
        with self.assertWarns(DeprecationWarning) as cm:
            fut = asyncio.gather()
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])

    def test_constructor_heterogenous_futures(self):
        # Children that belong to different loops are rejected.
        fut1 = self.one_loop.create_future()
        fut2 = self.other_loop.create_future()
        with self.assertRaises(ValueError):
            asyncio.gather(fut1, fut2)

    def test_constructor_homogenous_futures(self):
        # The outer future is bound to the children's loop (other_loop),
        # and gathering the same children a second time behaves the same.
        children = [self.other_loop.create_future() for i in range(3)]
        fut = asyncio.gather(*children)
        self.assertIs(fut._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(fut.done())
        fut = asyncio.gather(*children)
        self.assertIs(fut._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(fut.done())

    def test_one_cancellation(self):
        # A cancelled child completes the outer future with a
        # CancelledError as its exception; the outer future itself is
        # not marked as cancelled.
        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
        fut = asyncio.gather(a, b, c, d, e)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        b.cancel()
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertFalse(fut.cancelled())
        self.assertIsInstance(fut.exception(), asyncio.CancelledError)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_result_exception_one_cancellation(self):
        # With return_exceptions=True, cancelled children show up as
        # CancelledError instances in the result list.
        a, b, c, d, e, f = [self.one_loop.create_future()
                            for i in range(6)]
        fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        zde = ZeroDivisionError()
        b.set_exception(zde)
        c.cancel()
        self._run_loop(self.one_loop)
        # d, e and f are still pending.
        self.assertFalse(fut.done())
        d.set_result(3)
        e.cancel()
        rte = RuntimeError()
        f.set_exception(rte)
        res = self.one_loop.run_until_complete(fut)
        self.assertIsInstance(res[2], asyncio.CancelledError)
        self.assertIsInstance(res[4], asyncio.CancelledError)
        # Blank out the CancelledError entries so the rest of the list
        # can be compared by equality.
        res[2] = res[4] = None
        self.assertEqual(res, [1, zde, None, 3, None, rte])
        cb.assert_called_once_with(fut)
3472
3473
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
    # gather() suite in which the children are passed as coroutines.

    def wrap_futures(self, *futures):
        # Wrap every future in a coroutine that simply awaits it.
        coros = []
        for fut in futures:
            # The default argument binds the current future to this
            # particular coroutine function.
            async def coro(fut=fut):
                return await fut
            coros.append(coro())
        return coros

    def _gather(self, *args, **kwargs):
        # Call gather() from a coroutine running on one_loop and return
        # the future that gather() produced (it is not awaited here).
        async def coro():
            return asyncio.gather(*args, **kwargs)
        return self.one_loop.run_until_complete(coro())

    def test_constructor_without_loop(self):
        # gather() on coroutines outside a coroutine is expected to emit
        # a DeprecationWarning attributed to this file and then raise
        # RuntimeError.
        async def coro():
            return 'abc'
        gen1 = coro()
        # The coroutines never run, so close them at cleanup time.
        self.addCleanup(gen1.close)
        gen2 = coro()
        self.addCleanup(gen2.close)
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaises(RuntimeError):
                asyncio.gather(gen1, gen2)
        self.assertEqual(cm.warnings[0].filename, __file__)

    def test_constructor_use_running_loop(self):
        # Called from inside a coroutine, gather() binds to the loop
        # running that coroutine.
        async def coro():
            return 'abc'
        gen1 = coro()
        gen2 = coro()
        async def gather():
            return asyncio.gather(gen1, gen2)
        fut = self.one_loop.run_until_complete(gather())
        self.assertIs(fut._loop, self.one_loop)
        self.one_loop.run_until_complete(fut)

    def test_constructor_use_global_loop(self):
        # Deprecated in 3.10
        async def coro():
            return 'abc'
        asyncio.set_event_loop(self.other_loop)
        # Remove the global loop again once the test is over.
        self.addCleanup(asyncio.set_event_loop, None)
        gen1 = coro()
        gen2 = coro()
        with self.assertWarns(DeprecationWarning) as cm:
            fut = asyncio.gather(gen1, gen2)
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertIs(fut._loop, self.other_loop)
        self.other_loop.run_until_complete(fut)

    def test_duplicate_coroutines(self):
        # The same coroutine object may be passed several times; its
        # result appears once per occurrence.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s
        c = coro('abc')
        fut = self._gather(c, c, coro('def'), c)
        self._run_loop(self.one_loop)
        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
        proof = 0
        waiter = self.one_loop.create_future()

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
        child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
        gatherer = None

        async def outer():
            nonlocal proof, gatherer
            gatherer = asyncio.gather(child1, child2)
            await gatherer
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        self.assertTrue(f.cancel())
        with self.assertRaises(asyncio.CancelledError):
            self.one_loop.run_until_complete(f)
        # cancel() on the gather future returns False at this point.
        self.assertFalse(gatherer.cancel())
        self.assertTrue(waiter.cancelled())
        self.assertTrue(child1.cancelled())
        self.assertTrue(child2.cancelled())
        test_utils.run_briefly(self.one_loop)
        # Neither inner() nor outer() got past its await.
        self.assertEqual(proof, 0)

    def test_exception_marking(self):
        # Test for the first line marked "Mark exception retrieved."

        async def inner(f):
            await f
            raise RuntimeError('should not be ignored')

        a = self.one_loop.create_future()
        b = self.one_loop.create_future()

        async def outer():
            await asyncio.gather(inner(a), inner(b))

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        # Release the two inner coroutines one after the other; each
        # then raises RuntimeError.
        a.set_result(None)
        test_utils.run_briefly(self.one_loop)
        b.set_result(None)
        test_utils.run_briefly(self.one_loop)
        self.assertIsInstance(f.exception(), RuntimeError)
3588
3589
class RunCoroutineThreadsafeTests(test_utils.TestCase):
    """Test case for asyncio.run_coroutine_threadsafe."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        # No explicit tearDown: set_event_loop() will clean up properly.
        self.set_event_loop(self.loop)

    async def add(self, a, b, fail=False, cancel=False):
        """Sleep for 0.05 second, then return a + b.

        With fail=True, raise RuntimeError("Fail!") after the sleep.
        With cancel=True, cancel the current task and yield to the loop
        once before returning.
        """
        await asyncio.sleep(0.05)
        if fail:
            raise RuntimeError("Fail!")
        if cancel:
            running = asyncio.current_task(self.loop)
            running.cancel()
            await asyncio.sleep(0)
        return a + b

    def target(self, fail=False, cancel=False, timeout=None,
               advance_coro=False):
        """Submit add(1, 2) to self.loop and return its result.

        The tests call this from an executor thread.  The result is
        waited for with future.result(timeout); a future that is still
        pending when this method exits gets cancelled.
        """
        coro = self.add(1, 2, fail=fail, cancel=cancel)
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        if advance_coro:
            # Only used by
            # test_run_coroutine_threadsafe_task_factory_exception.
            # Without it, errors spill over and break **other** unittests,
            # because 'target' interacts with threads.
            #
            # Advancing `coro` here makes sure CoroWrapper.__del__ has
            # nothing to do when the asyncio tests run in debug mode.
            self.loop.call_soon_threadsafe(coro.send, None)
        try:
            return future.result(timeout)
        finally:
            if not future.done():
                future.cancel()

    def test_run_coroutine_threadsafe(self):
        """Test coroutine submission from a thread to an event loop."""
        in_thread = self.loop.run_in_executor(None, self.target)
        total = self.loop.run_until_complete(in_thread)
        self.assertEqual(total, 3)

    def test_run_coroutine_threadsafe_with_exception(self):
        """Test coroutine submission from a thread to an event loop
        when an exception is raised."""
        in_thread = self.loop.run_in_executor(None, self.target, True)
        with self.assertRaises(RuntimeError) as exc_context:
            self.loop.run_until_complete(in_thread)
        self.assertIn("Fail!", exc_context.exception.args)

    def test_run_coroutine_threadsafe_with_timeout(self):
        """Test coroutine submission from a thread to an event loop
        when a timeout is raised."""
        run_target = functools.partial(self.target, timeout=0)
        in_thread = self.loop.run_in_executor(None, run_target)
        with self.assertRaises(asyncio.TimeoutError):
            self.loop.run_until_complete(in_thread)
        test_utils.run_briefly(self.loop)
        # Check that there's no pending task (add has been cancelled)
        for pending in asyncio.all_tasks(self.loop):
            self.assertTrue(pending.done())

    def test_run_coroutine_threadsafe_task_cancelled(self):
        """Test coroutine submission from a thread to an event loop
        when the task is cancelled."""
        run_target = functools.partial(self.target, cancel=True)
        in_thread = self.loop.run_in_executor(None, run_target)
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(in_thread)

    def test_run_coroutine_threadsafe_task_factory_exception(self):
        """Test coroutine submission from a thread to an event loop
        when the task factory raises an exception."""

        def broken_factory(loop, coro):
            raise NameError

        run_target = functools.partial(self.target, advance_coro=True)
        in_thread = self.loop.run_in_executor(None, run_target)

        # Record whatever reaches the loop's exception handler.
        handler = test_utils.MockCallback()
        self.loop.set_exception_handler(handler)

        # Install the corrupted task factory; restore the old one afterwards.
        previous_factory = self.loop.get_task_factory()
        self.addCleanup(self.loop.set_task_factory, previous_factory)
        self.loop.set_task_factory(broken_factory)

        # Run the event loop; the factory's error must come back out.
        with self.assertRaises(NameError) as exc_context:
            self.loop.run_until_complete(in_thread)

        # The handler saw exactly one call, carrying that same exception.
        self.assertEqual(len(handler.call_args_list), 1)
        (_loop, context), _kwargs = handler.call_args
        self.assertEqual(context['exception'], exc_context.exception)
3688
3689
class SleepTests(test_utils.TestCase):
    # Tests for asyncio.sleep().

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_sleep_zero(self):
        # sleep(0) must let callbacks already scheduled with call_soon()
        # run before the coroutine resumes, and must return `result`.
        total = 0

        def bump(amount):
            nonlocal total
            total += amount

        async def coro():
            self.loop.call_soon(bump, 1)
            # The callback has only been scheduled, not run yet.
            self.assertEqual(total, 0)
            slept = await asyncio.sleep(0, result=10)
            # The call_soon() callback ran while we were suspended.
            self.assertEqual(total, 1)
            # slept is 10, which brings the total to 11.
            bump(slept)

        self.loop.run_until_complete(coro())
        self.assertEqual(total, 11)
3717
3718
class WaitTests(test_utils.TestCase):
    # Tests for asyncio.wait().

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_coro_is_deprecated_in_wait(self):
        # Remove test when passing coros to asyncio.wait() is removed in 3.11
        run = self.loop.run_until_complete

        # A bare coroutine object on its own triggers the warning ...
        with self.assertWarns(DeprecationWarning):
            only_coro = [coroutine_function()]
            run(asyncio.wait(only_coro))

        # ... and so does one mixed in with a real task.
        task = self.loop.create_task(coroutine_function())
        with self.assertWarns(DeprecationWarning):
            task_and_coro = [task, coroutine_function()]
            run(asyncio.wait(task_and_coro))
3740
3741
class CompatibilityTests(test_utils.TestCase):
    # Tests for checking a bridge between old-styled coroutines
    # and async/await syntax

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_yield_from_awaitable(self):
        # A generator-based coroutine can "yield from" an awaitable.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def legacy():
                yield from asyncio.sleep(0)
                return 'ok'

        outcome = self.loop.run_until_complete(legacy())
        self.assertEqual('ok', outcome)

    def test_await_old_style_coro(self):
        # Generator-based coroutines can be awaited (through gather) from
        # a native coroutine.  Each decoration has to warn on its own.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def immediate():
                return 'ok1'

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def yielding():
                yield from asyncio.sleep(0)
                return 'ok2'

        async def inner():
            combined = asyncio.gather(immediate(), yielding())
            return await combined

        outcome = self.loop.run_until_complete(inner())
        self.assertEqual(['ok1', 'ok2'], outcome)

    def test_debug_mode_interop(self):
        # https://bugs.python.org/issue32636
        # Run in a child interpreter so that asyncio debug mode can be
        # switched on through the environment.
        script = textwrap.dedent("""
            import asyncio

            async def native_coro():
                pass

            @asyncio.coroutine
            def old_style_coro():
                yield from native_coro()

            asyncio.run(old_style_coro())
        """)

        interpreter_args = ("-Wignore::DeprecationWarning", "-c", script)
        assert_python_ok(*interpreter_args, PYTHONASYNCIODEBUG="1")
3803
3804
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
3807