• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for tasks.py."""
2
3import collections
4import contextlib
5import contextvars
6import functools
7import gc
8import io
9import random
10import re
11import sys
12import textwrap
13import traceback
14import types
15import unittest
16import weakref
17from unittest import mock
18from types import GenericAlias
19
20import asyncio
21from asyncio import coroutines
22from asyncio import futures
23from asyncio import tasks
24from test.test_asyncio import utils as test_utils
25from test import support
26from test.support.script_helper import assert_python_ok
27
28
def tearDownModule():
    """Module-level teardown hook: reset the asyncio event loop policy.

    Passing None to ``set_event_loop_policy`` clears whatever policy is
    currently installed.
    """
    asyncio.set_event_loop_policy(None)
31
32
async def coroutine_function():
    """Coroutine function with an empty body; awaiting it produces None."""
    return None
35
36
@contextlib.contextmanager
def set_coroutine_debug(enabled):
    """Temporarily set ``asyncio.coroutines._DEBUG`` to *enabled*.

    The value seen on entry is written back when the ``with`` block exits,
    whether it exits normally or through an exception.
    """
    module = asyncio.coroutines
    saved = module._DEBUG
    try:
        module._DEBUG = enabled
        yield
    finally:
        module._DEBUG = saved
47
48
def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` fragment expected inside a Task repr.

    *state* gets a ``', defined'`` suffix unless *generator* is true.  When
    *source_traceback* is not None, its last entry supplies the file name
    and line number of a trailing ``created at`` part.
    """
    state_format = '%s' if generator else '%s, defined'
    state = state_format % state
    described = 'coro=<%s() %s at %s>' % (qualname, state, src)
    if source_traceback is None:
        return described
    last_frame = source_traceback[-1]
    return '%s created at %s:%s' % (described, last_frame[0], last_frame[1])
60
61
def get_innermost_context(exc):
    """
    Follow ``__context__`` links from *exc* down to the last exception.

    Return a ``(type, args, depth)`` tuple for that innermost exception,
    where depth is the number of links followed.
    """
    depth = 0
    while (inner := exc.__context__) is not None:
        exc = inner
        depth += 1
    return (type(exc), exc.args, depth)
76
77
class Dummy:
    """Callable that ignores its arguments and has a fixed ``<Dummy>`` repr."""

    def __call__(self, *args):
        return None

    def __repr__(self):
        return '<Dummy>'
85
86
class CoroLikeObject:
    """Hand-written object exposing the coroutine methods.

    ``send()`` finishes immediately with the value 42, ``throw()`` and
    ``close()`` do nothing, and ``__await__()`` returns the object itself.
    """

    def __await__(self):
        return self

    def send(self, v):
        raise StopIteration(42)

    def throw(self, *exc):
        return None

    def close(self):
        return None
99
100
# A very small timeout value: it still passes a "timeout > 0" check,
# but has almost no effect on how long the tests take to run.
_EPSILON = 0.0001
105
106
107class BaseTaskTests:
108
109    Task = None
110    Future = None
111
112    def new_task(self, loop, coro, name='TestTask'):
113        return self.__class__.Task(coro, loop=loop, name=name)
114
115    def new_future(self, loop):
116        return self.__class__.Future(loop=loop)
117
118    def setUp(self):
119        super().setUp()
120        self.loop = self.new_test_loop()
121        self.loop.set_task_factory(self.new_task)
122        self.loop.create_future = lambda: self.new_future(self.loop)
123
124
125    def test_generic_alias(self):
126        task = self.__class__.Task[str]
127        self.assertEqual(task.__args__, (str,))
128        self.assertIsInstance(task, GenericAlias)
129
130    def test_task_cancel_message_getter(self):
131        async def coro():
132            pass
133        t = self.new_task(self.loop, coro())
134        self.assertTrue(hasattr(t, '_cancel_message'))
135        self.assertEqual(t._cancel_message, None)
136
137        t.cancel('my message')
138        self.assertEqual(t._cancel_message, 'my message')
139
140        with self.assertRaises(asyncio.CancelledError):
141            self.loop.run_until_complete(t)
142
143    def test_task_cancel_message_setter(self):
144        async def coro():
145            pass
146        t = self.new_task(self.loop, coro())
147        t.cancel('my message')
148        t._cancel_message = 'my new message'
149        self.assertEqual(t._cancel_message, 'my new message')
150
151        with self.assertRaises(asyncio.CancelledError):
152            self.loop.run_until_complete(t)
153
154    def test_task_del_collect(self):
155        class Evil:
156            def __del__(self):
157                gc.collect()
158
159        async def run():
160            return Evil()
161
162        self.loop.run_until_complete(
163            asyncio.gather(*[
164                self.new_task(self.loop, run()) for _ in range(100)
165            ]))
166
167    def test_other_loop_future(self):
168        other_loop = asyncio.new_event_loop()
169        fut = self.new_future(other_loop)
170
171        async def run(fut):
172            await fut
173
174        try:
175            with self.assertRaisesRegex(RuntimeError,
176                                        r'Task .* got Future .* attached'):
177                self.loop.run_until_complete(run(fut))
178        finally:
179            other_loop.close()
180
181    def test_task_awaits_on_itself(self):
182
183        async def test():
184            await task
185
186        task = asyncio.ensure_future(test(), loop=self.loop)
187
188        with self.assertRaisesRegex(RuntimeError,
189                                    'Task cannot await on itself'):
190            self.loop.run_until_complete(task)
191
192    def test_task_class(self):
193        async def notmuch():
194            return 'ok'
195        t = self.new_task(self.loop, notmuch())
196        self.loop.run_until_complete(t)
197        self.assertTrue(t.done())
198        self.assertEqual(t.result(), 'ok')
199        self.assertIs(t._loop, self.loop)
200        self.assertIs(t.get_loop(), self.loop)
201
202        loop = asyncio.new_event_loop()
203        self.set_event_loop(loop)
204        t = self.new_task(loop, notmuch())
205        self.assertIs(t._loop, loop)
206        loop.run_until_complete(t)
207        loop.close()
208
209    def test_ensure_future_coroutine(self):
210        async def notmuch():
211            return 'ok'
212        t = asyncio.ensure_future(notmuch(), loop=self.loop)
213        self.assertIs(t._loop, self.loop)
214        self.loop.run_until_complete(t)
215        self.assertTrue(t.done())
216        self.assertEqual(t.result(), 'ok')
217
218        a = notmuch()
219        self.addCleanup(a.close)
220        with self.assertWarns(DeprecationWarning) as cm:
221            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
222                asyncio.ensure_future(a)
223        self.assertEqual(cm.warnings[0].filename, __file__)
224
225        async def test():
226            return asyncio.ensure_future(notmuch())
227        t = self.loop.run_until_complete(test())
228        self.assertIs(t._loop, self.loop)
229        self.loop.run_until_complete(t)
230        self.assertTrue(t.done())
231        self.assertEqual(t.result(), 'ok')
232
233        # Deprecated in 3.10
234        asyncio.set_event_loop(self.loop)
235        self.addCleanup(asyncio.set_event_loop, None)
236        with self.assertWarns(DeprecationWarning) as cm:
237            t = asyncio.ensure_future(notmuch())
238        self.assertEqual(cm.warnings[0].filename, __file__)
239        self.assertIs(t._loop, self.loop)
240        self.loop.run_until_complete(t)
241        self.assertTrue(t.done())
242        self.assertEqual(t.result(), 'ok')
243
244    def test_ensure_future_coroutine_2(self):
245        with self.assertWarns(DeprecationWarning):
246            @asyncio.coroutine
247            def notmuch():
248                return 'ok'
249        t = asyncio.ensure_future(notmuch(), loop=self.loop)
250        self.assertIs(t._loop, self.loop)
251        self.loop.run_until_complete(t)
252        self.assertTrue(t.done())
253        self.assertEqual(t.result(), 'ok')
254
255        a = notmuch()
256        self.addCleanup(a.close)
257        with self.assertWarns(DeprecationWarning) as cm:
258            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
259                asyncio.ensure_future(a)
260        self.assertEqual(cm.warnings[0].filename, __file__)
261
262        async def test():
263            return asyncio.ensure_future(notmuch())
264        t = self.loop.run_until_complete(test())
265        self.assertIs(t._loop, self.loop)
266        self.loop.run_until_complete(t)
267        self.assertTrue(t.done())
268        self.assertEqual(t.result(), 'ok')
269
270        # Deprecated in 3.10
271        asyncio.set_event_loop(self.loop)
272        self.addCleanup(asyncio.set_event_loop, None)
273        with self.assertWarns(DeprecationWarning) as cm:
274            t = asyncio.ensure_future(notmuch())
275        self.assertEqual(cm.warnings[0].filename, __file__)
276        self.assertIs(t._loop, self.loop)
277        self.loop.run_until_complete(t)
278        self.assertTrue(t.done())
279        self.assertEqual(t.result(), 'ok')
280
281    def test_ensure_future_future(self):
282        f_orig = self.new_future(self.loop)
283        f_orig.set_result('ko')
284
285        f = asyncio.ensure_future(f_orig)
286        self.loop.run_until_complete(f)
287        self.assertTrue(f.done())
288        self.assertEqual(f.result(), 'ko')
289        self.assertIs(f, f_orig)
290
291        loop = asyncio.new_event_loop()
292        self.set_event_loop(loop)
293
294        with self.assertRaises(ValueError):
295            f = asyncio.ensure_future(f_orig, loop=loop)
296
297        loop.close()
298
299        f = asyncio.ensure_future(f_orig, loop=self.loop)
300        self.assertIs(f, f_orig)
301
302    def test_ensure_future_task(self):
303        async def notmuch():
304            return 'ok'
305        t_orig = self.new_task(self.loop, notmuch())
306        t = asyncio.ensure_future(t_orig)
307        self.loop.run_until_complete(t)
308        self.assertTrue(t.done())
309        self.assertEqual(t.result(), 'ok')
310        self.assertIs(t, t_orig)
311
312        loop = asyncio.new_event_loop()
313        self.set_event_loop(loop)
314
315        with self.assertRaises(ValueError):
316            t = asyncio.ensure_future(t_orig, loop=loop)
317
318        loop.close()
319
320        t = asyncio.ensure_future(t_orig, loop=self.loop)
321        self.assertIs(t, t_orig)
322
323    def test_ensure_future_awaitable(self):
324        class Aw:
325            def __init__(self, coro):
326                self.coro = coro
327            def __await__(self):
328                return (yield from self.coro)
329
330        with self.assertWarns(DeprecationWarning):
331            @asyncio.coroutine
332            def coro():
333                return 'ok'
334
335        loop = asyncio.new_event_loop()
336        self.set_event_loop(loop)
337        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
338        loop.run_until_complete(fut)
339        self.assertEqual(fut.result(), 'ok')
340
341    def test_ensure_future_neither(self):
342        with self.assertRaises(TypeError):
343            asyncio.ensure_future('ok')
344
345    def test_ensure_future_error_msg(self):
346        loop = asyncio.new_event_loop()
347        f = self.new_future(self.loop)
348        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
349                                    'different loop than the one specified as '
350                                    'the loop argument'):
351            asyncio.ensure_future(f, loop=loop)
352        loop.close()
353
354    def test_get_stack(self):
355        T = None
356
357        async def foo():
358            await bar()
359
360        async def bar():
361            # test get_stack()
362            f = T.get_stack(limit=1)
363            try:
364                self.assertEqual(f[0].f_code.co_name, 'foo')
365            finally:
366                f = None
367
368            # test print_stack()
369            file = io.StringIO()
370            T.print_stack(limit=1, file=file)
371            file.seek(0)
372            tb = file.read()
373            self.assertRegex(tb, r'foo\(\) running')
374
375        async def runner():
376            nonlocal T
377            T = asyncio.ensure_future(foo(), loop=self.loop)
378            await T
379
380        self.loop.run_until_complete(runner())
381
382    def test_task_repr(self):
383        self.loop.set_debug(False)
384
385        async def notmuch():
386            return 'abc'
387
388        # test coroutine function
389        self.assertEqual(notmuch.__name__, 'notmuch')
390        self.assertRegex(notmuch.__qualname__,
391                         r'\w+.test_task_repr.<locals>.notmuch')
392        self.assertEqual(notmuch.__module__, __name__)
393
394        filename, lineno = test_utils.get_function_source(notmuch)
395        src = "%s:%s" % (filename, lineno)
396
397        # test coroutine object
398        gen = notmuch()
399        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
400        self.assertEqual(gen.__name__, 'notmuch')
401        self.assertEqual(gen.__qualname__, coro_qualname)
402
403        # test pending Task
404        t = self.new_task(self.loop, gen)
405        t.add_done_callback(Dummy())
406
407        coro = format_coroutine(coro_qualname, 'running', src,
408                                t._source_traceback, generator=True)
409        self.assertEqual(repr(t),
410                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
411
412        # test cancelling Task
413        t.cancel()  # Does not take immediate effect!
414        self.assertEqual(repr(t),
415                         "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
416
417        # test cancelled Task
418        self.assertRaises(asyncio.CancelledError,
419                          self.loop.run_until_complete, t)
420        coro = format_coroutine(coro_qualname, 'done', src,
421                                t._source_traceback)
422        self.assertEqual(repr(t),
423                         "<Task cancelled name='TestTask' %s>" % coro)
424
425        # test finished Task
426        t = self.new_task(self.loop, notmuch())
427        self.loop.run_until_complete(t)
428        coro = format_coroutine(coro_qualname, 'done', src,
429                                t._source_traceback)
430        self.assertEqual(repr(t),
431                         "<Task finished name='TestTask' %s result='abc'>" % coro)
432
433    def test_task_repr_autogenerated(self):
434        async def notmuch():
435            return 123
436
437        t1 = self.new_task(self.loop, notmuch(), None)
438        t2 = self.new_task(self.loop, notmuch(), None)
439        self.assertNotEqual(repr(t1), repr(t2))
440
441        match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
442        self.assertIsNotNone(match1)
443        match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
444        self.assertIsNotNone(match2)
445
446        # Autogenerated task names should have monotonically increasing numbers
447        self.assertLess(int(match1.group(1)), int(match2.group(1)))
448        self.loop.run_until_complete(t1)
449        self.loop.run_until_complete(t2)
450
451    def test_task_repr_name_not_str(self):
452        async def notmuch():
453            return 123
454
455        t = self.new_task(self.loop, notmuch())
456        t.set_name({6})
457        self.assertEqual(t.get_name(), '{6}')
458        self.loop.run_until_complete(t)
459
460    def test_task_repr_coro_decorator(self):
461        self.loop.set_debug(False)
462
463        with self.assertWarns(DeprecationWarning):
464            @asyncio.coroutine
465            def notmuch():
466                # notmuch() function doesn't use yield from: it will be wrapped by
467                # @coroutine decorator
468                return 123
469
470        # test coroutine function
471        self.assertEqual(notmuch.__name__, 'notmuch')
472        self.assertRegex(notmuch.__qualname__,
473                         r'\w+.test_task_repr_coro_decorator'
474                         r'\.<locals>\.notmuch')
475        self.assertEqual(notmuch.__module__, __name__)
476
477        # test coroutine object
478        gen = notmuch()
479        # On Python >= 3.5, generators now inherit the name of the
480        # function, as expected, and have a qualified name (__qualname__
481        # attribute).
482        coro_name = 'notmuch'
483        coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
484                         '.<locals>.notmuch')
485        self.assertEqual(gen.__name__, coro_name)
486        self.assertEqual(gen.__qualname__, coro_qualname)
487
488        # test repr(CoroWrapper)
489        if coroutines._DEBUG:
490            # format the coroutine object
491            if coroutines._DEBUG:
492                filename, lineno = test_utils.get_function_source(notmuch)
493                frame = gen._source_traceback[-1]
494                coro = ('%s() running, defined at %s:%s, created at %s:%s'
495                        % (coro_qualname, filename, lineno,
496                           frame[0], frame[1]))
497            else:
498                code = gen.gi_code
499                coro = ('%s() running at %s:%s'
500                        % (coro_qualname, code.co_filename,
501                           code.co_firstlineno))
502
503            self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
504
505        # test pending Task
506        t = self.new_task(self.loop, gen)
507        t.add_done_callback(Dummy())
508
509        # format the coroutine object
510        if coroutines._DEBUG:
511            src = '%s:%s' % test_utils.get_function_source(notmuch)
512        else:
513            code = gen.gi_code
514            src = '%s:%s' % (code.co_filename, code.co_firstlineno)
515        coro = format_coroutine(coro_qualname, 'running', src,
516                                t._source_traceback,
517                                generator=not coroutines._DEBUG)
518        self.assertEqual(repr(t),
519                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
520        self.loop.run_until_complete(t)
521
522    def test_task_repr_wait_for(self):
523        self.loop.set_debug(False)
524
525        async def wait_for(fut):
526            return await fut
527
528        fut = self.new_future(self.loop)
529        task = self.new_task(self.loop, wait_for(fut))
530        test_utils.run_briefly(self.loop)
531        self.assertRegex(repr(task),
532                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))
533
534        fut.set_result(None)
535        self.loop.run_until_complete(task)
536
537    def test_task_repr_partial_corowrapper(self):
538        # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
539        # coroutine is a partial function
540        with set_coroutine_debug(True):
541            self.loop.set_debug(True)
542
543            async def func(x, y):
544                await asyncio.sleep(0)
545
546            with self.assertWarns(DeprecationWarning):
547                partial_func = asyncio.coroutine(functools.partial(func, 1))
548            task = self.loop.create_task(partial_func(2))
549
550            # make warnings quiet
551            task._log_destroy_pending = False
552            self.addCleanup(task._coro.close)
553
554        coro_repr = repr(task._coro)
555        expected = (
556            r'<coroutine object \w+\.test_task_repr_partial_corowrapper'
557            r'\.<locals>\.func at'
558        )
559        self.assertRegex(coro_repr, expected)
560
561    def test_task_basics(self):
562
563        async def outer():
564            a = await inner1()
565            b = await inner2()
566            return a+b
567
568        async def inner1():
569            return 42
570
571        async def inner2():
572            return 1000
573
574        t = outer()
575        self.assertEqual(self.loop.run_until_complete(t), 1042)
576
577    def test_exception_chaining_after_await(self):
578        # Test that when awaiting on a task when an exception is already
579        # active, if the task raises an exception it will be chained
580        # with the original.
581        loop = asyncio.new_event_loop()
582        self.set_event_loop(loop)
583
584        async def raise_error():
585            raise ValueError
586
587        async def run():
588            try:
589                raise KeyError(3)
590            except Exception as exc:
591                task = self.new_task(loop, raise_error())
592                try:
593                    await task
594                except Exception as exc:
595                    self.assertEqual(type(exc), ValueError)
596                    chained = exc.__context__
597                    self.assertEqual((type(chained), chained.args),
598                        (KeyError, (3,)))
599
600        try:
601            task = self.new_task(loop, run())
602            loop.run_until_complete(task)
603        finally:
604            loop.close()
605
606    def test_exception_chaining_after_await_with_context_cycle(self):
607        # Check trying to create an exception context cycle:
608        # https://bugs.python.org/issue40696
609        has_cycle = None
610        loop = asyncio.new_event_loop()
611        self.set_event_loop(loop)
612
613        async def process_exc(exc):
614            raise exc
615
616        async def run():
617            nonlocal has_cycle
618            try:
619                raise KeyError('a')
620            except Exception as exc:
621                task = self.new_task(loop, process_exc(exc))
622                try:
623                    await task
624                except BaseException as exc:
625                    has_cycle = (exc is exc.__context__)
626                    # Prevent a hang if has_cycle is True.
627                    exc.__context__ = None
628
629        try:
630            task = self.new_task(loop, run())
631            loop.run_until_complete(task)
632        finally:
633            loop.close()
634        # This also distinguishes from the initial has_cycle=None.
635        self.assertEqual(has_cycle, False)
636
637    def test_cancel(self):
638
639        def gen():
640            when = yield
641            self.assertAlmostEqual(10.0, when)
642            yield 0
643
644        loop = self.new_test_loop(gen)
645
646        async def task():
647            await asyncio.sleep(10.0)
648            return 12
649
650        t = self.new_task(loop, task())
651        loop.call_soon(t.cancel)
652        with self.assertRaises(asyncio.CancelledError):
653            loop.run_until_complete(t)
654        self.assertTrue(t.done())
655        self.assertTrue(t.cancelled())
656        self.assertFalse(t.cancel())
657
658    def test_cancel_with_message_then_future_result(self):
659        # Test Future.result() after calling cancel() with a message.
660        cases = [
661            ((), ()),
662            ((None,), ()),
663            (('my message',), ('my message',)),
664            # Non-string values should roundtrip.
665            ((5,), (5,)),
666        ]
667        for cancel_args, expected_args in cases:
668            with self.subTest(cancel_args=cancel_args):
669                loop = asyncio.new_event_loop()
670                self.set_event_loop(loop)
671
672                async def sleep():
673                    await asyncio.sleep(10)
674
675                async def coro():
676                    task = self.new_task(loop, sleep())
677                    await asyncio.sleep(0)
678                    task.cancel(*cancel_args)
679                    done, pending = await asyncio.wait([task])
680                    task.result()
681
682                task = self.new_task(loop, coro())
683                with self.assertRaises(asyncio.CancelledError) as cm:
684                    loop.run_until_complete(task)
685                exc = cm.exception
686                self.assertEqual(exc.args, ())
687
688                actual = get_innermost_context(exc)
689                self.assertEqual(actual,
690                    (asyncio.CancelledError, expected_args, 2))
691
692    def test_cancel_with_message_then_future_exception(self):
693        # Test Future.exception() after calling cancel() with a message.
694        cases = [
695            ((), ()),
696            ((None,), ()),
697            (('my message',), ('my message',)),
698            # Non-string values should roundtrip.
699            ((5,), (5,)),
700        ]
701        for cancel_args, expected_args in cases:
702            with self.subTest(cancel_args=cancel_args):
703                loop = asyncio.new_event_loop()
704                self.set_event_loop(loop)
705
706                async def sleep():
707                    await asyncio.sleep(10)
708
709                async def coro():
710                    task = self.new_task(loop, sleep())
711                    await asyncio.sleep(0)
712                    task.cancel(*cancel_args)
713                    done, pending = await asyncio.wait([task])
714                    task.exception()
715
716                task = self.new_task(loop, coro())
717                with self.assertRaises(asyncio.CancelledError) as cm:
718                    loop.run_until_complete(task)
719                exc = cm.exception
720                self.assertEqual(exc.args, ())
721
722                actual = get_innermost_context(exc)
723                self.assertEqual(actual,
724                    (asyncio.CancelledError, expected_args, 2))
725
726    def test_cancel_with_message_before_starting_task(self):
727        loop = asyncio.new_event_loop()
728        self.set_event_loop(loop)
729
730        async def sleep():
731            await asyncio.sleep(10)
732
733        async def coro():
734            task = self.new_task(loop, sleep())
735            # We deliberately leave out the sleep here.
736            task.cancel('my message')
737            done, pending = await asyncio.wait([task])
738            task.exception()
739
740        task = self.new_task(loop, coro())
741        with self.assertRaises(asyncio.CancelledError) as cm:
742            loop.run_until_complete(task)
743        exc = cm.exception
744        self.assertEqual(exc.args, ())
745
746        actual = get_innermost_context(exc)
747        self.assertEqual(actual,
748            (asyncio.CancelledError, ('my message',), 2))
749
750    def test_cancel_yield(self):
751        with self.assertWarns(DeprecationWarning):
752            @asyncio.coroutine
753            def task():
754                yield
755                yield
756                return 12
757
758        t = self.new_task(self.loop, task())
759        test_utils.run_briefly(self.loop)  # start coro
760        t.cancel()
761        self.assertRaises(
762            asyncio.CancelledError, self.loop.run_until_complete, t)
763        self.assertTrue(t.done())
764        self.assertTrue(t.cancelled())
765        self.assertFalse(t.cancel())
766
767    def test_cancel_inner_future(self):
768        f = self.new_future(self.loop)
769
770        async def task():
771            await f
772            return 12
773
774        t = self.new_task(self.loop, task())
775        test_utils.run_briefly(self.loop)  # start task
776        f.cancel()
777        with self.assertRaises(asyncio.CancelledError):
778            self.loop.run_until_complete(t)
779        self.assertTrue(f.cancelled())
780        self.assertTrue(t.cancelled())
781
782    def test_cancel_both_task_and_inner_future(self):
783        f = self.new_future(self.loop)
784
785        async def task():
786            await f
787            return 12
788
789        t = self.new_task(self.loop, task())
790        test_utils.run_briefly(self.loop)
791
792        f.cancel()
793        t.cancel()
794
795        with self.assertRaises(asyncio.CancelledError):
796            self.loop.run_until_complete(t)
797
798        self.assertTrue(t.done())
799        self.assertTrue(f.cancelled())
800        self.assertTrue(t.cancelled())
801
802    def test_cancel_task_catching(self):
803        fut1 = self.new_future(self.loop)
804        fut2 = self.new_future(self.loop)
805
806        async def task():
807            await fut1
808            try:
809                await fut2
810            except asyncio.CancelledError:
811                return 42
812
813        t = self.new_task(self.loop, task())
814        test_utils.run_briefly(self.loop)
815        self.assertIs(t._fut_waiter, fut1)  # White-box test.
816        fut1.set_result(None)
817        test_utils.run_briefly(self.loop)
818        self.assertIs(t._fut_waiter, fut2)  # White-box test.
819        t.cancel()
820        self.assertTrue(fut2.cancelled())
821        res = self.loop.run_until_complete(t)
822        self.assertEqual(res, 42)
823        self.assertFalse(t.cancelled())
824
825    def test_cancel_task_ignoring(self):
826        fut1 = self.new_future(self.loop)
827        fut2 = self.new_future(self.loop)
828        fut3 = self.new_future(self.loop)
829
830        async def task():
831            await fut1
832            try:
833                await fut2
834            except asyncio.CancelledError:
835                pass
836            res = await fut3
837            return res
838
839        t = self.new_task(self.loop, task())
840        test_utils.run_briefly(self.loop)
841        self.assertIs(t._fut_waiter, fut1)  # White-box test.
842        fut1.set_result(None)
843        test_utils.run_briefly(self.loop)
844        self.assertIs(t._fut_waiter, fut2)  # White-box test.
845        t.cancel()
846        self.assertTrue(fut2.cancelled())
847        test_utils.run_briefly(self.loop)
848        self.assertIs(t._fut_waiter, fut3)  # White-box test.
849        fut3.set_result(42)
850        res = self.loop.run_until_complete(t)
851        self.assertEqual(res, 42)
852        self.assertFalse(fut3.cancelled())
853        self.assertFalse(t.cancelled())
854
855    def test_cancel_current_task(self):
856        loop = asyncio.new_event_loop()
857        self.set_event_loop(loop)
858
859        async def task():
860            t.cancel()
861            self.assertTrue(t._must_cancel)  # White-box test.
862            # The sleep should be cancelled immediately.
863            await asyncio.sleep(100)
864            return 12
865
866        t = self.new_task(loop, task())
867        self.assertFalse(t.cancelled())
868        self.assertRaises(
869            asyncio.CancelledError, loop.run_until_complete, t)
870        self.assertTrue(t.done())
871        self.assertTrue(t.cancelled())
872        self.assertFalse(t._must_cancel)  # White-box test.
873        self.assertFalse(t.cancel())
874
875    def test_cancel_at_end(self):
876        """coroutine end right after task is cancelled"""
877        loop = asyncio.new_event_loop()
878        self.set_event_loop(loop)
879
880        async def task():
881            t.cancel()
882            self.assertTrue(t._must_cancel)  # White-box test.
883            return 12
884
885        t = self.new_task(loop, task())
886        self.assertFalse(t.cancelled())
887        self.assertRaises(
888            asyncio.CancelledError, loop.run_until_complete, t)
889        self.assertTrue(t.done())
890        self.assertTrue(t.cancelled())
891        self.assertFalse(t._must_cancel)  # White-box test.
892        self.assertFalse(t.cancel())
893
894    def test_cancel_awaited_task(self):
895        # This tests for a relatively rare condition when
896        # a task cancellation is requested for a task which is not
897        # currently blocked, such as a task cancelling itself.
898        # In this situation we must ensure that whatever next future
899        # or task the cancelled task blocks on is cancelled correctly
900        # as well.  See also bpo-34872.
901        loop = asyncio.new_event_loop()
902        self.addCleanup(lambda: loop.close())
903
904        task = nested_task = None
905        fut = self.new_future(loop)
906
907        async def nested():
908            await fut
909
910        async def coro():
911            nonlocal nested_task
912            # Create a sub-task and wait for it to run.
913            nested_task = self.new_task(loop, nested())
914            await asyncio.sleep(0)
915
916            # Request the current task to be cancelled.
917            task.cancel()
918            # Block on the nested task, which should be immediately
919            # cancelled.
920            await nested_task
921
922        task = self.new_task(loop, coro())
923        with self.assertRaises(asyncio.CancelledError):
924            loop.run_until_complete(task)
925
926        self.assertTrue(task.cancelled())
927        self.assertTrue(nested_task.cancelled())
928        self.assertTrue(fut.cancelled())
929
930    def assert_text_contains(self, text, substr):
931        if substr not in text:
932            raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<')
933
    def test_cancel_traceback_for_future_result(self):
        # When calling Future.result() on a cancelled task, check that the
        # line of code that was interrupted is included in the traceback.
        #
        # NOTE: the assertions below search the formatted traceback for
        # the literal source text of two lines in this method (including
        # the trailing "# search target" comment), so those lines must
        # not be reworded or reformatted.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def nested():
            # This will get cancelled immediately.
            await asyncio.sleep(10)

        async def coro():
            task = self.new_task(loop, nested())
            # Yield to the loop once before cancelling the sub-task.
            await asyncio.sleep(0)
            task.cancel()
            await task  # search target

        task = self.new_task(loop, coro())
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            # format_exc() formats the exception currently being
            # handled, so it has to be called inside this handler.
            tb = traceback.format_exc()
            self.assert_text_contains(tb, "await asyncio.sleep(10)")
            # The intermediate await should also be included.
            self.assert_text_contains(tb, "await task  # search target")
        else:
            self.fail('CancelledError did not occur')
960
    def test_cancel_traceback_for_future_exception(self):
        # When calling Future.exception() on a cancelled task, check that the
        # line of code that was interrupted is included in the traceback.
        #
        # NOTE: the assertions below search the formatted traceback for
        # the literal source text of two lines in this method (including
        # the trailing "# search target" comment), so those lines must
        # not be reworded or reformatted.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def nested():
            # This will get cancelled immediately.
            await asyncio.sleep(10)

        async def coro():
            task = self.new_task(loop, nested())
            # Yield to the loop once before cancelling the sub-task.
            await asyncio.sleep(0)
            task.cancel()
            # wait() returns the (done, pending) sets rather than
            # awaiting the task directly; the sets themselves are unused.
            done, pending = await asyncio.wait([task])
            task.exception()  # search target

        task = self.new_task(loop, coro())
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            # format_exc() formats the exception currently being
            # handled, so it has to be called inside this handler.
            tb = traceback.format_exc()
            self.assert_text_contains(tb, "await asyncio.sleep(10)")
            # The intermediate await should also be included.
            self.assert_text_contains(tb,
                "task.exception()  # search target")
        else:
            self.fail('CancelledError did not occur')
989
990    def test_stop_while_run_in_complete(self):
991
992        def gen():
993            when = yield
994            self.assertAlmostEqual(0.1, when)
995            when = yield 0.1
996            self.assertAlmostEqual(0.2, when)
997            when = yield 0.1
998            self.assertAlmostEqual(0.3, when)
999            yield 0.1
1000
1001        loop = self.new_test_loop(gen)
1002
1003        x = 0
1004
1005        async def task():
1006            nonlocal x
1007            while x < 10:
1008                await asyncio.sleep(0.1)
1009                x += 1
1010                if x == 2:
1011                    loop.stop()
1012
1013        t = self.new_task(loop, task())
1014        with self.assertRaises(RuntimeError) as cm:
1015            loop.run_until_complete(t)
1016        self.assertEqual(str(cm.exception),
1017                         'Event loop stopped before Future completed.')
1018        self.assertFalse(t.done())
1019        self.assertEqual(x, 2)
1020        self.assertAlmostEqual(0.3, loop.time())
1021
1022        t.cancel()
1023        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
1024
1025    def test_log_traceback(self):
1026        async def coro():
1027            pass
1028
1029        task = self.new_task(self.loop, coro())
1030        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
1031            task._log_traceback = True
1032        self.loop.run_until_complete(task)
1033
1034    def test_wait_for_timeout_less_then_0_or_0_future_done(self):
1035        def gen():
1036            when = yield
1037            self.assertAlmostEqual(0, when)
1038
1039        loop = self.new_test_loop(gen)
1040
1041        fut = self.new_future(loop)
1042        fut.set_result('done')
1043
1044        ret = loop.run_until_complete(asyncio.wait_for(fut, 0))
1045
1046        self.assertEqual(ret, 'done')
1047        self.assertTrue(fut.done())
1048        self.assertAlmostEqual(0, loop.time())
1049
1050    def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
1051        def gen():
1052            when = yield
1053            self.assertAlmostEqual(0, when)
1054
1055        loop = self.new_test_loop(gen)
1056
1057        foo_started = False
1058
1059        async def foo():
1060            nonlocal foo_started
1061            foo_started = True
1062
1063        with self.assertRaises(asyncio.TimeoutError):
1064            loop.run_until_complete(asyncio.wait_for(foo(), 0))
1065
1066        self.assertAlmostEqual(0, loop.time())
1067        self.assertEqual(foo_started, False)
1068
1069    def test_wait_for_timeout_less_then_0_or_0(self):
1070        def gen():
1071            when = yield
1072            self.assertAlmostEqual(0.2, when)
1073            when = yield 0
1074            self.assertAlmostEqual(0, when)
1075
1076        for timeout in [0, -1]:
1077            with self.subTest(timeout=timeout):
1078                loop = self.new_test_loop(gen)
1079
1080                foo_running = None
1081
1082                async def foo():
1083                    nonlocal foo_running
1084                    foo_running = True
1085                    try:
1086                        await asyncio.sleep(0.2)
1087                    finally:
1088                        foo_running = False
1089                    return 'done'
1090
1091                fut = self.new_task(loop, foo())
1092
1093                with self.assertRaises(asyncio.TimeoutError):
1094                    loop.run_until_complete(asyncio.wait_for(fut, timeout))
1095                self.assertTrue(fut.done())
1096                # it should have been cancelled due to the timeout
1097                self.assertTrue(fut.cancelled())
1098                self.assertAlmostEqual(0, loop.time())
1099                self.assertEqual(foo_running, False)
1100
1101    def test_wait_for(self):
1102
1103        def gen():
1104            when = yield
1105            self.assertAlmostEqual(0.2, when)
1106            when = yield 0
1107            self.assertAlmostEqual(0.1, when)
1108            when = yield 0.1
1109
1110        loop = self.new_test_loop(gen)
1111
1112        foo_running = None
1113
1114        async def foo():
1115            nonlocal foo_running
1116            foo_running = True
1117            try:
1118                await asyncio.sleep(0.2)
1119            finally:
1120                foo_running = False
1121            return 'done'
1122
1123        fut = self.new_task(loop, foo())
1124
1125        with self.assertRaises(asyncio.TimeoutError):
1126            loop.run_until_complete(asyncio.wait_for(fut, 0.1))
1127        self.assertTrue(fut.done())
1128        # it should have been cancelled due to the timeout
1129        self.assertTrue(fut.cancelled())
1130        self.assertAlmostEqual(0.1, loop.time())
1131        self.assertEqual(foo_running, False)
1132
1133    def test_wait_for_blocking(self):
1134        loop = self.new_test_loop()
1135
1136        async def coro():
1137            return 'done'
1138
1139        res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None))
1140        self.assertEqual(res, 'done')
1141
1142    def test_wait_for_race_condition(self):
1143
1144        def gen():
1145            yield 0.1
1146            yield 0.1
1147            yield 0.1
1148
1149        loop = self.new_test_loop(gen)
1150
1151        fut = self.new_future(loop)
1152        task = asyncio.wait_for(fut, timeout=0.2)
1153        loop.call_later(0.1, fut.set_result, "ok")
1154        res = loop.run_until_complete(task)
1155        self.assertEqual(res, "ok")
1156
1157    def test_wait_for_cancellation_race_condition(self):
1158        async def inner():
1159            with contextlib.suppress(asyncio.CancelledError):
1160                await asyncio.sleep(1)
1161            return 1
1162
1163        async def main():
1164            result = await asyncio.wait_for(inner(), timeout=.01)
1165            self.assertEqual(result, 1)
1166
1167        asyncio.run(main())
1168
1169    def test_wait_for_waits_for_task_cancellation(self):
1170        loop = asyncio.new_event_loop()
1171        self.addCleanup(loop.close)
1172
1173        task_done = False
1174
1175        async def foo():
1176            async def inner():
1177                nonlocal task_done
1178                try:
1179                    await asyncio.sleep(0.2)
1180                except asyncio.CancelledError:
1181                    await asyncio.sleep(_EPSILON)
1182                    raise
1183                finally:
1184                    task_done = True
1185
1186            inner_task = self.new_task(loop, inner())
1187
1188            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1189
1190        with self.assertRaises(asyncio.TimeoutError) as cm:
1191            loop.run_until_complete(foo())
1192
1193        self.assertTrue(task_done)
1194        chained = cm.exception.__context__
1195        self.assertEqual(type(chained), asyncio.CancelledError)
1196
1197    def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
1198        loop = asyncio.new_event_loop()
1199        self.addCleanup(loop.close)
1200
1201        task_done = False
1202
1203        async def foo():
1204            async def inner():
1205                nonlocal task_done
1206                try:
1207                    await asyncio.sleep(10)
1208                except asyncio.CancelledError:
1209                    await asyncio.sleep(_EPSILON)
1210                    raise
1211                finally:
1212                    task_done = True
1213
1214            inner_task = self.new_task(loop, inner())
1215            await asyncio.sleep(_EPSILON)
1216            await asyncio.wait_for(inner_task, timeout=0)
1217
1218        with self.assertRaises(asyncio.TimeoutError) as cm:
1219            loop.run_until_complete(foo())
1220
1221        self.assertTrue(task_done)
1222        chained = cm.exception.__context__
1223        self.assertEqual(type(chained), asyncio.CancelledError)
1224
1225    def test_wait_for_reraises_exception_during_cancellation(self):
1226        loop = asyncio.new_event_loop()
1227        self.addCleanup(loop.close)
1228
1229        class FooException(Exception):
1230            pass
1231
1232        async def foo():
1233            async def inner():
1234                try:
1235                    await asyncio.sleep(0.2)
1236                finally:
1237                    raise FooException
1238
1239            inner_task = self.new_task(loop, inner())
1240
1241            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1242
1243        with self.assertRaises(FooException):
1244            loop.run_until_complete(foo())
1245
1246    def test_wait_for_self_cancellation(self):
1247        loop = asyncio.new_event_loop()
1248        self.addCleanup(loop.close)
1249
1250        async def foo():
1251            async def inner():
1252                try:
1253                    await asyncio.sleep(0.3)
1254                except asyncio.CancelledError:
1255                    try:
1256                        await asyncio.sleep(0.3)
1257                    except asyncio.CancelledError:
1258                        await asyncio.sleep(0.3)
1259
1260                return 42
1261
1262            inner_task = self.new_task(loop, inner())
1263
1264            wait = asyncio.wait_for(inner_task, timeout=0.1)
1265
1266            # Test that wait_for itself is properly cancellable
1267            # even when the initial task holds up the initial cancellation.
1268            task = self.new_task(loop, wait)
1269            await asyncio.sleep(0.2)
1270            task.cancel()
1271
1272            with self.assertRaises(asyncio.CancelledError):
1273                await task
1274
1275            self.assertEqual(await inner_task, 42)
1276
1277        loop.run_until_complete(foo())
1278
1279    def test_wait(self):
1280
1281        def gen():
1282            when = yield
1283            self.assertAlmostEqual(0.1, when)
1284            when = yield 0
1285            self.assertAlmostEqual(0.15, when)
1286            yield 0.15
1287
1288        loop = self.new_test_loop(gen)
1289
1290        a = self.new_task(loop, asyncio.sleep(0.1))
1291        b = self.new_task(loop, asyncio.sleep(0.15))
1292
1293        async def foo():
1294            done, pending = await asyncio.wait([b, a])
1295            self.assertEqual(done, set([a, b]))
1296            self.assertEqual(pending, set())
1297            return 42
1298
1299        res = loop.run_until_complete(self.new_task(loop, foo()))
1300        self.assertEqual(res, 42)
1301        self.assertAlmostEqual(0.15, loop.time())
1302
1303        # Doing it again should take no time and exercise a different path.
1304        res = loop.run_until_complete(self.new_task(loop, foo()))
1305        self.assertAlmostEqual(0.15, loop.time())
1306        self.assertEqual(res, 42)
1307
1308    def test_wait_duplicate_coroutines(self):
1309
1310        with self.assertWarns(DeprecationWarning):
1311            @asyncio.coroutine
1312            def coro(s):
1313                return s
1314        c = coro('test')
1315        task = self.new_task(
1316            self.loop,
1317            asyncio.wait([c, c, coro('spam')]))
1318
1319        with self.assertWarns(DeprecationWarning):
1320            done, pending = self.loop.run_until_complete(task)
1321
1322        self.assertFalse(pending)
1323        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
1324
1325    def test_wait_errors(self):
1326        self.assertRaises(
1327            ValueError, self.loop.run_until_complete,
1328            asyncio.wait(set()))
1329
1330        # -1 is an invalid return_when value
1331        sleep_coro = asyncio.sleep(10.0)
1332        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
1333        self.assertRaises(ValueError,
1334                          self.loop.run_until_complete, wait_coro)
1335
1336        sleep_coro.close()
1337
1338    def test_wait_first_completed(self):
1339
1340        def gen():
1341            when = yield
1342            self.assertAlmostEqual(10.0, when)
1343            when = yield 0
1344            self.assertAlmostEqual(0.1, when)
1345            yield 0.1
1346
1347        loop = self.new_test_loop(gen)
1348
1349        a = self.new_task(loop, asyncio.sleep(10.0))
1350        b = self.new_task(loop, asyncio.sleep(0.1))
1351        task = self.new_task(
1352            loop,
1353            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1354
1355        done, pending = loop.run_until_complete(task)
1356        self.assertEqual({b}, done)
1357        self.assertEqual({a}, pending)
1358        self.assertFalse(a.done())
1359        self.assertTrue(b.done())
1360        self.assertIsNone(b.result())
1361        self.assertAlmostEqual(0.1, loop.time())
1362
1363        # move forward to close generator
1364        loop.advance_time(10)
1365        loop.run_until_complete(asyncio.wait([a, b]))
1366
1367    def test_wait_really_done(self):
1368        # there is possibility that some tasks in the pending list
1369        # became done but their callbacks haven't all been called yet
1370
1371        async def coro1():
1372            await asyncio.sleep(0)
1373
1374        async def coro2():
1375            await asyncio.sleep(0)
1376            await asyncio.sleep(0)
1377
1378        a = self.new_task(self.loop, coro1())
1379        b = self.new_task(self.loop, coro2())
1380        task = self.new_task(
1381            self.loop,
1382            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1383
1384        done, pending = self.loop.run_until_complete(task)
1385        self.assertEqual({a, b}, done)
1386        self.assertTrue(a.done())
1387        self.assertIsNone(a.result())
1388        self.assertTrue(b.done())
1389        self.assertIsNone(b.result())
1390
1391    def test_wait_first_exception(self):
1392
1393        def gen():
1394            when = yield
1395            self.assertAlmostEqual(10.0, when)
1396            yield 0
1397
1398        loop = self.new_test_loop(gen)
1399
1400        # first_exception, task already has exception
1401        a = self.new_task(loop, asyncio.sleep(10.0))
1402
1403        async def exc():
1404            raise ZeroDivisionError('err')
1405
1406        b = self.new_task(loop, exc())
1407        task = self.new_task(
1408            loop,
1409            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
1410
1411        done, pending = loop.run_until_complete(task)
1412        self.assertEqual({b}, done)
1413        self.assertEqual({a}, pending)
1414        self.assertAlmostEqual(0, loop.time())
1415
1416        # move forward to close generator
1417        loop.advance_time(10)
1418        loop.run_until_complete(asyncio.wait([a, b]))
1419
1420    def test_wait_first_exception_in_wait(self):
1421
1422        def gen():
1423            when = yield
1424            self.assertAlmostEqual(10.0, when)
1425            when = yield 0
1426            self.assertAlmostEqual(0.01, when)
1427            yield 0.01
1428
1429        loop = self.new_test_loop(gen)
1430
1431        # first_exception, exception during waiting
1432        a = self.new_task(loop, asyncio.sleep(10.0))
1433
1434        async def exc():
1435            await asyncio.sleep(0.01)
1436            raise ZeroDivisionError('err')
1437
1438        b = self.new_task(loop, exc())
1439        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
1440
1441        done, pending = loop.run_until_complete(task)
1442        self.assertEqual({b}, done)
1443        self.assertEqual({a}, pending)
1444        self.assertAlmostEqual(0.01, loop.time())
1445
1446        # move forward to close generator
1447        loop.advance_time(10)
1448        loop.run_until_complete(asyncio.wait([a, b]))
1449
1450    def test_wait_with_exception(self):
1451
1452        def gen():
1453            when = yield
1454            self.assertAlmostEqual(0.1, when)
1455            when = yield 0
1456            self.assertAlmostEqual(0.15, when)
1457            yield 0.15
1458
1459        loop = self.new_test_loop(gen)
1460
1461        a = self.new_task(loop, asyncio.sleep(0.1))
1462
1463        async def sleeper():
1464            await asyncio.sleep(0.15)
1465            raise ZeroDivisionError('really')
1466
1467        b = self.new_task(loop, sleeper())
1468
1469        async def foo():
1470            done, pending = await asyncio.wait([b, a])
1471            self.assertEqual(len(done), 2)
1472            self.assertEqual(pending, set())
1473            errors = set(f for f in done if f.exception() is not None)
1474            self.assertEqual(len(errors), 1)
1475
1476        loop.run_until_complete(self.new_task(loop, foo()))
1477        self.assertAlmostEqual(0.15, loop.time())
1478
1479        loop.run_until_complete(self.new_task(loop, foo()))
1480        self.assertAlmostEqual(0.15, loop.time())
1481
1482    def test_wait_with_timeout(self):
1483
1484        def gen():
1485            when = yield
1486            self.assertAlmostEqual(0.1, when)
1487            when = yield 0
1488            self.assertAlmostEqual(0.15, when)
1489            when = yield 0
1490            self.assertAlmostEqual(0.11, when)
1491            yield 0.11
1492
1493        loop = self.new_test_loop(gen)
1494
1495        a = self.new_task(loop, asyncio.sleep(0.1))
1496        b = self.new_task(loop, asyncio.sleep(0.15))
1497
1498        async def foo():
1499            done, pending = await asyncio.wait([b, a], timeout=0.11)
1500            self.assertEqual(done, set([a]))
1501            self.assertEqual(pending, set([b]))
1502
1503        loop.run_until_complete(self.new_task(loop, foo()))
1504        self.assertAlmostEqual(0.11, loop.time())
1505
1506        # move forward to close generator
1507        loop.advance_time(10)
1508        loop.run_until_complete(asyncio.wait([a, b]))
1509
1510    def test_wait_concurrent_complete(self):
1511
1512        def gen():
1513            when = yield
1514            self.assertAlmostEqual(0.1, when)
1515            when = yield 0
1516            self.assertAlmostEqual(0.15, when)
1517            when = yield 0
1518            self.assertAlmostEqual(0.1, when)
1519            yield 0.1
1520
1521        loop = self.new_test_loop(gen)
1522
1523        a = self.new_task(loop, asyncio.sleep(0.1))
1524        b = self.new_task(loop, asyncio.sleep(0.15))
1525
1526        done, pending = loop.run_until_complete(
1527            asyncio.wait([b, a], timeout=0.1))
1528
1529        self.assertEqual(done, set([a]))
1530        self.assertEqual(pending, set([b]))
1531        self.assertAlmostEqual(0.1, loop.time())
1532
1533        # move forward to close generator
1534        loop.advance_time(10)
1535        loop.run_until_complete(asyncio.wait([a, b]))
1536
1537    def test_wait_with_iterator_of_tasks(self):
1538
1539        def gen():
1540            when = yield
1541            self.assertAlmostEqual(0.1, when)
1542            when = yield 0
1543            self.assertAlmostEqual(0.15, when)
1544            yield 0.15
1545
1546        loop = self.new_test_loop(gen)
1547
1548        a = self.new_task(loop, asyncio.sleep(0.1))
1549        b = self.new_task(loop, asyncio.sleep(0.15))
1550
1551        async def foo():
1552            done, pending = await asyncio.wait(iter([b, a]))
1553            self.assertEqual(done, set([a, b]))
1554            self.assertEqual(pending, set())
1555            return 42
1556
1557        res = loop.run_until_complete(self.new_task(loop, foo()))
1558        self.assertEqual(res, 42)
1559        self.assertAlmostEqual(0.15, loop.time())
1560
1561    def test_as_completed(self):
1562
1563        def gen():
1564            yield 0
1565            yield 0
1566            yield 0.01
1567            yield 0
1568
1569        loop = self.new_test_loop(gen)
1570        # disable "slow callback" warning
1571        loop.slow_callback_duration = 1.0
1572        completed = set()
1573        time_shifted = False
1574
1575        with self.assertWarns(DeprecationWarning):
1576            @asyncio.coroutine
1577            def sleeper(dt, x):
1578                nonlocal time_shifted
1579                yield from  asyncio.sleep(dt)
1580                completed.add(x)
1581                if not time_shifted and 'a' in completed and 'b' in completed:
1582                    time_shifted = True
1583                    loop.advance_time(0.14)
1584                return x
1585
1586        a = sleeper(0.01, 'a')
1587        b = sleeper(0.01, 'b')
1588        c = sleeper(0.15, 'c')
1589
1590        async def foo():
1591            values = []
1592            for f in asyncio.as_completed([b, c, a]):
1593                values.append(await f)
1594            return values
1595
1596        res = loop.run_until_complete(self.new_task(loop, foo()))
1597        self.assertAlmostEqual(0.15, loop.time())
1598        self.assertTrue('a' in res[:2])
1599        self.assertTrue('b' in res[:2])
1600        self.assertEqual(res[2], 'c')
1601
1602        # Doing it again should take no time and exercise a different path.
1603        res = loop.run_until_complete(self.new_task(loop, foo()))
1604        self.assertAlmostEqual(0.15, loop.time())
1605
1606    def test_as_completed_with_timeout(self):
1607
1608        def gen():
1609            yield
1610            yield 0
1611            yield 0
1612            yield 0.1
1613
1614        loop = self.new_test_loop(gen)
1615
1616        a = loop.create_task(asyncio.sleep(0.1, 'a'))
1617        b = loop.create_task(asyncio.sleep(0.15, 'b'))
1618
1619        async def foo():
1620            values = []
1621            for f in asyncio.as_completed([a, b], timeout=0.12):
1622                if values:
1623                    loop.advance_time(0.02)
1624                try:
1625                    v = await f
1626                    values.append((1, v))
1627                except asyncio.TimeoutError as exc:
1628                    values.append((2, exc))
1629            return values
1630
1631        res = loop.run_until_complete(self.new_task(loop, foo()))
1632        self.assertEqual(len(res), 2, res)
1633        self.assertEqual(res[0], (1, 'a'))
1634        self.assertEqual(res[1][0], 2)
1635        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
1636        self.assertAlmostEqual(0.12, loop.time())
1637
1638        # move forward to close generator
1639        loop.advance_time(10)
1640        loop.run_until_complete(asyncio.wait([a, b]))
1641
1642    def test_as_completed_with_unused_timeout(self):
1643
1644        def gen():
1645            yield
1646            yield 0
1647            yield 0.01
1648
1649        loop = self.new_test_loop(gen)
1650
1651        a = asyncio.sleep(0.01, 'a')
1652
1653        async def foo():
1654            for f in asyncio.as_completed([a], timeout=1):
1655                v = await f
1656                self.assertEqual(v, 'a')
1657
1658        loop.run_until_complete(self.new_task(loop, foo()))
1659
1660    def test_as_completed_reverse_wait(self):
1661
1662        def gen():
1663            yield 0
1664            yield 0.05
1665            yield 0
1666
1667        loop = self.new_test_loop(gen)
1668
1669        a = asyncio.sleep(0.05, 'a')
1670        b = asyncio.sleep(0.10, 'b')
1671        fs = {a, b}
1672
1673        async def test():
1674            futs = list(asyncio.as_completed(fs))
1675            self.assertEqual(len(futs), 2)
1676
1677            x = await futs[1]
1678            self.assertEqual(x, 'a')
1679            self.assertAlmostEqual(0.05, loop.time())
1680            loop.advance_time(0.05)
1681            y = await futs[0]
1682            self.assertEqual(y, 'b')
1683            self.assertAlmostEqual(0.10, loop.time())
1684
1685        loop.run_until_complete(test())
1686
    def test_as_completed_concurrent(self):
        # Feed the awaitables produced by as_completed() into a single
        # asyncio.wait() call and check that both results come back.

        def gen():
            when = yield
            self.assertAlmostEqual(0.05, when)
            when = yield 0
            self.assertAlmostEqual(0.05, when)
            yield 0.05

        # Two coroutine objects with the same delay.
        a = asyncio.sleep(0.05, 'a')
        b = asyncio.sleep(0.05, 'b')
        fs = {a, b}

        async def test():
            futs = list(asyncio.as_completed(fs))
            self.assertEqual(len(futs), 2)
            waiter = asyncio.wait(futs)
            # Deprecation from passing coros in futs to asyncio.wait()
            with self.assertWarns(DeprecationWarning) as cm:
                done, pending = await waiter
            # The warning must be attributed to this test file.
            self.assertEqual(cm.warnings[0].filename, __file__)
            self.assertEqual(set(f.result() for f in done), {'a', 'b'})

        # The loop is created only after the coroutine objects above.
        loop = self.new_test_loop(gen)
        loop.run_until_complete(test())
1712
1713    def test_as_completed_duplicate_coroutines(self):
1714
1715        with self.assertWarns(DeprecationWarning):
1716            @asyncio.coroutine
1717            def coro(s):
1718                return s
1719
1720        with self.assertWarns(DeprecationWarning):
1721            @asyncio.coroutine
1722            def runner():
1723                result = []
1724                c = coro('ham')
1725                for f in asyncio.as_completed([c, c, coro('spam')]):
1726                    result.append((yield from f))
1727                return result
1728
1729        fut = self.new_task(self.loop, runner())
1730        self.loop.run_until_complete(fut)
1731        result = fut.result()
1732        self.assertEqual(set(result), {'ham', 'spam'})
1733        self.assertEqual(len(result), 2)
1734
1735    def test_as_completed_coroutine_without_loop(self):
1736        async def coro():
1737            return 42
1738
1739        a = coro()
1740        self.addCleanup(a.close)
1741
1742        futs = asyncio.as_completed([a])
1743        with self.assertWarns(DeprecationWarning) as cm:
1744            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
1745                list(futs)
1746        self.assertEqual(cm.warnings[0].filename, __file__)
1747
1748    def test_as_completed_coroutine_use_running_loop(self):
1749        loop = self.new_test_loop()
1750
1751        async def coro():
1752            return 42
1753
1754        async def test():
1755            futs = list(asyncio.as_completed([coro()]))
1756            self.assertEqual(len(futs), 1)
1757            self.assertEqual(await futs[0], 42)
1758
1759        loop.run_until_complete(test())
1760
1761    def test_as_completed_coroutine_use_global_loop(self):
1762        # Deprecated in 3.10
1763        async def coro():
1764            return 42
1765
1766        loop = self.new_test_loop()
1767        asyncio.set_event_loop(loop)
1768        self.addCleanup(asyncio.set_event_loop, None)
1769        futs = asyncio.as_completed([coro()])
1770        with self.assertWarns(DeprecationWarning) as cm:
1771            futs = list(futs)
1772        self.assertEqual(cm.warnings[0].filename, __file__)
1773        self.assertEqual(len(futs), 1)
1774        self.assertEqual(loop.run_until_complete(futs[0]), 42)
1775
1776    def test_sleep(self):
1777
1778        def gen():
1779            when = yield
1780            self.assertAlmostEqual(0.05, when)
1781            when = yield 0.05
1782            self.assertAlmostEqual(0.1, when)
1783            yield 0.05
1784
1785        loop = self.new_test_loop(gen)
1786
1787        async def sleeper(dt, arg):
1788            await asyncio.sleep(dt/2)
1789            res = await asyncio.sleep(dt/2, arg)
1790            return res
1791
1792        t = self.new_task(loop, sleeper(0.1, 'yeah'))
1793        loop.run_until_complete(t)
1794        self.assertTrue(t.done())
1795        self.assertEqual(t.result(), 'yeah')
1796        self.assertAlmostEqual(0.1, loop.time())
1797
1798    def test_sleep_cancel(self):
1799
1800        def gen():
1801            when = yield
1802            self.assertAlmostEqual(10.0, when)
1803            yield 0
1804
1805        loop = self.new_test_loop(gen)
1806
1807        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
1808
1809        handle = None
1810        orig_call_later = loop.call_later
1811
1812        def call_later(delay, callback, *args):
1813            nonlocal handle
1814            handle = orig_call_later(delay, callback, *args)
1815            return handle
1816
1817        loop.call_later = call_later
1818        test_utils.run_briefly(loop)
1819
1820        self.assertFalse(handle._cancelled)
1821
1822        t.cancel()
1823        test_utils.run_briefly(loop)
1824        self.assertTrue(handle._cancelled)
1825
1826    def test_task_cancel_sleeping_task(self):
1827
1828        def gen():
1829            when = yield
1830            self.assertAlmostEqual(0.1, when)
1831            when = yield 0
1832            self.assertAlmostEqual(5000, when)
1833            yield 0.1
1834
1835        loop = self.new_test_loop(gen)
1836
1837        async def sleep(dt):
1838            await asyncio.sleep(dt)
1839
1840        async def doit():
1841            sleeper = self.new_task(loop, sleep(5000))
1842            loop.call_later(0.1, sleeper.cancel)
1843            try:
1844                await sleeper
1845            except asyncio.CancelledError:
1846                return 'cancelled'
1847            else:
1848                return 'slept in'
1849
1850        doer = doit()
1851        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
1852        self.assertAlmostEqual(0.1, loop.time())
1853
1854    def test_task_cancel_waiter_future(self):
1855        fut = self.new_future(self.loop)
1856
1857        async def coro():
1858            await fut
1859
1860        task = self.new_task(self.loop, coro())
1861        test_utils.run_briefly(self.loop)
1862        self.assertIs(task._fut_waiter, fut)
1863
1864        task.cancel()
1865        test_utils.run_briefly(self.loop)
1866        self.assertRaises(
1867            asyncio.CancelledError, self.loop.run_until_complete, task)
1868        self.assertIsNone(task._fut_waiter)
1869        self.assertTrue(fut.cancelled())
1870
1871    def test_task_set_methods(self):
1872        async def notmuch():
1873            return 'ko'
1874
1875        gen = notmuch()
1876        task = self.new_task(self.loop, gen)
1877
1878        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
1879            task.set_result('ok')
1880
1881        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
1882            task.set_exception(ValueError())
1883
1884        self.assertEqual(
1885            self.loop.run_until_complete(task),
1886            'ko')
1887
1888    def test_step_result(self):
1889        with self.assertWarns(DeprecationWarning):
1890            @asyncio.coroutine
1891            def notmuch():
1892                yield None
1893                yield 1
1894                return 'ko'
1895
1896        self.assertRaises(
1897            RuntimeError, self.loop.run_until_complete, notmuch())
1898
1899    def test_step_result_future(self):
1900        # If coroutine returns future, task waits on this future.
1901
1902        class Fut(asyncio.Future):
1903            def __init__(self, *args, **kwds):
1904                self.cb_added = False
1905                super().__init__(*args, **kwds)
1906
1907            def add_done_callback(self, *args, **kwargs):
1908                self.cb_added = True
1909                super().add_done_callback(*args, **kwargs)
1910
1911        fut = Fut(loop=self.loop)
1912        result = None
1913
1914        async def wait_for_future():
1915            nonlocal result
1916            result = await fut
1917
1918        t = self.new_task(self.loop, wait_for_future())
1919        test_utils.run_briefly(self.loop)
1920        self.assertTrue(fut.cb_added)
1921
1922        res = object()
1923        fut.set_result(res)
1924        test_utils.run_briefly(self.loop)
1925        self.assertIs(res, result)
1926        self.assertTrue(t.done())
1927        self.assertIsNone(t.result())
1928
1929    def test_baseexception_during_cancel(self):
1930
1931        def gen():
1932            when = yield
1933            self.assertAlmostEqual(10.0, when)
1934            yield 0
1935
1936        loop = self.new_test_loop(gen)
1937
1938        async def sleeper():
1939            await asyncio.sleep(10)
1940
1941        base_exc = SystemExit()
1942
1943        async def notmutch():
1944            try:
1945                await sleeper()
1946            except asyncio.CancelledError:
1947                raise base_exc
1948
1949        task = self.new_task(loop, notmutch())
1950        test_utils.run_briefly(loop)
1951
1952        task.cancel()
1953        self.assertFalse(task.done())
1954
1955        self.assertRaises(SystemExit, test_utils.run_briefly, loop)
1956
1957        self.assertTrue(task.done())
1958        self.assertFalse(task.cancelled())
1959        self.assertIs(task.exception(), base_exc)
1960
1961    def test_iscoroutinefunction(self):
1962        def fn():
1963            pass
1964
1965        self.assertFalse(asyncio.iscoroutinefunction(fn))
1966
1967        def fn1():
1968            yield
1969        self.assertFalse(asyncio.iscoroutinefunction(fn1))
1970
1971        with self.assertWarns(DeprecationWarning):
1972            @asyncio.coroutine
1973            def fn2():
1974                yield
1975        self.assertTrue(asyncio.iscoroutinefunction(fn2))
1976
1977        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
1978
1979    def test_yield_vs_yield_from(self):
1980        fut = self.new_future(self.loop)
1981
1982        with self.assertWarns(DeprecationWarning):
1983            @asyncio.coroutine
1984            def wait_for_future():
1985                yield fut
1986
1987        task = wait_for_future()
1988        with self.assertRaises(RuntimeError):
1989            self.loop.run_until_complete(task)
1990
1991        self.assertFalse(fut.done())
1992
1993    def test_yield_vs_yield_from_generator(self):
1994        with self.assertWarns(DeprecationWarning):
1995            @asyncio.coroutine
1996            def coro():
1997                yield
1998
1999        with self.assertWarns(DeprecationWarning):
2000            @asyncio.coroutine
2001            def wait_for_future():
2002                gen = coro()
2003                try:
2004                    yield gen
2005                finally:
2006                    gen.close()
2007
2008        task = wait_for_future()
2009        self.assertRaises(
2010            RuntimeError,
2011            self.loop.run_until_complete, task)
2012
2013    def test_coroutine_non_gen_function(self):
2014        with self.assertWarns(DeprecationWarning):
2015            @asyncio.coroutine
2016            def func():
2017                return 'test'
2018
2019        self.assertTrue(asyncio.iscoroutinefunction(func))
2020
2021        coro = func()
2022        self.assertTrue(asyncio.iscoroutine(coro))
2023
2024        res = self.loop.run_until_complete(coro)
2025        self.assertEqual(res, 'test')
2026
2027    def test_coroutine_non_gen_function_return_future(self):
2028        fut = self.new_future(self.loop)
2029
2030        with self.assertWarns(DeprecationWarning):
2031            @asyncio.coroutine
2032            def func():
2033                return fut
2034
2035        async def coro():
2036            fut.set_result('test')
2037
2038        t1 = self.new_task(self.loop, func())
2039        t2 = self.new_task(self.loop, coro())
2040        res = self.loop.run_until_complete(t1)
2041        self.assertEqual(res, 'test')
2042        self.assertIsNone(t2.result())
2043
2044    def test_current_task(self):
2045        self.assertIsNone(asyncio.current_task(loop=self.loop))
2046
2047        async def coro(loop):
2048            self.assertIs(asyncio.current_task(), task)
2049
2050            self.assertIs(asyncio.current_task(None), task)
2051            self.assertIs(asyncio.current_task(), task)
2052
2053        task = self.new_task(self.loop, coro(self.loop))
2054        self.loop.run_until_complete(task)
2055        self.assertIsNone(asyncio.current_task(loop=self.loop))
2056
2057    def test_current_task_with_interleaving_tasks(self):
2058        self.assertIsNone(asyncio.current_task(loop=self.loop))
2059
2060        fut1 = self.new_future(self.loop)
2061        fut2 = self.new_future(self.loop)
2062
2063        async def coro1(loop):
2064            self.assertTrue(asyncio.current_task() is task1)
2065            await fut1
2066            self.assertTrue(asyncio.current_task() is task1)
2067            fut2.set_result(True)
2068
2069        async def coro2(loop):
2070            self.assertTrue(asyncio.current_task() is task2)
2071            fut1.set_result(True)
2072            await fut2
2073            self.assertTrue(asyncio.current_task() is task2)
2074
2075        task1 = self.new_task(self.loop, coro1(self.loop))
2076        task2 = self.new_task(self.loop, coro2(self.loop))
2077
2078        self.loop.run_until_complete(asyncio.wait((task1, task2)))
2079        self.assertIsNone(asyncio.current_task(loop=self.loop))
2080
2081    # Some thorough tests for cancellation propagation through
2082    # coroutines, tasks and wait().
2083
2084    def test_yield_future_passes_cancel(self):
2085        # Cancelling outer() cancels inner() cancels waiter.
2086        proof = 0
2087        waiter = self.new_future(self.loop)
2088
2089        async def inner():
2090            nonlocal proof
2091            try:
2092                await waiter
2093            except asyncio.CancelledError:
2094                proof += 1
2095                raise
2096            else:
2097                self.fail('got past sleep() in inner()')
2098
2099        async def outer():
2100            nonlocal proof
2101            try:
2102                await inner()
2103            except asyncio.CancelledError:
2104                proof += 100  # Expect this path.
2105            else:
2106                proof += 10
2107
2108        f = asyncio.ensure_future(outer(), loop=self.loop)
2109        test_utils.run_briefly(self.loop)
2110        f.cancel()
2111        self.loop.run_until_complete(f)
2112        self.assertEqual(proof, 101)
2113        self.assertTrue(waiter.cancelled())
2114
2115    def test_yield_wait_does_not_shield_cancel(self):
2116        # Cancelling outer() makes wait() return early, leaves inner()
2117        # running.
2118        proof = 0
2119        waiter = self.new_future(self.loop)
2120
2121        async def inner():
2122            nonlocal proof
2123            await waiter
2124            proof += 1
2125
2126        async def outer():
2127            nonlocal proof
2128            with self.assertWarns(DeprecationWarning):
2129                d, p = await asyncio.wait([inner()])
2130            proof += 100
2131
2132        f = asyncio.ensure_future(outer(), loop=self.loop)
2133        test_utils.run_briefly(self.loop)
2134        f.cancel()
2135        self.assertRaises(
2136            asyncio.CancelledError, self.loop.run_until_complete, f)
2137        waiter.set_result(None)
2138        test_utils.run_briefly(self.loop)
2139        self.assertEqual(proof, 1)
2140
2141    def test_shield_result(self):
2142        inner = self.new_future(self.loop)
2143        outer = asyncio.shield(inner)
2144        inner.set_result(42)
2145        res = self.loop.run_until_complete(outer)
2146        self.assertEqual(res, 42)
2147
2148    def test_shield_exception(self):
2149        inner = self.new_future(self.loop)
2150        outer = asyncio.shield(inner)
2151        test_utils.run_briefly(self.loop)
2152        exc = RuntimeError('expected')
2153        inner.set_exception(exc)
2154        test_utils.run_briefly(self.loop)
2155        self.assertIs(outer.exception(), exc)
2156
2157    def test_shield_cancel_inner(self):
2158        inner = self.new_future(self.loop)
2159        outer = asyncio.shield(inner)
2160        test_utils.run_briefly(self.loop)
2161        inner.cancel()
2162        test_utils.run_briefly(self.loop)
2163        self.assertTrue(outer.cancelled())
2164
2165    def test_shield_cancel_outer(self):
2166        inner = self.new_future(self.loop)
2167        outer = asyncio.shield(inner)
2168        test_utils.run_briefly(self.loop)
2169        outer.cancel()
2170        test_utils.run_briefly(self.loop)
2171        self.assertTrue(outer.cancelled())
2172        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
2173
2174    def test_shield_shortcut(self):
2175        fut = self.new_future(self.loop)
2176        fut.set_result(42)
2177        res = self.loop.run_until_complete(asyncio.shield(fut))
2178        self.assertEqual(res, 42)
2179
2180    def test_shield_effect(self):
2181        # Cancelling outer() does not affect inner().
2182        proof = 0
2183        waiter = self.new_future(self.loop)
2184
2185        async def inner():
2186            nonlocal proof
2187            await waiter
2188            proof += 1
2189
2190        async def outer():
2191            nonlocal proof
2192            await asyncio.shield(inner())
2193            proof += 100
2194
2195        f = asyncio.ensure_future(outer(), loop=self.loop)
2196        test_utils.run_briefly(self.loop)
2197        f.cancel()
2198        with self.assertRaises(asyncio.CancelledError):
2199            self.loop.run_until_complete(f)
2200        waiter.set_result(None)
2201        test_utils.run_briefly(self.loop)
2202        self.assertEqual(proof, 1)
2203
2204    def test_shield_gather(self):
2205        child1 = self.new_future(self.loop)
2206        child2 = self.new_future(self.loop)
2207        parent = asyncio.gather(child1, child2)
2208        outer = asyncio.shield(parent)
2209        test_utils.run_briefly(self.loop)
2210        outer.cancel()
2211        test_utils.run_briefly(self.loop)
2212        self.assertTrue(outer.cancelled())
2213        child1.set_result(1)
2214        child2.set_result(2)
2215        test_utils.run_briefly(self.loop)
2216        self.assertEqual(parent.result(), [1, 2])
2217
2218    def test_gather_shield(self):
2219        child1 = self.new_future(self.loop)
2220        child2 = self.new_future(self.loop)
2221        inner1 = asyncio.shield(child1)
2222        inner2 = asyncio.shield(child2)
2223        parent = asyncio.gather(inner1, inner2)
2224        test_utils.run_briefly(self.loop)
2225        parent.cancel()
2226        # This should cancel inner1 and inner2 but bot child1 and child2.
2227        test_utils.run_briefly(self.loop)
2228        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
2229        self.assertTrue(inner1.cancelled())
2230        self.assertTrue(inner2.cancelled())
2231        child1.set_result(1)
2232        child2.set_result(2)
2233        test_utils.run_briefly(self.loop)
2234
2235    def test_shield_coroutine_without_loop(self):
2236        async def coro():
2237            return 42
2238
2239        inner = coro()
2240        self.addCleanup(inner.close)
2241        with self.assertWarns(DeprecationWarning) as cm:
2242            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
2243                asyncio.shield(inner)
2244        self.assertEqual(cm.warnings[0].filename, __file__)
2245
2246    def test_shield_coroutine_use_running_loop(self):
2247        async def coro():
2248            return 42
2249
2250        async def test():
2251            return asyncio.shield(coro())
2252        outer = self.loop.run_until_complete(test())
2253        self.assertEqual(outer._loop, self.loop)
2254        res = self.loop.run_until_complete(outer)
2255        self.assertEqual(res, 42)
2256
2257    def test_shield_coroutine_use_global_loop(self):
2258        # Deprecated in 3.10
2259        async def coro():
2260            return 42
2261
2262        asyncio.set_event_loop(self.loop)
2263        self.addCleanup(asyncio.set_event_loop, None)
2264        with self.assertWarns(DeprecationWarning) as cm:
2265            outer = asyncio.shield(coro())
2266        self.assertEqual(cm.warnings[0].filename, __file__)
2267        self.assertEqual(outer._loop, self.loop)
2268        res = self.loop.run_until_complete(outer)
2269        self.assertEqual(res, 42)
2270
2271    def test_as_completed_invalid_args(self):
2272        fut = self.new_future(self.loop)
2273
2274        # as_completed() expects a list of futures, not a future instance
2275        self.assertRaises(TypeError, self.loop.run_until_complete,
2276            asyncio.as_completed(fut))
2277        coro = coroutine_function()
2278        self.assertRaises(TypeError, self.loop.run_until_complete,
2279            asyncio.as_completed(coro))
2280        coro.close()
2281
2282    def test_wait_invalid_args(self):
2283        fut = self.new_future(self.loop)
2284
2285        # wait() expects a list of futures, not a future instance
2286        self.assertRaises(TypeError, self.loop.run_until_complete,
2287            asyncio.wait(fut))
2288        coro = coroutine_function()
2289        self.assertRaises(TypeError, self.loop.run_until_complete,
2290            asyncio.wait(coro))
2291        coro.close()
2292
2293        # wait() expects at least a future
2294        self.assertRaises(ValueError, self.loop.run_until_complete,
2295            asyncio.wait([]))
2296
2297    def test_corowrapper_mocks_generator(self):
2298
2299        def check():
2300            # A function that asserts various things.
2301            # Called twice, with different debug flag values.
2302
2303            with self.assertWarns(DeprecationWarning):
2304                @asyncio.coroutine
2305                def coro():
2306                    # The actual coroutine.
2307                    self.assertTrue(gen.gi_running)
2308                    yield from fut
2309
2310            # A completed Future used to run the coroutine.
2311            fut = self.new_future(self.loop)
2312            fut.set_result(None)
2313
2314            # Call the coroutine.
2315            gen = coro()
2316
2317            # Check some properties.
2318            self.assertTrue(asyncio.iscoroutine(gen))
2319            self.assertIsInstance(gen.gi_frame, types.FrameType)
2320            self.assertFalse(gen.gi_running)
2321            self.assertIsInstance(gen.gi_code, types.CodeType)
2322
2323            # Run it.
2324            self.loop.run_until_complete(gen)
2325
2326            # The frame should have changed.
2327            self.assertIsNone(gen.gi_frame)
2328
2329        # Test with debug flag cleared.
2330        with set_coroutine_debug(False):
2331            check()
2332
2333        # Test with debug flag set.
2334        with set_coroutine_debug(True):
2335            check()
2336
2337    def test_yield_from_corowrapper(self):
2338        with set_coroutine_debug(True):
2339            with self.assertWarns(DeprecationWarning):
2340                @asyncio.coroutine
2341                def t1():
2342                    return (yield from t2())
2343
2344            with self.assertWarns(DeprecationWarning):
2345                @asyncio.coroutine
2346                def t2():
2347                    f = self.new_future(self.loop)
2348                    self.new_task(self.loop, t3(f))
2349                    return (yield from f)
2350
2351            with self.assertWarns(DeprecationWarning):
2352                @asyncio.coroutine
2353                def t3(f):
2354                    f.set_result((1, 2, 3))
2355
2356            task = self.new_task(self.loop, t1())
2357            val = self.loop.run_until_complete(task)
2358            self.assertEqual(val, (1, 2, 3))
2359
2360    def test_yield_from_corowrapper_send(self):
2361        def foo():
2362            a = yield
2363            return a
2364
2365        def call(arg):
2366            cw = asyncio.coroutines.CoroWrapper(foo())
2367            cw.send(None)
2368            try:
2369                cw.send(arg)
2370            except StopIteration as ex:
2371                return ex.args[0]
2372            else:
2373                raise AssertionError('StopIteration was expected')
2374
2375        self.assertEqual(call((1, 2)), (1, 2))
2376        self.assertEqual(call('spam'), 'spam')
2377
2378    def test_corowrapper_weakref(self):
2379        wd = weakref.WeakValueDictionary()
2380        def foo(): yield from []
2381        cw = asyncio.coroutines.CoroWrapper(foo())
2382        wd['cw'] = cw  # Would fail without __weakref__ slot.
2383        cw.gen = None  # Suppress warning from __del__.
2384
2385    def test_corowrapper_throw(self):
2386        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
2387        def foo():
2388            value = None
2389            while True:
2390                try:
2391                    value = yield value
2392                except Exception as e:
2393                    value = e
2394
2395        exception = Exception("foo")
2396        cw = asyncio.coroutines.CoroWrapper(foo())
2397        cw.send(None)
2398        self.assertIs(exception, cw.throw(exception))
2399
2400        cw = asyncio.coroutines.CoroWrapper(foo())
2401        cw.send(None)
2402        self.assertIs(exception, cw.throw(Exception, exception))
2403
2404        cw = asyncio.coroutines.CoroWrapper(foo())
2405        cw.send(None)
2406        exception = cw.throw(Exception, "foo")
2407        self.assertIsInstance(exception, Exception)
2408        self.assertEqual(exception.args, ("foo", ))
2409
2410        cw = asyncio.coroutines.CoroWrapper(foo())
2411        cw.send(None)
2412        exception = cw.throw(Exception, "foo", None)
2413        self.assertIsInstance(exception, Exception)
2414        self.assertEqual(exception.args, ("foo", ))
2415
2416    def test_log_destroyed_pending_task(self):
2417        Task = self.__class__.Task
2418
2419        with self.assertWarns(DeprecationWarning):
2420            @asyncio.coroutine
2421            def kill_me(loop):
2422                future = self.new_future(loop)
2423                yield from future
2424                # at this point, the only reference to kill_me() task is
2425                # the Task._wakeup() method in future._callbacks
2426                raise Exception("code never reached")
2427
2428        mock_handler = mock.Mock()
2429        self.loop.set_debug(True)
2430        self.loop.set_exception_handler(mock_handler)
2431
2432        # schedule the task
2433        coro = kill_me(self.loop)
2434        task = asyncio.ensure_future(coro, loop=self.loop)
2435
2436        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})
2437
2438        asyncio.set_event_loop(None)
2439
2440        # execute the task so it waits for future
2441        self.loop._run_once()
2442        self.assertEqual(len(self.loop._ready), 0)
2443
2444        # remove the future used in kill_me(), and references to the task
2445        del coro.gi_frame.f_locals['future']
2446        coro = None
2447        source_traceback = task._source_traceback
2448        task = None
2449
2450        # no more reference to kill_me() task: the task is destroyed by the GC
2451        support.gc_collect()
2452
2453        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())
2454
2455        mock_handler.assert_called_with(self.loop, {
2456            'message': 'Task was destroyed but it is pending!',
2457            'task': mock.ANY,
2458            'source_traceback': source_traceback,
2459        })
2460        mock_handler.reset_mock()
2461
2462    @mock.patch('asyncio.base_events.logger')
2463    def test_tb_logger_not_called_after_cancel(self, m_log):
2464        loop = asyncio.new_event_loop()
2465        self.set_event_loop(loop)
2466
2467        async def coro():
2468            raise TypeError
2469
2470        async def runner():
2471            task = self.new_task(loop, coro())
2472            await asyncio.sleep(0.05)
2473            task.cancel()
2474            task = None
2475
2476        loop.run_until_complete(runner())
2477        self.assertFalse(m_log.error.called)
2478
2479    @mock.patch('asyncio.coroutines.logger')
2480    def test_coroutine_never_yielded(self, m_log):
2481        with set_coroutine_debug(True):
2482            with self.assertWarns(DeprecationWarning):
2483                @asyncio.coroutine
2484                def coro_noop():
2485                    pass
2486
2487        tb_filename = __file__
2488        tb_lineno = sys._getframe().f_lineno + 2
2489        # create a coroutine object but don't use it
2490        coro_noop()
2491        support.gc_collect()
2492
2493        self.assertTrue(m_log.error.called)
2494        message = m_log.error.call_args[0][0]
2495        func_filename, func_lineno = test_utils.get_function_source(coro_noop)
2496
2497        regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
2498                    r'was never yielded from\n'
2499                 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
2500                 r'.*\n'
2501                 r'  File "%s", line %s, in test_coroutine_never_yielded\n'
2502                 r'    coro_noop\(\)$'
2503                 % (re.escape(coro_noop.__qualname__),
2504                    re.escape(func_filename), func_lineno,
2505                    re.escape(tb_filename), tb_lineno))
2506
2507        self.assertRegex(message, re.compile(regex, re.DOTALL))
2508
2509    def test_return_coroutine_from_coroutine(self):
2510        """Return of @asyncio.coroutine()-wrapped function generator object
2511        from @asyncio.coroutine()-wrapped function should have same effect as
2512        returning generator object or Future."""
2513        def check():
2514            with self.assertWarns(DeprecationWarning):
2515                @asyncio.coroutine
2516                def outer_coro():
2517                    with self.assertWarns(DeprecationWarning):
2518                        @asyncio.coroutine
2519                        def inner_coro():
2520                            return 1
2521
2522                    return inner_coro()
2523
2524            result = self.loop.run_until_complete(outer_coro())
2525            self.assertEqual(result, 1)
2526
2527        # Test with debug flag cleared.
2528        with set_coroutine_debug(False):
2529            check()
2530
2531        # Test with debug flag set.
2532        with set_coroutine_debug(True):
2533            check()
2534
2535    def test_task_source_traceback(self):
2536        self.loop.set_debug(True)
2537
2538        task = self.new_task(self.loop, coroutine_function())
2539        lineno = sys._getframe().f_lineno - 1
2540        self.assertIsInstance(task._source_traceback, list)
2541        self.assertEqual(task._source_traceback[-2][:3],
2542                         (__file__,
2543                          lineno,
2544                          'test_task_source_traceback'))
2545        self.loop.run_until_complete(task)
2546
2547    def _test_cancel_wait_for(self, timeout):
2548        loop = asyncio.new_event_loop()
2549        self.addCleanup(loop.close)
2550
2551        async def blocking_coroutine():
2552            fut = self.new_future(loop)
2553            # Block: fut result is never set
2554            await fut
2555
2556        task = loop.create_task(blocking_coroutine())
2557
2558        wait = loop.create_task(asyncio.wait_for(task, timeout))
2559        loop.call_soon(wait.cancel)
2560
2561        self.assertRaises(asyncio.CancelledError,
2562                          loop.run_until_complete, wait)
2563
2564        # Python issue #23219: cancelling the wait must also cancel the task
2565        self.assertTrue(task.cancelled())
2566
2567    def test_cancel_blocking_wait_for(self):
2568        self._test_cancel_wait_for(None)
2569
2570    def test_cancel_wait_for(self):
2571        self._test_cancel_wait_for(60.0)
2572
2573    def test_cancel_gather_1(self):
2574        """Ensure that a gathering future refuses to be cancelled once all
2575        children are done"""
2576        loop = asyncio.new_event_loop()
2577        self.addCleanup(loop.close)
2578
2579        fut = self.new_future(loop)
2580        async def create():
2581            # The indirection fut->child_coro is needed since otherwise the
2582            # gathering task is done at the same time as the child future
2583            def child_coro():
2584                return (yield from fut)
2585            gather_future = asyncio.gather(child_coro())
2586            return asyncio.ensure_future(gather_future)
2587        gather_task = loop.run_until_complete(create())
2588
2589        cancel_result = None
2590        def cancelling_callback(_):
2591            nonlocal cancel_result
2592            cancel_result = gather_task.cancel()
2593        fut.add_done_callback(cancelling_callback)
2594
2595        fut.set_result(42) # calls the cancelling_callback after fut is done()
2596
2597        # At this point the task should complete.
2598        loop.run_until_complete(gather_task)
2599
2600        # Python issue #26923: asyncio.gather drops cancellation
2601        self.assertEqual(cancel_result, False)
2602        self.assertFalse(gather_task.cancelled())
2603        self.assertEqual(gather_task.result(), [42])
2604
2605    def test_cancel_gather_2(self):
2606        cases = [
2607            ((), ()),
2608            ((None,), ()),
2609            (('my message',), ('my message',)),
2610            # Non-string values should roundtrip.
2611            ((5,), (5,)),
2612        ]
2613        for cancel_args, expected_args in cases:
2614            with self.subTest(cancel_args=cancel_args):
2615                loop = asyncio.new_event_loop()
2616                self.addCleanup(loop.close)
2617
2618                async def test():
2619                    time = 0
2620                    while True:
2621                        time += 0.05
2622                        await asyncio.gather(asyncio.sleep(0.05),
2623                                             return_exceptions=True)
2624                        if time > 1:
2625                            return
2626
2627                async def main():
2628                    qwe = self.new_task(loop, test())
2629                    await asyncio.sleep(0.2)
2630                    qwe.cancel(*cancel_args)
2631                    await qwe
2632
2633                try:
2634                    loop.run_until_complete(main())
2635                except asyncio.CancelledError as exc:
2636                    self.assertEqual(exc.args, ())
2637                    exc_type, exc_args, depth = get_innermost_context(exc)
2638                    self.assertEqual((exc_type, exc_args),
2639                        (asyncio.CancelledError, expected_args))
2640                    # The exact traceback seems to vary in CI.
2641                    self.assertIn(depth, (2, 3))
2642                else:
2643                    self.fail('gather did not propagate the cancellation '
2644                              'request')
2645
2646    def test_exception_traceback(self):
2647        # See http://bugs.python.org/issue28843
2648
2649        async def foo():
2650            1 / 0
2651
2652        async def main():
2653            task = self.new_task(self.loop, foo())
2654            await asyncio.sleep(0)  # skip one loop iteration
2655            self.assertIsNotNone(task.exception().__traceback__)
2656
2657        self.loop.run_until_complete(main())
2658
2659    @mock.patch('asyncio.base_events.logger')
2660    def test_error_in_call_soon(self, m_log):
2661        def call_soon(callback, *args, **kwargs):
2662            raise ValueError
2663        self.loop.call_soon = call_soon
2664
2665        with self.assertWarns(DeprecationWarning):
2666            @asyncio.coroutine
2667            def coro():
2668                pass
2669
2670        self.assertFalse(m_log.error.called)
2671
2672        with self.assertRaises(ValueError):
2673            gen = coro()
2674            try:
2675                self.new_task(self.loop, gen)
2676            finally:
2677                gen.close()
2678        gc.collect()  # For PyPy or other GCs.
2679
2680        self.assertTrue(m_log.error.called)
2681        message = m_log.error.call_args[0][0]
2682        self.assertIn('Task was destroyed but it is pending', message)
2683
2684        self.assertEqual(asyncio.all_tasks(self.loop), set())
2685
2686    def test_create_task_with_noncoroutine(self):
2687        with self.assertRaisesRegex(TypeError,
2688                                    "a coroutine was expected, got 123"):
2689            self.new_task(self.loop, 123)
2690
2691        # test it for the second time to ensure that caching
2692        # in asyncio.iscoroutine() doesn't break things.
2693        with self.assertRaisesRegex(TypeError,
2694                                    "a coroutine was expected, got 123"):
2695            self.new_task(self.loop, 123)
2696
2697    def test_create_task_with_oldstyle_coroutine(self):
2698
2699        with self.assertWarns(DeprecationWarning):
2700            @asyncio.coroutine
2701            def coro():
2702                pass
2703
2704        task = self.new_task(self.loop, coro())
2705        self.assertIsInstance(task, self.Task)
2706        self.loop.run_until_complete(task)
2707
2708        # test it for the second time to ensure that caching
2709        # in asyncio.iscoroutine() doesn't break things.
2710        task = self.new_task(self.loop, coro())
2711        self.assertIsInstance(task, self.Task)
2712        self.loop.run_until_complete(task)
2713
2714    def test_create_task_with_async_function(self):
2715
2716        async def coro():
2717            pass
2718
2719        task = self.new_task(self.loop, coro())
2720        self.assertIsInstance(task, self.Task)
2721        self.loop.run_until_complete(task)
2722
2723        # test it for the second time to ensure that caching
2724        # in asyncio.iscoroutine() doesn't break things.
2725        task = self.new_task(self.loop, coro())
2726        self.assertIsInstance(task, self.Task)
2727        self.loop.run_until_complete(task)
2728
2729    def test_create_task_with_asynclike_function(self):
2730        task = self.new_task(self.loop, CoroLikeObject())
2731        self.assertIsInstance(task, self.Task)
2732        self.assertEqual(self.loop.run_until_complete(task), 42)
2733
2734        # test it for the second time to ensure that caching
2735        # in asyncio.iscoroutine() doesn't break things.
2736        task = self.new_task(self.loop, CoroLikeObject())
2737        self.assertIsInstance(task, self.Task)
2738        self.assertEqual(self.loop.run_until_complete(task), 42)
2739
2740    def test_bare_create_task(self):
2741
2742        async def inner():
2743            return 1
2744
2745        async def coro():
2746            task = asyncio.create_task(inner())
2747            self.assertIsInstance(task, self.Task)
2748            ret = await task
2749            self.assertEqual(1, ret)
2750
2751        self.loop.run_until_complete(coro())
2752
2753    def test_bare_create_named_task(self):
2754
2755        async def coro_noop():
2756            pass
2757
2758        async def coro():
2759            task = asyncio.create_task(coro_noop(), name='No-op')
2760            self.assertEqual(task.get_name(), 'No-op')
2761            await task
2762
2763        self.loop.run_until_complete(coro())
2764
2765    def test_context_1(self):
2766        cvar = contextvars.ContextVar('cvar', default='nope')
2767
2768        async def sub():
2769            await asyncio.sleep(0.01)
2770            self.assertEqual(cvar.get(), 'nope')
2771            cvar.set('something else')
2772
2773        async def main():
2774            self.assertEqual(cvar.get(), 'nope')
2775            subtask = self.new_task(loop, sub())
2776            cvar.set('yes')
2777            self.assertEqual(cvar.get(), 'yes')
2778            await subtask
2779            self.assertEqual(cvar.get(), 'yes')
2780
2781        loop = asyncio.new_event_loop()
2782        try:
2783            task = self.new_task(loop, main())
2784            loop.run_until_complete(task)
2785        finally:
2786            loop.close()
2787
2788    def test_context_2(self):
2789        cvar = contextvars.ContextVar('cvar', default='nope')
2790
2791        async def main():
2792            def fut_on_done(fut):
2793                # This change must not pollute the context
2794                # of the "main()" task.
2795                cvar.set('something else')
2796
2797            self.assertEqual(cvar.get(), 'nope')
2798
2799            for j in range(2):
2800                fut = self.new_future(loop)
2801                fut.add_done_callback(fut_on_done)
2802                cvar.set(f'yes{j}')
2803                loop.call_soon(fut.set_result, None)
2804                await fut
2805                self.assertEqual(cvar.get(), f'yes{j}')
2806
2807                for i in range(3):
2808                    # Test that task passed its context to add_done_callback:
2809                    cvar.set(f'yes{i}-{j}')
2810                    await asyncio.sleep(0.001)
2811                    self.assertEqual(cvar.get(), f'yes{i}-{j}')
2812
2813        loop = asyncio.new_event_loop()
2814        try:
2815            task = self.new_task(loop, main())
2816            loop.run_until_complete(task)
2817        finally:
2818            loop.close()
2819
2820        self.assertEqual(cvar.get(), 'nope')
2821
2822    def test_context_3(self):
2823        # Run 100 Tasks in parallel, each modifying cvar.
2824
2825        cvar = contextvars.ContextVar('cvar', default=-1)
2826
2827        async def sub(num):
2828            for i in range(10):
2829                cvar.set(num + i)
2830                await asyncio.sleep(random.uniform(0.001, 0.05))
2831                self.assertEqual(cvar.get(), num + i)
2832
2833        async def main():
2834            tasks = []
2835            for i in range(100):
2836                task = loop.create_task(sub(random.randint(0, 10)))
2837                tasks.append(task)
2838
2839            await asyncio.gather(*tasks)
2840
2841        loop = asyncio.new_event_loop()
2842        try:
2843            loop.run_until_complete(main())
2844        finally:
2845            loop.close()
2846
2847        self.assertEqual(cvar.get(), -1)
2848
2849    def test_get_coro(self):
2850        loop = asyncio.new_event_loop()
2851        coro = coroutine_function()
2852        try:
2853            task = self.new_task(loop, coro)
2854            loop.run_until_complete(task)
2855            self.assertIs(task.get_coro(), coro)
2856        finally:
2857            loop.close()
2858
2859
def add_subclass_tests(cls):
    """Class decorator that reruns the tests of *cls* against subclasses.

    ``cls.Task`` and ``cls.Future`` are replaced by subclasses that count
    calls to ``add_done_callback()``, an extra test is attached and
    ``test_task_source_traceback`` is disabled.  If either attribute is
    None, *cls* is returned unchanged.
    """
    base_task, base_future = cls.Task, cls.Future
    if base_task is None or base_future is None:
        return cls

    class CallCounter:
        # Mixin recording how often add_done_callback() gets called.

        def __init__(self, *args, **kwargs):
            self.calls = collections.defaultdict(int)
            super().__init__(*args, **kwargs)

        def add_done_callback(self, *args, **kwargs):
            self.calls['add_done_callback'] += 1
            return super().add_done_callback(*args, **kwargs)

    class Task(CallCounter, base_task):
        pass

    class Future(CallCounter, base_future):
        pass

    def test_subclasses_ctask_cfuture(self):
        fut = self.Future(loop=self.loop)

        async def func():
            self.loop.call_soon(lambda: fut.set_result('spam'))
            return await fut

        task = self.Task(func(), loop=self.loop)
        self.assertEqual(self.loop.run_until_complete(task), 'spam')

        expected_calls = {'add_done_callback': 1}
        self.assertEqual(dict(task.calls), expected_calls)
        self.assertEqual(dict(fut.calls), expected_calls)

    # Add patched Task & Future back to the test case
    cls.Task, cls.Future = Task, Future

    # Add an extra unit-test
    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture

    # Disable the "test_task_source_traceback" test
    # (the test is hardcoded for a particular call stack, which
    # is slightly different for Task subclasses)
    cls.test_task_source_traceback = None

    return cls
2916
2917
class SetMethodsTest:
    # Mixin: forcing a result or an exception onto a task through the
    # Future class makes the loop's exception handler receive an
    # InvalidStateError, while the forced outcome is what the task yields.

    def test_set_result_causes_invalid_state(self):
        future_cls = type(self).Future
        handler = mock.Mock()
        self.loop.call_exception_handler = handler

        async def slow():
            await asyncio.sleep(0.1)
            return 10

        pending_coro = slow()
        task = self.new_task(self.loop, pending_coro)
        # Call the Future class's set_result() directly on the task.
        future_cls.set_result(task, 'spam')

        outcome = self.loop.run_until_complete(task)
        self.assertEqual(outcome, 'spam')

        handler.assert_called_once()
        reported = handler.call_args[0][0]['exception']
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise reported

        pending_coro.close()

    def test_set_exception_causes_invalid_state(self):
        class MyExc(Exception):
            pass

        future_cls = type(self).Future
        handler = mock.Mock()
        self.loop.call_exception_handler = handler

        async def slow():
            await asyncio.sleep(0.1)
            return 10

        pending_coro = slow()
        task = self.new_task(self.loop, pending_coro)
        # Call the Future class's set_exception() directly on the task.
        future_cls.set_exception(task, MyExc())

        with self.assertRaises(MyExc):
            self.loop.run_until_complete(task)

        handler.assert_called_once()
        reported = handler.call_args[0][0]['exception']
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise reported

        pending_coro.close()
2969
2970
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
                          test_utils.TestCase):
    # Runs the shared task tests with tasks._CTask and futures._CFuture.
    # getattr() keeps the class body importable when they are missing.

    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)

    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')

        async def coro():
            pass

        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        refs_before = gettotalrefcount()
        # Re-initialising the same task object must not leak references.
        for _round in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        async def coro():
            pass

        finished = self.new_task(self.loop, coro())
        self.loop.run_until_complete(finished)
        # Deleting the attribute must raise AttributeError.
        with self.assertRaises(AttributeError):
            del finished._log_destroy_pending
3000
3001
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against subclasses (see add_subclass_tests) of
    # tasks._CTask and futures._CFuture.

    Future = getattr(futures, '_CFuture', None)
    Task = getattr(tasks, '_CTask', None)
3010
3011
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against subclasses (see add_subclass_tests) of
    # tasks._CTask and futures._PyFuture.

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
3019
3020
@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against subclasses (see add_subclass_tests) of
    # tasks._PyTask and futures._CFuture.

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)
3028
3029
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests with tasks._CTask and futures._PyFuture.

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
3036
3037
@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests with tasks._PyTask and futures._CFuture.

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask
3044
3045
class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
                            test_utils.TestCase):
    # Shared task tests (plus SetMethodsTest) with tasks._PyTask and
    # futures._PyFuture.

    Future = futures._PyFuture
    Task = tasks._PyTask
3051
3052
@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against subclasses (see add_subclass_tests) of
    # tasks._PyTask and futures._PyFuture.
    Future = futures._PyFuture
    Task = tasks._PyTask
3057
3058
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):

    def test_foobar(self):
        # A task awaiting a future whose "get_loop" attribute raises
        # AttributeError must still run to completion.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await fut
            return 'spam'

        self.loop = asyncio.new_event_loop()
        # closing() guarantees the loop is closed whatever happens.
        with contextlib.closing(self.loop):
            fut = Fut(loop=self.loop)
            self.loop.call_later(0.1, fut.set_result, 1)
            task = self.loop.create_task(coro())
            res = self.loop.run_until_complete(task)

        self.assertEqual(res, 'spam')
3083
3084
class BaseTaskIntrospectionTests:
    # Mixin exercising the task registry helpers.  The four callables
    # below are placeholders to be supplied by concrete test classes.
    _register_task = None
    _unregister_task = None
    _enter_task = None
    _leave_task = None

    def test__register_task_1(self):
        # The task-like object exposes its loop through "_loop" only.
        loop = mock.Mock()

        class TaskLike:
            @property
            def _loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_2(self):
        # The task-like object exposes its loop through get_loop().
        loop = mock.Mock()

        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_3(self):
        # A registered task reporting done() is left out of all_tasks().
        loop = mock.Mock()

        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return True

        task = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
        self._unregister_task(task)

    def test__enter_task(self):
        loop, task = mock.Mock(), mock.Mock()
        self.assertIsNone(asyncio.current_task(loop))
        self._enter_task(loop, task)
        self.assertIs(asyncio.current_task(loop), task)
        self._leave_task(loop, task)

    def test__enter_task_failure(self):
        loop = mock.Mock()
        first, second = mock.Mock(), mock.Mock()
        self._enter_task(loop, first)
        # Entering a second task while one is current is an error ...
        with self.assertRaises(RuntimeError):
            self._enter_task(loop, second)
        # ... and leaves the current task untouched.
        self.assertIs(asyncio.current_task(loop), first)
        self._leave_task(loop, first)

    def test__leave_task(self):
        loop, task = mock.Mock(), mock.Mock()
        self._enter_task(loop, task)
        self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__leave_task_failure1(self):
        loop = mock.Mock()
        first, second = mock.Mock(), mock.Mock()
        self._enter_task(loop, first)
        # Leaving a task other than the current one is an error ...
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, second)
        # ... and leaves the current task untouched.
        self.assertIs(asyncio.current_task(loop), first)
        self._leave_task(loop, first)

    def test__leave_task_failure2(self):
        # Leaving a task when none was entered is an error.
        loop, task = mock.Mock(), mock.Mock()
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__unregister_task(self):
        loop, task = mock.Mock(), mock.Mock()
        task.get_loop = lambda: loop
        self._register_task(task)
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())

    def test__unregister_task_not_registered(self):
        # Unregistering a task that was never registered must not raise.
        loop, task = mock.Mock(), mock.Mock()
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
3195
3196
class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # Bind the introspection tests to the tasks._py_* helpers.
    # staticmethod() keeps them from being bound to the test instance.
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
3202
3203
@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
                     'requires the C _asyncio module')
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # Bind the introspection tests to the tasks._c_* helpers when they
    # exist; otherwise leave the None placeholders (the class is skipped).
    _register_task = _unregister_task = _enter_task = _leave_task = None
    if hasattr(tasks, '_c_register_task'):
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
3214
3215
class BaseCurrentLoopTests:
    # Mixin checking asyncio.current_task(); subclasses implement
    # new_task() to pick the Task implementation.

    def setUp(self):
        super().setUp()
        fresh_loop = asyncio.new_event_loop()
        self.loop = fresh_loop
        self.set_event_loop(fresh_loop)

    def new_task(self, coro):
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        current = asyncio.current_task(loop=self.loop)
        self.assertIsNone(current)

    def test_current_task_no_running_loop_implicit(self):
        # Without an explicit loop and no running loop this must raise.
        with self.assertRaises(RuntimeError):
            asyncio.current_task()

    def test_current_task_with_implicit_loop(self):
        async def coro():
            # Explicit loop, explicit None and no argument all agree.
            self.assertIs(asyncio.current_task(loop=self.loop), task)

            self.assertIs(asyncio.current_task(None), task)
            self.assertIs(asyncio.current_task(), task)

        task = self.new_task(coro())
        self.loop.run_until_complete(task)
        # Once the task has finished there is no current task any more.
        current = asyncio.current_task(loop=self.loop)
        self.assertIsNone(current)
3243
3244
class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    # current_task() tests using tasks._PyTask.

    def new_task(self, coro):
        task_cls = tasks._PyTask
        return task_cls(coro, loop=self.loop)
3249
3250
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    # current_task() tests using tasks._CTask.

    def new_task(self, coro):
        # Looked up at call time; the class is skipped when it is missing.
        task_cls = tasks._CTask
        return task_cls(coro, loop=self.loop)
3257
3258
class GenericTaskTests(test_utils.TestCase):

    def test_future_subclass(self):
        # asyncio.Task must be usable wherever a Future is expected.
        self.assertTrue(issubclass(asyncio.Task, asyncio.Future))

    @support.cpython_only
    def test_asyncio_module_compiled(self):
        # Because of circular imports it's easy to make _asyncio
        # module non-importable.  This is a simple test that will
        # fail on systems where C modules were successfully compiled
        # (hence the test for _functools etc), but _asyncio somehow didn't.
        try:
            import _functools
            import _json
            import _pickle
        except ImportError:
            # skipTest() raises, so nothing below runs in this case.
            self.skipTest('C modules are not available')

        try:
            import _asyncio
        except ImportError:
            self.fail('_asyncio module is missing')
3281
3282
class GatherTestsBase:
    # Shared gather() tests.  Subclasses provide wrap_futures() and
    # _gather() to choose how the children are passed to gather().

    def setUp(self):
        super().setUp()
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        # Keep stepping the loop for as long as callbacks are queued.
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        first, second, third = (self.one_loop.create_future()
                                for _ in range(3))
        children = self.wrap_futures(first, second, third)
        fut = self._gather(*children, **kwargs)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        # Complete the children out of order; one is still pending.
        second.set_result(1)
        first.set_result(2)
        self._run_loop(self.one_loop)
        self.assertEqual(cb.called, False)
        self.assertFalse(fut.done())
        third.set_result(3)
        self._run_loop(self.one_loop)
        cb.assert_called_once_with(fut)
        # Results follow argument order, not completion order.
        self.assertEqual(fut.result(), [2, 1, 3])

    def test_success(self):
        self._check_success()
        self._check_success(return_exceptions=False)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        a, b, c, d, e = (self.one_loop.create_future() for _ in range(5))
        fut = self._gather(*self.wrap_futures(a, b, c, d, e))
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        failure = ZeroDivisionError()
        a.set_result(1)
        b.set_exception(failure)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertIs(fut.exception(), failure)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_return_exceptions(self):
        a, b, c, d = (self.one_loop.create_future() for _ in range(4))
        fut = self._gather(*self.wrap_futures(a, b, c, d),
                           return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        first_error = ZeroDivisionError()
        second_error = RuntimeError()
        b.set_result(1)
        c.set_exception(first_error)
        a.set_result(3)
        self._run_loop(self.one_loop)
        # One child is still pending, so the gather is not done yet.
        self.assertFalse(fut.done())
        d.set_exception(second_error)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [3, 1, first_error, second_error])

    def test_env_var_debug(self):
        code = '\n'.join((
            'import asyncio.coroutines',
            'print(asyncio.coroutines._DEBUG)'))

        debug_unset = {'PYTHONASYNCIODEBUG': '', 'PYTHONDEVMODE': ''}
        debug_set = {'PYTHONASYNCIODEBUG': '1', 'PYTHONDEVMODE': ''}
        # Each entry: interpreter arguments, environment, expected output.
        scenarios = (
            # Test with -E to not fail if the unit test was run with
            # PYTHONASYNCIODEBUG set to a non-empty string
            (('-E', '-c', code), {}, b'False'),
            (('-c', code), debug_unset, b'False'),
            (('-c', code), debug_set, b'True'),
            (('-E', '-c', code), debug_set, b'False'),
            # -X dev
            (('-E', '-X', 'dev', '-c', code), {}, b'True'),
        )
        for argv, env_vars, expected in scenarios:
            sts, stdout, stderr = assert_python_ok(*argv, **env_vars)
            self.assertEqual(stdout.rstrip(), expected)
3383
3384
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
    # gather() tests where the children are passed as plain futures.

    def wrap_futures(self, *futures):
        # Futures are handed to gather() unchanged.
        return futures

    def _gather(self, *args, **kwargs):
        return asyncio.gather(*args, **kwargs)

    def test_constructor_empty_sequence_without_loop(self):
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaises(RuntimeError):
                asyncio.gather()
        # The warning must be attributed to this file.
        first_warning = cm.warnings[0]
        self.assertEqual(first_warning.filename, __file__)

    def test_constructor_empty_sequence_use_running_loop(self):
        async def gather():
            return asyncio.gather()
        fut = self.one_loop.run_until_complete(gather())
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])

    def test_constructor_empty_sequence_use_global_loop(self):
        # Deprecated in 3.10
        asyncio.set_event_loop(self.one_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        with self.assertWarns(DeprecationWarning) as cm:
            fut = asyncio.gather()
        first_warning = cm.warnings[0]
        self.assertEqual(first_warning.filename, __file__)
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])

    def test_constructor_heterogenous_futures(self):
        # Children that belong to different loops are rejected.
        on_one_loop = self.one_loop.create_future()
        on_other_loop = self.other_loop.create_future()
        with self.assertRaises(ValueError):
            asyncio.gather(on_one_loop, on_other_loop)

    def test_constructor_homogenous_futures(self):
        children = [self.other_loop.create_future() for _ in range(3)]
        # Gathering the same children twice must behave the same.
        for _attempt in range(2):
            fut = asyncio.gather(*children)
            self.assertIs(fut._loop, self.other_loop)
            self._run_loop(self.other_loop)
            self.assertFalse(fut.done())

    def test_one_cancellation(self):
        a, b, c, d, e = (self.one_loop.create_future() for _ in range(5))
        fut = asyncio.gather(a, b, c, d, e)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        b.cancel()
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        # The outer future holds a CancelledError but is not cancelled.
        self.assertFalse(fut.cancelled())
        self.assertIsInstance(fut.exception(), asyncio.CancelledError)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_result_exception_one_cancellation(self):
        a, b, c, d, e, f = (self.one_loop.create_future()
                            for _ in range(6))
        fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        zde = ZeroDivisionError()
        b.set_exception(zde)
        c.cancel()
        self._run_loop(self.one_loop)
        self.assertFalse(fut.done())
        d.set_result(3)
        e.cancel()
        rte = RuntimeError()
        f.set_exception(rte)
        res = self.one_loop.run_until_complete(fut)
        # The cancelled children show up as CancelledError instances.
        for cancelled_index in (2, 4):
            self.assertIsInstance(res[cancelled_index],
                                  asyncio.CancelledError)
        # Blank them out so that the rest can be compared by equality.
        res[2] = res[4] = None
        self.assertEqual(res, [1, zde, None, 3, None, rte])
        cb.assert_called_once_with(fut)
3479
3480
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
    # gather() tests where the children are passed as coroutines.

    def wrap_futures(self, *futures):
        # Wrap each future in a coroutine that simply awaits it.
        async def coro(fut):
            return await fut
        return [coro(fut) for fut in futures]

    def _gather(self, *args, **kwargs):
        # gather() is called from inside a coroutine run on one_loop.
        async def coro():
            return asyncio.gather(*args, **kwargs)
        return self.one_loop.run_until_complete(coro())

    def test_constructor_without_loop(self):
        async def coro():
            return 'abc'
        gen1, gen2 = coro(), coro()
        # Close the coroutines at cleanup; gather() is expected to fail.
        self.addCleanup(gen1.close)
        self.addCleanup(gen2.close)
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaises(RuntimeError):
                asyncio.gather(gen1, gen2)
        first_warning = cm.warnings[0]
        self.assertEqual(first_warning.filename, __file__)

    def test_constructor_use_running_loop(self):
        async def coro():
            return 'abc'
        gen1, gen2 = coro(), coro()

        async def gather():
            return asyncio.gather(gen1, gen2)

        fut = self.one_loop.run_until_complete(gather())
        self.assertIs(fut._loop, self.one_loop)
        self.one_loop.run_until_complete(fut)

    def test_constructor_use_global_loop(self):
        # Deprecated in 3.10
        async def coro():
            return 'abc'
        asyncio.set_event_loop(self.other_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        gen1, gen2 = coro(), coro()
        with self.assertWarns(DeprecationWarning) as cm:
            fut = asyncio.gather(gen1, gen2)
        first_warning = cm.warnings[0]
        self.assertEqual(first_warning.filename, __file__)
        self.assertIs(fut._loop, self.other_loop)
        self.other_loop.run_until_complete(fut)

    def test_duplicate_coroutines(self):
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s
        repeated = coro('abc')
        # The same coroutine object is passed three times.
        fut = self._gather(repeated, repeated, coro('def'), repeated)
        self._run_loop(self.one_loop)
        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
        proof = 0
        waiter = self.one_loop.create_future()

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        child1, child2 = (asyncio.ensure_future(inner(), loop=self.one_loop)
                          for _ in range(2))
        gatherer = None

        async def outer():
            nonlocal proof, gatherer
            gatherer = asyncio.gather(child1, child2)
            await gatherer
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        self.assertTrue(f.cancel())
        with self.assertRaises(asyncio.CancelledError):
            self.one_loop.run_until_complete(f)
        # The gather future is already done, so cancel() reports False.
        self.assertFalse(gatherer.cancel())
        for cancelled in (waiter, child1, child2):
            self.assertTrue(cancelled.cancelled())
        test_utils.run_briefly(self.one_loop)
        # None of the "proof += ..." statements was ever reached.
        self.assertEqual(proof, 0)

    def test_exception_marking(self):
        # Test for the first line marked "Mark exception retrieved."

        async def inner(f):
            await f
            raise RuntimeError('should not be ignored')

        a, b = (self.one_loop.create_future() for _ in range(2))

        async def outer():
            await asyncio.gather(inner(a), inner(b))

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        # Release the two children one after the other.
        for blocker in (a, b):
            blocker.set_result(None)
            test_utils.run_briefly(self.one_loop)
        self.assertIsInstance(f.exception(), RuntimeError)

    def test_issue46672(self):
        handler_path = (
            'asyncio.base_events.BaseEventLoop.call_exception_handler')
        with mock.patch(handler_path):
            async def coro(s):
                return s
            c = coro('abc')

            # A dict is not awaitable, so gather() must reject it.
            with self.assertRaises(TypeError):
                self._gather(c, {})
            self._run_loop(self.one_loop)
            # NameError should not happen:
            self.one_loop.call_exception_handler.assert_not_called()
3603
3604            with self.assertRaises(TypeError):
3605                self._gather(c, {})
3606            self._run_loop(self.one_loop)
3607            # NameError should not happen:
3608            self.one_loop.call_exception_handler.assert_not_called()
3609
3610
class RunCoroutineThreadsafeTests(test_utils.TestCase):
    """Test case for asyncio.run_coroutine_threadsafe."""

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        # set_event_loop() is relied on to clean the loop up afterwards.
        self.set_event_loop(loop)

    async def add(self, a, b, fail=False, cancel=False):
        """Sleep for 0.05 second, then return a + b.

        With fail=True, RuntimeError("Fail!") is raised after the sleep.
        With cancel=True, the task running this coroutine cancels itself
        and then yields to the loop once.
        """
        await asyncio.sleep(0.05)
        if fail:
            raise RuntimeError("Fail!")
        if cancel:
            running = asyncio.current_task(self.loop)
            running.cancel()
            await asyncio.sleep(0)
        return a + b

    def target(self, fail=False, cancel=False, timeout=None,
               advance_coro=False):
        """Submit add(1, 2) to self.loop and block for its result.

        The tests run this in an executor thread.  fail and cancel are
        forwarded to add(); timeout is handed to future.result().
        """
        coro = self.add(1, 2, fail=fail, cancel=cancel)
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        if advance_coro:
            # Only test_run_coroutine_threadsafe_task_factory_exception
            # asks for this; otherwise it spills errors and breaks
            # **other** unittests, since 'target' is interacting with
            # threads.
            #
            # Advancing `coro` here means CoroWrapper.__del__ won't do
            # anything when asyncio tests run in debug mode.
            self.loop.call_soon_threadsafe(coro.send, None)
        try:
            return future.result(timeout)
        finally:
            # Don't leave a still-pending future behind.
            if not future.done():
                future.cancel()

    def test_run_coroutine_threadsafe(self):
        """Test coroutine submission from a thread to an event loop."""
        worker = self.loop.run_in_executor(None, self.target)
        outcome = self.loop.run_until_complete(worker)
        self.assertEqual(outcome, 3)

    def test_run_coroutine_threadsafe_with_exception(self):
        """Test coroutine submission from a thread to an event loop
        when an exception is raised."""
        worker = self.loop.run_in_executor(None, self.target, True)
        with self.assertRaises(RuntimeError) as caught:
            self.loop.run_until_complete(worker)
        self.assertIn("Fail!", caught.exception.args)

    def test_run_coroutine_threadsafe_with_timeout(self):
        """Test coroutine submission from a thread to an event loop
        when a timeout is raised."""
        def run_with_timeout():
            return self.target(timeout=0)

        worker = self.loop.run_in_executor(None, run_with_timeout)
        with self.assertRaises(asyncio.TimeoutError):
            self.loop.run_until_complete(worker)
        test_utils.run_briefly(self.loop)
        # Check that there's no pending task (add has been cancelled)
        for leftover in asyncio.all_tasks(self.loop):
            self.assertTrue(leftover.done())

    def test_run_coroutine_threadsafe_task_cancelled(self):
        """Test coroutine submission from a thread to an event loop
        when the task is cancelled."""
        def run_cancelled():
            return self.target(cancel=True)

        worker = self.loop.run_in_executor(None, run_cancelled)
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(worker)

    def test_run_coroutine_threadsafe_task_factory_exception(self):
        """Test coroutine submission from a thread to an event loop
        when the task factory raises an exception."""

        def broken_factory(loop, coro):
            raise NameError

        def run_advanced():
            return self.target(advance_coro=True)

        worker = self.loop.run_in_executor(None, run_advanced)

        # Set exception handler
        handler = test_utils.MockCallback()
        self.loop.set_exception_handler(handler)

        # Set corrupted task factory; the previous one is put back on
        # cleanup.
        previous_factory = self.loop.get_task_factory()
        self.addCleanup(self.loop.set_task_factory, previous_factory)
        self.loop.set_task_factory(broken_factory)

        # Run event loop
        with self.assertRaises(NameError) as caught:
            self.loop.run_until_complete(worker)

        # Check exceptions: the handler is called exactly once, with a
        # context that carries the very same exception.
        self.assertEqual(len(handler.call_args_list), 1)
        positional, _kwargs = handler.call_args
        _handler_loop, context = positional
        self.assertEqual(context['exception'], caught.exception)
3709
3710
class SleepTests(test_utils.TestCase):
    # Tests for asyncio.sleep().

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        # Close the loop and drop the reference before the base class
        # tearDown runs.
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_sleep_zero(self):
        # sleep(0) must give the loop a chance to run callbacks that are
        # already scheduled, and must hand back its result argument.
        total = 0

        def bump(amount):
            nonlocal total
            total += amount

        async def scenario():
            self.loop.call_soon(bump, 1)
            # Scheduling the callback alone must not run it.
            self.assertEqual(total, 0)
            slept = await asyncio.sleep(0, result=10)
            # bump(1) was run by call_soon while we were suspended.
            self.assertEqual(total, 1)
            # slept is 10, which takes the total to 11.
            bump(slept)

        self.loop.run_until_complete(scenario())
        self.assertEqual(total, 11)
3738
3739
class WaitTests(test_utils.TestCase):
    # Tests for asyncio.wait().

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        # Close the loop and drop the reference before the base class
        # tearDown runs.
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_coro_is_deprecated_in_wait(self):
        # Remove test when passing coros to asyncio.wait() is removed in 3.11
        run = self.loop.run_until_complete

        # A bare coroutine on its own triggers the warning ...
        with self.assertWarns(DeprecationWarning):
            waiter = asyncio.wait([coroutine_function()])
            run(waiter)

        # ... and so does one passed alongside a real task.
        task = self.loop.create_task(coroutine_function())
        with self.assertWarns(DeprecationWarning):
            waiter = asyncio.wait([task, coroutine_function()])
            run(waiter)
3761
3762
class CompatibilityTests(test_utils.TestCase):
    # Tests for checking a bridge between old-styled coroutines
    # and async/await syntax

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        # Close the loop and drop the reference before the base class
        # tearDown runs.
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_yield_from_awaitable(self):
        # A generator-based coroutine can "yield from" asyncio.sleep().
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def legacy():
                yield from asyncio.sleep(0)
                return 'ok'

        outcome = self.loop.run_until_complete(legacy())
        self.assertEqual('ok', outcome)

    def test_await_old_style_coro(self):
        # Generator-based coroutines can be gathered and awaited from a
        # native coroutine.  Each decoration is checked for its own
        # DeprecationWarning.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def plain():
                return 'ok1'

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def sleeping():
                yield from asyncio.sleep(0)
                return 'ok2'

        async def native():
            return await asyncio.gather(plain(), sleeping())

        outcome = self.loop.run_until_complete(native())
        self.assertEqual(['ok1', 'ok2'], outcome)

    def test_debug_mode_interop(self):
        # https://bugs.python.org/issue32636
        # The script is run in a child interpreter with asyncio debug mode
        # switched on through the environment.
        script = textwrap.dedent("""
            import asyncio

            async def native_coro():
                pass

            @asyncio.coroutine
            def old_style_coro():
                yield from native_coro()

            asyncio.run(old_style_coro())
        """)

        assert_python_ok("-Wignore::DeprecationWarning", "-c", script,
                         PYTHONASYNCIODEBUG="1")
3824
3825
# Run this module's tests when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
3828