• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for tasks.py."""
2
3import collections
4import contextlib
5import contextvars
6import functools
7import gc
8import io
9import random
10import re
11import sys
12import textwrap
13import types
14import unittest
15import weakref
16from unittest import mock
17
18import asyncio
19from asyncio import coroutines
20from asyncio import futures
21from asyncio import tasks
22from test.test_asyncio import utils as test_utils
23from test import support
24from test.support.script_helper import assert_python_ok
25
26
def tearDownModule():
    """Reset the global asyncio event loop policy after this module's tests."""
    # Passing None asks asyncio to go back to its default policy.
    default_policy = None
    asyncio.set_event_loop_policy(default_policy)
29
30
async def coroutine_function():
    """Coroutine function with an empty body; awaiting it yields None."""
    return None
33
34
@contextlib.contextmanager
def set_coroutine_debug(enabled):
    """Temporarily set ``asyncio.coroutines._DEBUG`` to *enabled*.

    The value found on entry is put back on exit, including when the
    ``with`` body raises.
    """
    module = asyncio.coroutines
    saved = module._DEBUG
    module._DEBUG = enabled
    try:
        yield
    finally:
        module._DEBUG = saved
45
46
def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` fragment of an expected Task repr.

    *qualname* is the coroutine's qualified name, *state* its state text
    and *src* a ``file:line`` location.  Unless *generator* is true the
    state is suffixed with ``', defined'``.  When *source_traceback* is not
    None, the first two items of its last entry are appended as
    ``created at <file>:<line>``.
    """
    state_template = '%s' if generator else '%s, defined'
    state = state_template % state
    if source_traceback is None:
        return 'coro=<%s() %s at %s>' % (qualname, state, src)
    created = source_traceback[-1]
    return ('coro=<%s() %s at %s> created at %s:%s'
            % (qualname, state, src, created[0], created[1]))
58
59
class Dummy:
    """Callable placeholder whose repr is the fixed string ``'<Dummy>'``."""

    def __call__(self, *args):
        """Accept any positional arguments and do nothing."""
        return None

    def __repr__(self):
        """Return the constant text ``'<Dummy>'``."""
        return '<Dummy>'
67
68
class CoroLikeObject:
    """Minimal hand-written awaitable exposing the coroutine methods.

    ``send()`` finishes at once by raising ``StopIteration(42)``;
    ``throw()`` and ``close()`` are no-ops.
    """

    def __await__(self):
        """Return the object itself as its own await iterator."""
        return self

    def send(self, v):
        """Ignore *v* and complete immediately with the value 42."""
        raise StopIteration(42)

    def throw(self, *exc):
        """Swallow any thrown exception."""
        return None

    def close(self):
        """Nothing to release."""
        return None
81
82
83class BaseTaskTests:
84
85    Task = None
86    Future = None
87
88    def new_task(self, loop, coro, name='TestTask'):
89        return self.__class__.Task(coro, loop=loop, name=name)
90
91    def new_future(self, loop):
92        return self.__class__.Future(loop=loop)
93
94    def setUp(self):
95        super().setUp()
96        self.loop = self.new_test_loop()
97        self.loop.set_task_factory(self.new_task)
98        self.loop.create_future = lambda: self.new_future(self.loop)
99
100    def test_task_del_collect(self):
101        class Evil:
102            def __del__(self):
103                gc.collect()
104
105        async def run():
106            return Evil()
107
108        self.loop.run_until_complete(
109            asyncio.gather(*[
110                self.new_task(self.loop, run()) for _ in range(100)
111            ], loop=self.loop))
112
113    def test_other_loop_future(self):
114        other_loop = asyncio.new_event_loop()
115        fut = self.new_future(other_loop)
116
117        async def run(fut):
118            await fut
119
120        try:
121            with self.assertRaisesRegex(RuntimeError,
122                                        r'Task .* got Future .* attached'):
123                self.loop.run_until_complete(run(fut))
124        finally:
125            other_loop.close()
126
127    def test_task_awaits_on_itself(self):
128
129        async def test():
130            await task
131
132        task = asyncio.ensure_future(test(), loop=self.loop)
133
134        with self.assertRaisesRegex(RuntimeError,
135                                    'Task cannot await on itself'):
136            self.loop.run_until_complete(task)
137
138    def test_task_class(self):
139        async def notmuch():
140            return 'ok'
141        t = self.new_task(self.loop, notmuch())
142        self.loop.run_until_complete(t)
143        self.assertTrue(t.done())
144        self.assertEqual(t.result(), 'ok')
145        self.assertIs(t._loop, self.loop)
146        self.assertIs(t.get_loop(), self.loop)
147
148        loop = asyncio.new_event_loop()
149        self.set_event_loop(loop)
150        t = self.new_task(loop, notmuch())
151        self.assertIs(t._loop, loop)
152        loop.run_until_complete(t)
153        loop.close()
154
155    def test_ensure_future_coroutine(self):
156        with self.assertWarns(DeprecationWarning):
157            @asyncio.coroutine
158            def notmuch():
159                return 'ok'
160        t = asyncio.ensure_future(notmuch(), loop=self.loop)
161        self.loop.run_until_complete(t)
162        self.assertTrue(t.done())
163        self.assertEqual(t.result(), 'ok')
164        self.assertIs(t._loop, self.loop)
165
166        loop = asyncio.new_event_loop()
167        self.set_event_loop(loop)
168        t = asyncio.ensure_future(notmuch(), loop=loop)
169        self.assertIs(t._loop, loop)
170        loop.run_until_complete(t)
171        loop.close()
172
173    def test_ensure_future_future(self):
174        f_orig = self.new_future(self.loop)
175        f_orig.set_result('ko')
176
177        f = asyncio.ensure_future(f_orig)
178        self.loop.run_until_complete(f)
179        self.assertTrue(f.done())
180        self.assertEqual(f.result(), 'ko')
181        self.assertIs(f, f_orig)
182
183        loop = asyncio.new_event_loop()
184        self.set_event_loop(loop)
185
186        with self.assertRaises(ValueError):
187            f = asyncio.ensure_future(f_orig, loop=loop)
188
189        loop.close()
190
191        f = asyncio.ensure_future(f_orig, loop=self.loop)
192        self.assertIs(f, f_orig)
193
194    def test_ensure_future_task(self):
195        async def notmuch():
196            return 'ok'
197        t_orig = self.new_task(self.loop, notmuch())
198        t = asyncio.ensure_future(t_orig)
199        self.loop.run_until_complete(t)
200        self.assertTrue(t.done())
201        self.assertEqual(t.result(), 'ok')
202        self.assertIs(t, t_orig)
203
204        loop = asyncio.new_event_loop()
205        self.set_event_loop(loop)
206
207        with self.assertRaises(ValueError):
208            t = asyncio.ensure_future(t_orig, loop=loop)
209
210        loop.close()
211
212        t = asyncio.ensure_future(t_orig, loop=self.loop)
213        self.assertIs(t, t_orig)
214
215    def test_ensure_future_awaitable(self):
216        class Aw:
217            def __init__(self, coro):
218                self.coro = coro
219            def __await__(self):
220                return (yield from self.coro)
221
222        with self.assertWarns(DeprecationWarning):
223            @asyncio.coroutine
224            def coro():
225                return 'ok'
226
227        loop = asyncio.new_event_loop()
228        self.set_event_loop(loop)
229        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
230        loop.run_until_complete(fut)
231        assert fut.result() == 'ok'
232
233    def test_ensure_future_neither(self):
234        with self.assertRaises(TypeError):
235            asyncio.ensure_future('ok')
236
237    def test_ensure_future_error_msg(self):
238        loop = asyncio.new_event_loop()
239        f = self.new_future(self.loop)
240        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
241                                    'different loop than the one specified as '
242                                    'the loop argument'):
243            asyncio.ensure_future(f, loop=loop)
244        loop.close()
245
246    def test_get_stack(self):
247        T = None
248
249        async def foo():
250            await bar()
251
252        async def bar():
253            # test get_stack()
254            f = T.get_stack(limit=1)
255            try:
256                self.assertEqual(f[0].f_code.co_name, 'foo')
257            finally:
258                f = None
259
260            # test print_stack()
261            file = io.StringIO()
262            T.print_stack(limit=1, file=file)
263            file.seek(0)
264            tb = file.read()
265            self.assertRegex(tb, r'foo\(\) running')
266
267        async def runner():
268            nonlocal T
269            T = asyncio.ensure_future(foo(), loop=self.loop)
270            await T
271
272        self.loop.run_until_complete(runner())
273
274    def test_task_repr(self):
275        self.loop.set_debug(False)
276
277        async def notmuch():
278            return 'abc'
279
280        # test coroutine function
281        self.assertEqual(notmuch.__name__, 'notmuch')
282        self.assertRegex(notmuch.__qualname__,
283                         r'\w+.test_task_repr.<locals>.notmuch')
284        self.assertEqual(notmuch.__module__, __name__)
285
286        filename, lineno = test_utils.get_function_source(notmuch)
287        src = "%s:%s" % (filename, lineno)
288
289        # test coroutine object
290        gen = notmuch()
291        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
292        self.assertEqual(gen.__name__, 'notmuch')
293        self.assertEqual(gen.__qualname__, coro_qualname)
294
295        # test pending Task
296        t = self.new_task(self.loop, gen)
297        t.add_done_callback(Dummy())
298
299        coro = format_coroutine(coro_qualname, 'running', src,
300                                t._source_traceback, generator=True)
301        self.assertEqual(repr(t),
302                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
303
304        # test cancelling Task
305        t.cancel()  # Does not take immediate effect!
306        self.assertEqual(repr(t),
307                         "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
308
309        # test cancelled Task
310        self.assertRaises(asyncio.CancelledError,
311                          self.loop.run_until_complete, t)
312        coro = format_coroutine(coro_qualname, 'done', src,
313                                t._source_traceback)
314        self.assertEqual(repr(t),
315                         "<Task cancelled name='TestTask' %s>" % coro)
316
317        # test finished Task
318        t = self.new_task(self.loop, notmuch())
319        self.loop.run_until_complete(t)
320        coro = format_coroutine(coro_qualname, 'done', src,
321                                t._source_traceback)
322        self.assertEqual(repr(t),
323                         "<Task finished name='TestTask' %s result='abc'>" % coro)
324
325    def test_task_repr_autogenerated(self):
326        async def notmuch():
327            return 123
328
329        t1 = self.new_task(self.loop, notmuch(), None)
330        t2 = self.new_task(self.loop, notmuch(), None)
331        self.assertNotEqual(repr(t1), repr(t2))
332
333        match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
334        self.assertIsNotNone(match1)
335        match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
336        self.assertIsNotNone(match2)
337
338        # Autogenerated task names should have monotonically increasing numbers
339        self.assertLess(int(match1.group(1)), int(match2.group(1)))
340        self.loop.run_until_complete(t1)
341        self.loop.run_until_complete(t2)
342
343    def test_task_repr_name_not_str(self):
344        async def notmuch():
345            return 123
346
347        t = self.new_task(self.loop, notmuch())
348        t.set_name({6})
349        self.assertEqual(t.get_name(), '{6}')
350        self.loop.run_until_complete(t)
351
352    def test_task_repr_coro_decorator(self):
353        self.loop.set_debug(False)
354
355        with self.assertWarns(DeprecationWarning):
356            @asyncio.coroutine
357            def notmuch():
358                # notmuch() function doesn't use yield from: it will be wrapped by
359                # @coroutine decorator
360                return 123
361
362        # test coroutine function
363        self.assertEqual(notmuch.__name__, 'notmuch')
364        self.assertRegex(notmuch.__qualname__,
365                         r'\w+.test_task_repr_coro_decorator'
366                         r'\.<locals>\.notmuch')
367        self.assertEqual(notmuch.__module__, __name__)
368
369        # test coroutine object
370        gen = notmuch()
371        # On Python >= 3.5, generators now inherit the name of the
372        # function, as expected, and have a qualified name (__qualname__
373        # attribute).
374        coro_name = 'notmuch'
375        coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
376                         '.<locals>.notmuch')
377        self.assertEqual(gen.__name__, coro_name)
378        self.assertEqual(gen.__qualname__, coro_qualname)
379
380        # test repr(CoroWrapper)
381        if coroutines._DEBUG:
382            # format the coroutine object
383            if coroutines._DEBUG:
384                filename, lineno = test_utils.get_function_source(notmuch)
385                frame = gen._source_traceback[-1]
386                coro = ('%s() running, defined at %s:%s, created at %s:%s'
387                        % (coro_qualname, filename, lineno,
388                           frame[0], frame[1]))
389            else:
390                code = gen.gi_code
391                coro = ('%s() running at %s:%s'
392                        % (coro_qualname, code.co_filename,
393                           code.co_firstlineno))
394
395            self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
396
397        # test pending Task
398        t = self.new_task(self.loop, gen)
399        t.add_done_callback(Dummy())
400
401        # format the coroutine object
402        if coroutines._DEBUG:
403            src = '%s:%s' % test_utils.get_function_source(notmuch)
404        else:
405            code = gen.gi_code
406            src = '%s:%s' % (code.co_filename, code.co_firstlineno)
407        coro = format_coroutine(coro_qualname, 'running', src,
408                                t._source_traceback,
409                                generator=not coroutines._DEBUG)
410        self.assertEqual(repr(t),
411                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
412        self.loop.run_until_complete(t)
413
414    def test_task_repr_wait_for(self):
415        self.loop.set_debug(False)
416
417        async def wait_for(fut):
418            return await fut
419
420        fut = self.new_future(self.loop)
421        task = self.new_task(self.loop, wait_for(fut))
422        test_utils.run_briefly(self.loop)
423        self.assertRegex(repr(task),
424                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))
425
426        fut.set_result(None)
427        self.loop.run_until_complete(task)
428
429    def test_task_repr_partial_corowrapper(self):
430        # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
431        # coroutine is a partial function
432        with set_coroutine_debug(True):
433            self.loop.set_debug(True)
434
435            async def func(x, y):
436                await asyncio.sleep(0)
437
438            with self.assertWarns(DeprecationWarning):
439                partial_func = asyncio.coroutine(functools.partial(func, 1))
440            task = self.loop.create_task(partial_func(2))
441
442            # make warnings quiet
443            task._log_destroy_pending = False
444            self.addCleanup(task._coro.close)
445
446        coro_repr = repr(task._coro)
447        expected = (
448            r'<coroutine object \w+\.test_task_repr_partial_corowrapper'
449            r'\.<locals>\.func at'
450        )
451        self.assertRegex(coro_repr, expected)
452
453    def test_task_basics(self):
454
455        async def outer():
456            a = await inner1()
457            b = await inner2()
458            return a+b
459
460        async def inner1():
461            return 42
462
463        async def inner2():
464            return 1000
465
466        t = outer()
467        self.assertEqual(self.loop.run_until_complete(t), 1042)
468
469    def test_cancel(self):
470
471        def gen():
472            when = yield
473            self.assertAlmostEqual(10.0, when)
474            yield 0
475
476        loop = self.new_test_loop(gen)
477
478        async def task():
479            await asyncio.sleep(10.0)
480            return 12
481
482        t = self.new_task(loop, task())
483        loop.call_soon(t.cancel)
484        with self.assertRaises(asyncio.CancelledError):
485            loop.run_until_complete(t)
486        self.assertTrue(t.done())
487        self.assertTrue(t.cancelled())
488        self.assertFalse(t.cancel())
489
490    def test_cancel_yield(self):
491        with self.assertWarns(DeprecationWarning):
492            @asyncio.coroutine
493            def task():
494                yield
495                yield
496                return 12
497
498        t = self.new_task(self.loop, task())
499        test_utils.run_briefly(self.loop)  # start coro
500        t.cancel()
501        self.assertRaises(
502            asyncio.CancelledError, self.loop.run_until_complete, t)
503        self.assertTrue(t.done())
504        self.assertTrue(t.cancelled())
505        self.assertFalse(t.cancel())
506
507    def test_cancel_inner_future(self):
508        f = self.new_future(self.loop)
509
510        async def task():
511            await f
512            return 12
513
514        t = self.new_task(self.loop, task())
515        test_utils.run_briefly(self.loop)  # start task
516        f.cancel()
517        with self.assertRaises(asyncio.CancelledError):
518            self.loop.run_until_complete(t)
519        self.assertTrue(f.cancelled())
520        self.assertTrue(t.cancelled())
521
522    def test_cancel_both_task_and_inner_future(self):
523        f = self.new_future(self.loop)
524
525        async def task():
526            await f
527            return 12
528
529        t = self.new_task(self.loop, task())
530        test_utils.run_briefly(self.loop)
531
532        f.cancel()
533        t.cancel()
534
535        with self.assertRaises(asyncio.CancelledError):
536            self.loop.run_until_complete(t)
537
538        self.assertTrue(t.done())
539        self.assertTrue(f.cancelled())
540        self.assertTrue(t.cancelled())
541
    def test_cancel_task_catching(self):
        # A coroutine that catches CancelledError and returns a value makes
        # its task finish normally instead of ending up cancelled.
        fut1 = self.new_future(self.loop)
        fut2 = self.new_future(self.loop)

        async def task():
            await fut1
            try:
                await fut2
            except asyncio.CancelledError:
                # Turn the cancellation into an ordinary result.
                return 42

        t = self.new_task(self.loop, task())
        # Step 1: let the task start; it is now blocked on fut1.
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut1)  # White-box test.
        # Step 2: complete fut1 so the task moves on to fut2.
        fut1.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut2)  # White-box test.
        # Step 3: cancelling the task cancels the future it waits on.
        t.cancel()
        self.assertTrue(fut2.cancelled())
        res = self.loop.run_until_complete(t)
        self.assertEqual(res, 42)
        self.assertFalse(t.cancelled())
564
    def test_cancel_task_ignoring(self):
        # A coroutine that swallows CancelledError keeps running and can
        # still await further futures and return a result.
        fut1 = self.new_future(self.loop)
        fut2 = self.new_future(self.loop)
        fut3 = self.new_future(self.loop)

        async def task():
            await fut1
            try:
                await fut2
            except asyncio.CancelledError:
                # Deliberately ignore the cancellation request.
                pass
            res = await fut3
            return res

        t = self.new_task(self.loop, task())
        # Step 1: let the task start; it is now blocked on fut1.
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut1)  # White-box test.
        # Step 2: complete fut1 so the task moves on to fut2.
        fut1.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut2)  # White-box test.
        # Step 3: cancelling the task cancels the future it waits on.
        t.cancel()
        self.assertTrue(fut2.cancelled())
        # Step 4: the coroutine ignored the cancellation and waits on fut3.
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut3)  # White-box test.
        fut3.set_result(42)
        res = self.loop.run_until_complete(t)
        self.assertEqual(res, 42)
        self.assertFalse(fut3.cancelled())
        self.assertFalse(t.cancelled())
594
595    def test_cancel_current_task(self):
596        loop = asyncio.new_event_loop()
597        self.set_event_loop(loop)
598
599        async def task():
600            t.cancel()
601            self.assertTrue(t._must_cancel)  # White-box test.
602            # The sleep should be cancelled immediately.
603            await asyncio.sleep(100)
604            return 12
605
606        t = self.new_task(loop, task())
607        self.assertFalse(t.cancelled())
608        self.assertRaises(
609            asyncio.CancelledError, loop.run_until_complete, t)
610        self.assertTrue(t.done())
611        self.assertTrue(t.cancelled())
612        self.assertFalse(t._must_cancel)  # White-box test.
613        self.assertFalse(t.cancel())
614
615    def test_cancel_at_end(self):
616        """coroutine end right after task is cancelled"""
617        loop = asyncio.new_event_loop()
618        self.set_event_loop(loop)
619
620        async def task():
621            t.cancel()
622            self.assertTrue(t._must_cancel)  # White-box test.
623            return 12
624
625        t = self.new_task(loop, task())
626        self.assertFalse(t.cancelled())
627        self.assertRaises(
628            asyncio.CancelledError, loop.run_until_complete, t)
629        self.assertTrue(t.done())
630        self.assertTrue(t.cancelled())
631        self.assertFalse(t._must_cancel)  # White-box test.
632        self.assertFalse(t.cancel())
633
634    def test_cancel_awaited_task(self):
635        # This tests for a relatively rare condition when
636        # a task cancellation is requested for a task which is not
637        # currently blocked, such as a task cancelling itself.
638        # In this situation we must ensure that whatever next future
639        # or task the cancelled task blocks on is cancelled correctly
640        # as well.  See also bpo-34872.
641        loop = asyncio.new_event_loop()
642        self.addCleanup(lambda: loop.close())
643
644        task = nested_task = None
645        fut = self.new_future(loop)
646
647        async def nested():
648            await fut
649
650        async def coro():
651            nonlocal nested_task
652            # Create a sub-task and wait for it to run.
653            nested_task = self.new_task(loop, nested())
654            await asyncio.sleep(0)
655
656            # Request the current task to be cancelled.
657            task.cancel()
658            # Block on the nested task, which should be immediately
659            # cancelled.
660            await nested_task
661
662        task = self.new_task(loop, coro())
663        with self.assertRaises(asyncio.CancelledError):
664            loop.run_until_complete(task)
665
666        self.assertTrue(task.cancelled())
667        self.assertTrue(nested_task.cancelled())
668        self.assertTrue(fut.cancelled())
669
670    def test_stop_while_run_in_complete(self):
671
672        def gen():
673            when = yield
674            self.assertAlmostEqual(0.1, when)
675            when = yield 0.1
676            self.assertAlmostEqual(0.2, when)
677            when = yield 0.1
678            self.assertAlmostEqual(0.3, when)
679            yield 0.1
680
681        loop = self.new_test_loop(gen)
682
683        x = 0
684
685        async def task():
686            nonlocal x
687            while x < 10:
688                await asyncio.sleep(0.1)
689                x += 1
690                if x == 2:
691                    loop.stop()
692
693        t = self.new_task(loop, task())
694        with self.assertRaises(RuntimeError) as cm:
695            loop.run_until_complete(t)
696        self.assertEqual(str(cm.exception),
697                         'Event loop stopped before Future completed.')
698        self.assertFalse(t.done())
699        self.assertEqual(x, 2)
700        self.assertAlmostEqual(0.3, loop.time())
701
702        t.cancel()
703        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
704
705    def test_log_traceback(self):
706        async def coro():
707            pass
708
709        task = self.new_task(self.loop, coro())
710        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
711            task._log_traceback = True
712        self.loop.run_until_complete(task)
713
714    def test_wait_for_timeout_less_then_0_or_0_future_done(self):
715        def gen():
716            when = yield
717            self.assertAlmostEqual(0, when)
718
719        loop = self.new_test_loop(gen)
720
721        fut = self.new_future(loop)
722        fut.set_result('done')
723
724        ret = loop.run_until_complete(asyncio.wait_for(fut, 0))
725
726        self.assertEqual(ret, 'done')
727        self.assertTrue(fut.done())
728        self.assertAlmostEqual(0, loop.time())
729
730    def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
731        def gen():
732            when = yield
733            self.assertAlmostEqual(0, when)
734
735        loop = self.new_test_loop(gen)
736
737        foo_started = False
738
739        async def foo():
740            nonlocal foo_started
741            foo_started = True
742
743        with self.assertRaises(asyncio.TimeoutError):
744            loop.run_until_complete(asyncio.wait_for(foo(), 0))
745
746        self.assertAlmostEqual(0, loop.time())
747        self.assertEqual(foo_started, False)
748
749    def test_wait_for_timeout_less_then_0_or_0(self):
750        def gen():
751            when = yield
752            self.assertAlmostEqual(0.2, when)
753            when = yield 0
754            self.assertAlmostEqual(0, when)
755
756        for timeout in [0, -1]:
757            with self.subTest(timeout=timeout):
758                loop = self.new_test_loop(gen)
759
760                foo_running = None
761
762                async def foo():
763                    nonlocal foo_running
764                    foo_running = True
765                    try:
766                        await asyncio.sleep(0.2)
767                    finally:
768                        foo_running = False
769                    return 'done'
770
771                fut = self.new_task(loop, foo())
772
773                with self.assertRaises(asyncio.TimeoutError):
774                    loop.run_until_complete(asyncio.wait_for(fut, timeout))
775                self.assertTrue(fut.done())
776                # it should have been cancelled due to the timeout
777                self.assertTrue(fut.cancelled())
778                self.assertAlmostEqual(0, loop.time())
779                self.assertEqual(foo_running, False)
780
781    def test_wait_for(self):
782
783        def gen():
784            when = yield
785            self.assertAlmostEqual(0.2, when)
786            when = yield 0
787            self.assertAlmostEqual(0.1, when)
788            when = yield 0.1
789
790        loop = self.new_test_loop(gen)
791
792        foo_running = None
793
794        async def foo():
795            nonlocal foo_running
796            foo_running = True
797            try:
798                await asyncio.sleep(0.2)
799            finally:
800                foo_running = False
801            return 'done'
802
803        fut = self.new_task(loop, foo())
804
805        with self.assertRaises(asyncio.TimeoutError):
806            loop.run_until_complete(asyncio.wait_for(fut, 0.1))
807        self.assertTrue(fut.done())
808        # it should have been cancelled due to the timeout
809        self.assertTrue(fut.cancelled())
810        self.assertAlmostEqual(0.1, loop.time())
811        self.assertEqual(foo_running, False)
812
813    def test_wait_for_blocking(self):
814        loop = self.new_test_loop()
815
816        async def coro():
817            return 'done'
818
819        res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None))
820        self.assertEqual(res, 'done')
821
822    def test_wait_for_with_global_loop(self):
823
824        def gen():
825            when = yield
826            self.assertAlmostEqual(0.2, when)
827            when = yield 0
828            self.assertAlmostEqual(0.01, when)
829            yield 0.01
830
831        loop = self.new_test_loop(gen)
832
833        async def foo():
834            await asyncio.sleep(0.2)
835            return 'done'
836
837        asyncio.set_event_loop(loop)
838        try:
839            fut = self.new_task(loop, foo())
840            with self.assertRaises(asyncio.TimeoutError):
841                loop.run_until_complete(asyncio.wait_for(fut, 0.01))
842        finally:
843            asyncio.set_event_loop(None)
844
845        self.assertAlmostEqual(0.01, loop.time())
846        self.assertTrue(fut.done())
847        self.assertTrue(fut.cancelled())
848
849    def test_wait_for_race_condition(self):
850
851        def gen():
852            yield 0.1
853            yield 0.1
854            yield 0.1
855
856        loop = self.new_test_loop(gen)
857
858        fut = self.new_future(loop)
859        task = asyncio.wait_for(fut, timeout=0.2)
860        loop.call_later(0.1, fut.set_result, "ok")
861        res = loop.run_until_complete(task)
862        self.assertEqual(res, "ok")
863
864    def test_wait_for_waits_for_task_cancellation(self):
865        loop = asyncio.new_event_loop()
866        self.addCleanup(loop.close)
867
868        task_done = False
869
870        async def foo():
871            async def inner():
872                nonlocal task_done
873                try:
874                    await asyncio.sleep(0.2)
875                finally:
876                    task_done = True
877
878            inner_task = self.new_task(loop, inner())
879
880            with self.assertRaises(asyncio.TimeoutError):
881                await asyncio.wait_for(inner_task, timeout=0.1)
882
883            self.assertTrue(task_done)
884
885        loop.run_until_complete(foo())
886
    def test_wait_for_self_cancellation(self):
        # wait_for() must itself be cancellable while it waits for an inner
        # task that keeps catching CancelledError.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def foo():
            async def inner():
                # Catch CancelledError twice, sleeping again each time,
                # and finally return a normal result.
                try:
                    await asyncio.sleep(0.3)
                except asyncio.CancelledError:
                    try:
                        await asyncio.sleep(0.3)
                    except asyncio.CancelledError:
                        await asyncio.sleep(0.3)

                return 42

            inner_task = self.new_task(loop, inner())

            wait = asyncio.wait_for(inner_task, timeout=0.1)

            # Test that wait_for itself is properly cancellable
            # even when the initial task holds up the initial cancellation.
            task = self.new_task(loop, wait)
            # Sleep past the 0.1s timeout before cancelling the wrapper.
            await asyncio.sleep(0.2)
            task.cancel()

            with self.assertRaises(asyncio.CancelledError):
                await task

            # The inner task still completes with its normal result.
            self.assertEqual(await inner_task, 42)

        loop.run_until_complete(foo())
919
920    def test_wait(self):
921
922        def gen():
923            when = yield
924            self.assertAlmostEqual(0.1, when)
925            when = yield 0
926            self.assertAlmostEqual(0.15, when)
927            yield 0.15
928
929        loop = self.new_test_loop(gen)
930
931        a = self.new_task(loop, asyncio.sleep(0.1))
932        b = self.new_task(loop, asyncio.sleep(0.15))
933
934        async def foo():
935            done, pending = await asyncio.wait([b, a])
936            self.assertEqual(done, set([a, b]))
937            self.assertEqual(pending, set())
938            return 42
939
940        res = loop.run_until_complete(self.new_task(loop, foo()))
941        self.assertEqual(res, 42)
942        self.assertAlmostEqual(0.15, loop.time())
943
944        # Doing it again should take no time and exercise a different path.
945        res = loop.run_until_complete(self.new_task(loop, foo()))
946        self.assertAlmostEqual(0.15, loop.time())
947        self.assertEqual(res, 42)
948
949    def test_wait_with_global_loop(self):
950
951        def gen():
952            when = yield
953            self.assertAlmostEqual(0.01, when)
954            when = yield 0
955            self.assertAlmostEqual(0.015, when)
956            yield 0.015
957
958        loop = self.new_test_loop(gen)
959
960        a = self.new_task(loop, asyncio.sleep(0.01))
961        b = self.new_task(loop, asyncio.sleep(0.015))
962
963        async def foo():
964            done, pending = await asyncio.wait([b, a])
965            self.assertEqual(done, set([a, b]))
966            self.assertEqual(pending, set())
967            return 42
968
969        asyncio.set_event_loop(loop)
970        res = loop.run_until_complete(
971            self.new_task(loop, foo()))
972
973        self.assertEqual(res, 42)
974
975    def test_wait_duplicate_coroutines(self):
976
977        with self.assertWarns(DeprecationWarning):
978            @asyncio.coroutine
979            def coro(s):
980                return s
981        c = coro('test')
982
983        task =self.new_task(
984            self.loop,
985            asyncio.wait([c, c, coro('spam')]))
986
987        done, pending = self.loop.run_until_complete(task)
988
989        self.assertFalse(pending)
990        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
991
992    def test_wait_errors(self):
993        self.assertRaises(
994            ValueError, self.loop.run_until_complete,
995            asyncio.wait(set()))
996
997        # -1 is an invalid return_when value
998        sleep_coro = asyncio.sleep(10.0)
999        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
1000        self.assertRaises(ValueError,
1001                          self.loop.run_until_complete, wait_coro)
1002
1003        sleep_coro.close()
1004
1005    def test_wait_first_completed(self):
1006
1007        def gen():
1008            when = yield
1009            self.assertAlmostEqual(10.0, when)
1010            when = yield 0
1011            self.assertAlmostEqual(0.1, when)
1012            yield 0.1
1013
1014        loop = self.new_test_loop(gen)
1015
1016        a = self.new_task(loop, asyncio.sleep(10.0))
1017        b = self.new_task(loop, asyncio.sleep(0.1))
1018        task = self.new_task(
1019            loop,
1020            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1021
1022        done, pending = loop.run_until_complete(task)
1023        self.assertEqual({b}, done)
1024        self.assertEqual({a}, pending)
1025        self.assertFalse(a.done())
1026        self.assertTrue(b.done())
1027        self.assertIsNone(b.result())
1028        self.assertAlmostEqual(0.1, loop.time())
1029
1030        # move forward to close generator
1031        loop.advance_time(10)
1032        loop.run_until_complete(asyncio.wait([a, b]))
1033
1034    def test_wait_really_done(self):
1035        # there is possibility that some tasks in the pending list
1036        # became done but their callbacks haven't all been called yet
1037
1038        async def coro1():
1039            await asyncio.sleep(0)
1040
1041        async def coro2():
1042            await asyncio.sleep(0)
1043            await asyncio.sleep(0)
1044
1045        a = self.new_task(self.loop, coro1())
1046        b = self.new_task(self.loop, coro2())
1047        task = self.new_task(
1048            self.loop,
1049            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1050
1051        done, pending = self.loop.run_until_complete(task)
1052        self.assertEqual({a, b}, done)
1053        self.assertTrue(a.done())
1054        self.assertIsNone(a.result())
1055        self.assertTrue(b.done())
1056        self.assertIsNone(b.result())
1057
1058    def test_wait_first_exception(self):
1059
1060        def gen():
1061            when = yield
1062            self.assertAlmostEqual(10.0, when)
1063            yield 0
1064
1065        loop = self.new_test_loop(gen)
1066
1067        # first_exception, task already has exception
1068        a = self.new_task(loop, asyncio.sleep(10.0))
1069
1070        async def exc():
1071            raise ZeroDivisionError('err')
1072
1073        b = self.new_task(loop, exc())
1074        task = self.new_task(
1075            loop,
1076            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
1077
1078        done, pending = loop.run_until_complete(task)
1079        self.assertEqual({b}, done)
1080        self.assertEqual({a}, pending)
1081        self.assertAlmostEqual(0, loop.time())
1082
1083        # move forward to close generator
1084        loop.advance_time(10)
1085        loop.run_until_complete(asyncio.wait([a, b]))
1086
1087    def test_wait_first_exception_in_wait(self):
1088
1089        def gen():
1090            when = yield
1091            self.assertAlmostEqual(10.0, when)
1092            when = yield 0
1093            self.assertAlmostEqual(0.01, when)
1094            yield 0.01
1095
1096        loop = self.new_test_loop(gen)
1097
1098        # first_exception, exception during waiting
1099        a = self.new_task(loop, asyncio.sleep(10.0))
1100
1101        async def exc():
1102            await asyncio.sleep(0.01)
1103            raise ZeroDivisionError('err')
1104
1105        b = self.new_task(loop, exc())
1106        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
1107
1108        done, pending = loop.run_until_complete(task)
1109        self.assertEqual({b}, done)
1110        self.assertEqual({a}, pending)
1111        self.assertAlmostEqual(0.01, loop.time())
1112
1113        # move forward to close generator
1114        loop.advance_time(10)
1115        loop.run_until_complete(asyncio.wait([a, b]))
1116
1117    def test_wait_with_exception(self):
1118
1119        def gen():
1120            when = yield
1121            self.assertAlmostEqual(0.1, when)
1122            when = yield 0
1123            self.assertAlmostEqual(0.15, when)
1124            yield 0.15
1125
1126        loop = self.new_test_loop(gen)
1127
1128        a = self.new_task(loop, asyncio.sleep(0.1))
1129
1130        async def sleeper():
1131            await asyncio.sleep(0.15)
1132            raise ZeroDivisionError('really')
1133
1134        b = self.new_task(loop, sleeper())
1135
1136        async def foo():
1137            done, pending = await asyncio.wait([b, a])
1138            self.assertEqual(len(done), 2)
1139            self.assertEqual(pending, set())
1140            errors = set(f for f in done if f.exception() is not None)
1141            self.assertEqual(len(errors), 1)
1142
1143        loop.run_until_complete(self.new_task(loop, foo()))
1144        self.assertAlmostEqual(0.15, loop.time())
1145
1146        loop.run_until_complete(self.new_task(loop, foo()))
1147        self.assertAlmostEqual(0.15, loop.time())
1148
1149    def test_wait_with_timeout(self):
1150
1151        def gen():
1152            when = yield
1153            self.assertAlmostEqual(0.1, when)
1154            when = yield 0
1155            self.assertAlmostEqual(0.15, when)
1156            when = yield 0
1157            self.assertAlmostEqual(0.11, when)
1158            yield 0.11
1159
1160        loop = self.new_test_loop(gen)
1161
1162        a = self.new_task(loop, asyncio.sleep(0.1))
1163        b = self.new_task(loop, asyncio.sleep(0.15))
1164
1165        async def foo():
1166            done, pending = await asyncio.wait([b, a], timeout=0.11)
1167            self.assertEqual(done, set([a]))
1168            self.assertEqual(pending, set([b]))
1169
1170        loop.run_until_complete(self.new_task(loop, foo()))
1171        self.assertAlmostEqual(0.11, loop.time())
1172
1173        # move forward to close generator
1174        loop.advance_time(10)
1175        loop.run_until_complete(asyncio.wait([a, b]))
1176
1177    def test_wait_concurrent_complete(self):
1178
1179        def gen():
1180            when = yield
1181            self.assertAlmostEqual(0.1, when)
1182            when = yield 0
1183            self.assertAlmostEqual(0.15, when)
1184            when = yield 0
1185            self.assertAlmostEqual(0.1, when)
1186            yield 0.1
1187
1188        loop = self.new_test_loop(gen)
1189
1190        a = self.new_task(loop, asyncio.sleep(0.1))
1191        b = self.new_task(loop, asyncio.sleep(0.15))
1192
1193        done, pending = loop.run_until_complete(
1194            asyncio.wait([b, a], timeout=0.1))
1195
1196        self.assertEqual(done, set([a]))
1197        self.assertEqual(pending, set([b]))
1198        self.assertAlmostEqual(0.1, loop.time())
1199
1200        # move forward to close generator
1201        loop.advance_time(10)
1202        loop.run_until_complete(asyncio.wait([a, b]))
1203
1204    def test_as_completed(self):
1205
1206        def gen():
1207            yield 0
1208            yield 0
1209            yield 0.01
1210            yield 0
1211
1212        loop = self.new_test_loop(gen)
1213        # disable "slow callback" warning
1214        loop.slow_callback_duration = 1.0
1215        completed = set()
1216        time_shifted = False
1217
1218        with self.assertWarns(DeprecationWarning):
1219            @asyncio.coroutine
1220            def sleeper(dt, x):
1221                nonlocal time_shifted
1222                yield from  asyncio.sleep(dt)
1223                completed.add(x)
1224                if not time_shifted and 'a' in completed and 'b' in completed:
1225                    time_shifted = True
1226                    loop.advance_time(0.14)
1227                return x
1228
1229        a = sleeper(0.01, 'a')
1230        b = sleeper(0.01, 'b')
1231        c = sleeper(0.15, 'c')
1232
1233        async def foo():
1234            values = []
1235            for f in asyncio.as_completed([b, c, a], loop=loop):
1236                values.append(await f)
1237            return values
1238        with self.assertWarns(DeprecationWarning):
1239            res = loop.run_until_complete(self.new_task(loop, foo()))
1240        self.assertAlmostEqual(0.15, loop.time())
1241        self.assertTrue('a' in res[:2])
1242        self.assertTrue('b' in res[:2])
1243        self.assertEqual(res[2], 'c')
1244
1245        # Doing it again should take no time and exercise a different path.
1246        with self.assertWarns(DeprecationWarning):
1247            res = loop.run_until_complete(self.new_task(loop, foo()))
1248        self.assertAlmostEqual(0.15, loop.time())
1249
1250    def test_as_completed_with_timeout(self):
1251
1252        def gen():
1253            yield
1254            yield 0
1255            yield 0
1256            yield 0.1
1257
1258        loop = self.new_test_loop(gen)
1259
1260        a = loop.create_task(asyncio.sleep(0.1, 'a'))
1261        b = loop.create_task(asyncio.sleep(0.15, 'b'))
1262
1263        async def foo():
1264            values = []
1265            for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
1266                if values:
1267                    loop.advance_time(0.02)
1268                try:
1269                    v = await f
1270                    values.append((1, v))
1271                except asyncio.TimeoutError as exc:
1272                    values.append((2, exc))
1273            return values
1274
1275        with self.assertWarns(DeprecationWarning):
1276            res = loop.run_until_complete(self.new_task(loop, foo()))
1277        self.assertEqual(len(res), 2, res)
1278        self.assertEqual(res[0], (1, 'a'))
1279        self.assertEqual(res[1][0], 2)
1280        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
1281        self.assertAlmostEqual(0.12, loop.time())
1282
1283        # move forward to close generator
1284        loop.advance_time(10)
1285        loop.run_until_complete(asyncio.wait([a, b]))
1286
1287    def test_as_completed_with_unused_timeout(self):
1288
1289        def gen():
1290            yield
1291            yield 0
1292            yield 0.01
1293
1294        loop = self.new_test_loop(gen)
1295
1296        a = asyncio.sleep(0.01, 'a')
1297
1298        async def foo():
1299            for f in asyncio.as_completed([a], timeout=1, loop=loop):
1300                v = await f
1301                self.assertEqual(v, 'a')
1302
1303        with self.assertWarns(DeprecationWarning):
1304            loop.run_until_complete(self.new_task(loop, foo()))
1305
1306    def test_as_completed_reverse_wait(self):
1307
1308        def gen():
1309            yield 0
1310            yield 0.05
1311            yield 0
1312
1313        loop = self.new_test_loop(gen)
1314
1315        a = asyncio.sleep(0.05, 'a')
1316        b = asyncio.sleep(0.10, 'b')
1317        fs = {a, b}
1318
1319        with self.assertWarns(DeprecationWarning):
1320            futs = list(asyncio.as_completed(fs, loop=loop))
1321        self.assertEqual(len(futs), 2)
1322
1323        x = loop.run_until_complete(futs[1])
1324        self.assertEqual(x, 'a')
1325        self.assertAlmostEqual(0.05, loop.time())
1326        loop.advance_time(0.05)
1327        y = loop.run_until_complete(futs[0])
1328        self.assertEqual(y, 'b')
1329        self.assertAlmostEqual(0.10, loop.time())
1330
1331    def test_as_completed_concurrent(self):
1332
1333        def gen():
1334            when = yield
1335            self.assertAlmostEqual(0.05, when)
1336            when = yield 0
1337            self.assertAlmostEqual(0.05, when)
1338            yield 0.05
1339
1340        loop = self.new_test_loop(gen)
1341
1342        a = asyncio.sleep(0.05, 'a')
1343        b = asyncio.sleep(0.05, 'b')
1344        fs = {a, b}
1345        with self.assertWarns(DeprecationWarning):
1346            futs = list(asyncio.as_completed(fs, loop=loop))
1347        self.assertEqual(len(futs), 2)
1348        waiter = asyncio.wait(futs)
1349        done, pending = loop.run_until_complete(waiter)
1350        self.assertEqual(set(f.result() for f in done), {'a', 'b'})
1351
1352    def test_as_completed_duplicate_coroutines(self):
1353
1354        with self.assertWarns(DeprecationWarning):
1355            @asyncio.coroutine
1356            def coro(s):
1357                return s
1358
1359        with self.assertWarns(DeprecationWarning):
1360            @asyncio.coroutine
1361            def runner():
1362                result = []
1363                c = coro('ham')
1364                for f in asyncio.as_completed([c, c, coro('spam')],
1365                                              loop=self.loop):
1366                    result.append((yield from f))
1367                return result
1368
1369        with self.assertWarns(DeprecationWarning):
1370            fut = self.new_task(self.loop, runner())
1371            self.loop.run_until_complete(fut)
1372        result = fut.result()
1373        self.assertEqual(set(result), {'ham', 'spam'})
1374        self.assertEqual(len(result), 2)
1375
1376    def test_sleep(self):
1377
1378        def gen():
1379            when = yield
1380            self.assertAlmostEqual(0.05, when)
1381            when = yield 0.05
1382            self.assertAlmostEqual(0.1, when)
1383            yield 0.05
1384
1385        loop = self.new_test_loop(gen)
1386
1387        async def sleeper(dt, arg):
1388            await asyncio.sleep(dt/2)
1389            res = await asyncio.sleep(dt/2, arg)
1390            return res
1391
1392        t = self.new_task(loop, sleeper(0.1, 'yeah'))
1393        loop.run_until_complete(t)
1394        self.assertTrue(t.done())
1395        self.assertEqual(t.result(), 'yeah')
1396        self.assertAlmostEqual(0.1, loop.time())
1397
1398    def test_sleep_cancel(self):
1399
1400        def gen():
1401            when = yield
1402            self.assertAlmostEqual(10.0, when)
1403            yield 0
1404
1405        loop = self.new_test_loop(gen)
1406
1407        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
1408
1409        handle = None
1410        orig_call_later = loop.call_later
1411
1412        def call_later(delay, callback, *args):
1413            nonlocal handle
1414            handle = orig_call_later(delay, callback, *args)
1415            return handle
1416
1417        loop.call_later = call_later
1418        test_utils.run_briefly(loop)
1419
1420        self.assertFalse(handle._cancelled)
1421
1422        t.cancel()
1423        test_utils.run_briefly(loop)
1424        self.assertTrue(handle._cancelled)
1425
1426    def test_task_cancel_sleeping_task(self):
1427
1428        def gen():
1429            when = yield
1430            self.assertAlmostEqual(0.1, when)
1431            when = yield 0
1432            self.assertAlmostEqual(5000, when)
1433            yield 0.1
1434
1435        loop = self.new_test_loop(gen)
1436
1437        async def sleep(dt):
1438            await asyncio.sleep(dt)
1439
1440        async def doit():
1441            sleeper = self.new_task(loop, sleep(5000))
1442            loop.call_later(0.1, sleeper.cancel)
1443            try:
1444                await sleeper
1445            except asyncio.CancelledError:
1446                return 'cancelled'
1447            else:
1448                return 'slept in'
1449
1450        doer = doit()
1451        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
1452        self.assertAlmostEqual(0.1, loop.time())
1453
1454    def test_task_cancel_waiter_future(self):
1455        fut = self.new_future(self.loop)
1456
1457        async def coro():
1458            await fut
1459
1460        task = self.new_task(self.loop, coro())
1461        test_utils.run_briefly(self.loop)
1462        self.assertIs(task._fut_waiter, fut)
1463
1464        task.cancel()
1465        test_utils.run_briefly(self.loop)
1466        self.assertRaises(
1467            asyncio.CancelledError, self.loop.run_until_complete, task)
1468        self.assertIsNone(task._fut_waiter)
1469        self.assertTrue(fut.cancelled())
1470
1471    def test_task_set_methods(self):
1472        async def notmuch():
1473            return 'ko'
1474
1475        gen = notmuch()
1476        task = self.new_task(self.loop, gen)
1477
1478        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
1479            task.set_result('ok')
1480
1481        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
1482            task.set_exception(ValueError())
1483
1484        self.assertEqual(
1485            self.loop.run_until_complete(task),
1486            'ko')
1487
1488    def test_step_result(self):
1489        with self.assertWarns(DeprecationWarning):
1490            @asyncio.coroutine
1491            def notmuch():
1492                yield None
1493                yield 1
1494                return 'ko'
1495
1496        self.assertRaises(
1497            RuntimeError, self.loop.run_until_complete, notmuch())
1498
1499    def test_step_result_future(self):
1500        # If coroutine returns future, task waits on this future.
1501
1502        class Fut(asyncio.Future):
1503            def __init__(self, *args, **kwds):
1504                self.cb_added = False
1505                super().__init__(*args, **kwds)
1506
1507            def add_done_callback(self, *args, **kwargs):
1508                self.cb_added = True
1509                super().add_done_callback(*args, **kwargs)
1510
1511        fut = Fut(loop=self.loop)
1512        result = None
1513
1514        async def wait_for_future():
1515            nonlocal result
1516            result = await fut
1517
1518        t = self.new_task(self.loop, wait_for_future())
1519        test_utils.run_briefly(self.loop)
1520        self.assertTrue(fut.cb_added)
1521
1522        res = object()
1523        fut.set_result(res)
1524        test_utils.run_briefly(self.loop)
1525        self.assertIs(res, result)
1526        self.assertTrue(t.done())
1527        self.assertIsNone(t.result())
1528
1529    def test_baseexception_during_cancel(self):
1530
1531        def gen():
1532            when = yield
1533            self.assertAlmostEqual(10.0, when)
1534            yield 0
1535
1536        loop = self.new_test_loop(gen)
1537
1538        async def sleeper():
1539            await asyncio.sleep(10)
1540
1541        base_exc = SystemExit()
1542
1543        async def notmutch():
1544            try:
1545                await sleeper()
1546            except asyncio.CancelledError:
1547                raise base_exc
1548
1549        task = self.new_task(loop, notmutch())
1550        test_utils.run_briefly(loop)
1551
1552        task.cancel()
1553        self.assertFalse(task.done())
1554
1555        self.assertRaises(SystemExit, test_utils.run_briefly, loop)
1556
1557        self.assertTrue(task.done())
1558        self.assertFalse(task.cancelled())
1559        self.assertIs(task.exception(), base_exc)
1560
1561    def test_iscoroutinefunction(self):
1562        def fn():
1563            pass
1564
1565        self.assertFalse(asyncio.iscoroutinefunction(fn))
1566
1567        def fn1():
1568            yield
1569        self.assertFalse(asyncio.iscoroutinefunction(fn1))
1570
1571        with self.assertWarns(DeprecationWarning):
1572            @asyncio.coroutine
1573            def fn2():
1574                yield
1575        self.assertTrue(asyncio.iscoroutinefunction(fn2))
1576
1577        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
1578
1579    def test_yield_vs_yield_from(self):
1580        fut = self.new_future(self.loop)
1581
1582        with self.assertWarns(DeprecationWarning):
1583            @asyncio.coroutine
1584            def wait_for_future():
1585                yield fut
1586
1587        task = wait_for_future()
1588        with self.assertRaises(RuntimeError):
1589            self.loop.run_until_complete(task)
1590
1591        self.assertFalse(fut.done())
1592
1593    def test_yield_vs_yield_from_generator(self):
1594        with self.assertWarns(DeprecationWarning):
1595            @asyncio.coroutine
1596            def coro():
1597                yield
1598
1599        with self.assertWarns(DeprecationWarning):
1600            @asyncio.coroutine
1601            def wait_for_future():
1602                gen = coro()
1603                try:
1604                    yield gen
1605                finally:
1606                    gen.close()
1607
1608        task = wait_for_future()
1609        self.assertRaises(
1610            RuntimeError,
1611            self.loop.run_until_complete, task)
1612
1613    def test_coroutine_non_gen_function(self):
1614        with self.assertWarns(DeprecationWarning):
1615            @asyncio.coroutine
1616            def func():
1617                return 'test'
1618
1619        self.assertTrue(asyncio.iscoroutinefunction(func))
1620
1621        coro = func()
1622        self.assertTrue(asyncio.iscoroutine(coro))
1623
1624        res = self.loop.run_until_complete(coro)
1625        self.assertEqual(res, 'test')
1626
1627    def test_coroutine_non_gen_function_return_future(self):
1628        fut = self.new_future(self.loop)
1629
1630        with self.assertWarns(DeprecationWarning):
1631            @asyncio.coroutine
1632            def func():
1633                return fut
1634
1635        async def coro():
1636            fut.set_result('test')
1637
1638        t1 = self.new_task(self.loop, func())
1639        t2 = self.new_task(self.loop, coro())
1640        res = self.loop.run_until_complete(t1)
1641        self.assertEqual(res, 'test')
1642        self.assertIsNone(t2.result())
1643
1644
1645    def test_current_task_deprecated(self):
1646        Task = self.__class__.Task
1647
1648        with self.assertWarns(DeprecationWarning):
1649            self.assertIsNone(Task.current_task(loop=self.loop))
1650
1651        async def coro(loop):
1652            with self.assertWarns(DeprecationWarning):
1653                self.assertIs(Task.current_task(loop=loop), task)
1654
1655            # See http://bugs.python.org/issue29271 for details:
1656            asyncio.set_event_loop(loop)
1657            try:
1658                with self.assertWarns(DeprecationWarning):
1659                    self.assertIs(Task.current_task(None), task)
1660                with self.assertWarns(DeprecationWarning):
1661                    self.assertIs(Task.current_task(), task)
1662            finally:
1663                asyncio.set_event_loop(None)
1664
1665        task = self.new_task(self.loop, coro(self.loop))
1666        self.loop.run_until_complete(task)
1667        with self.assertWarns(DeprecationWarning):
1668            self.assertIsNone(Task.current_task(loop=self.loop))
1669
1670    def test_current_task(self):
1671        self.assertIsNone(asyncio.current_task(loop=self.loop))
1672
1673        async def coro(loop):
1674            self.assertIs(asyncio.current_task(loop=loop), task)
1675
1676            self.assertIs(asyncio.current_task(None), task)
1677            self.assertIs(asyncio.current_task(), task)
1678
1679        task = self.new_task(self.loop, coro(self.loop))
1680        self.loop.run_until_complete(task)
1681        self.assertIsNone(asyncio.current_task(loop=self.loop))
1682
1683    def test_current_task_with_interleaving_tasks(self):
1684        self.assertIsNone(asyncio.current_task(loop=self.loop))
1685
1686        fut1 = self.new_future(self.loop)
1687        fut2 = self.new_future(self.loop)
1688
1689        async def coro1(loop):
1690            self.assertTrue(asyncio.current_task(loop=loop) is task1)
1691            await fut1
1692            self.assertTrue(asyncio.current_task(loop=loop) is task1)
1693            fut2.set_result(True)
1694
1695        async def coro2(loop):
1696            self.assertTrue(asyncio.current_task(loop=loop) is task2)
1697            fut1.set_result(True)
1698            await fut2
1699            self.assertTrue(asyncio.current_task(loop=loop) is task2)
1700
1701        task1 = self.new_task(self.loop, coro1(self.loop))
1702        task2 = self.new_task(self.loop, coro2(self.loop))
1703
1704        self.loop.run_until_complete(asyncio.wait((task1, task2)))
1705        self.assertIsNone(asyncio.current_task(loop=self.loop))
1706
1707    # Some thorough tests for cancellation propagation through
1708    # coroutines, tasks and wait().
1709
1710    def test_yield_future_passes_cancel(self):
1711        # Cancelling outer() cancels inner() cancels waiter.
1712        proof = 0
1713        waiter = self.new_future(self.loop)
1714
1715        async def inner():
1716            nonlocal proof
1717            try:
1718                await waiter
1719            except asyncio.CancelledError:
1720                proof += 1
1721                raise
1722            else:
1723                self.fail('got past sleep() in inner()')
1724
1725        async def outer():
1726            nonlocal proof
1727            try:
1728                await inner()
1729            except asyncio.CancelledError:
1730                proof += 100  # Expect this path.
1731            else:
1732                proof += 10
1733
1734        f = asyncio.ensure_future(outer(), loop=self.loop)
1735        test_utils.run_briefly(self.loop)
1736        f.cancel()
1737        self.loop.run_until_complete(f)
1738        self.assertEqual(proof, 101)
1739        self.assertTrue(waiter.cancelled())
1740
1741    def test_yield_wait_does_not_shield_cancel(self):
1742        # Cancelling outer() makes wait() return early, leaves inner()
1743        # running.
1744        proof = 0
1745        waiter = self.new_future(self.loop)
1746
1747        async def inner():
1748            nonlocal proof
1749            await waiter
1750            proof += 1
1751
1752        async def outer():
1753            nonlocal proof
1754            d, p = await asyncio.wait([inner()])
1755            proof += 100
1756
1757        f = asyncio.ensure_future(outer(), loop=self.loop)
1758        test_utils.run_briefly(self.loop)
1759        f.cancel()
1760        self.assertRaises(
1761            asyncio.CancelledError, self.loop.run_until_complete, f)
1762        waiter.set_result(None)
1763        test_utils.run_briefly(self.loop)
1764        self.assertEqual(proof, 1)
1765
1766    def test_shield_result(self):
1767        inner = self.new_future(self.loop)
1768        outer = asyncio.shield(inner)
1769        inner.set_result(42)
1770        res = self.loop.run_until_complete(outer)
1771        self.assertEqual(res, 42)
1772
1773    def test_shield_exception(self):
1774        inner = self.new_future(self.loop)
1775        outer = asyncio.shield(inner)
1776        test_utils.run_briefly(self.loop)
1777        exc = RuntimeError('expected')
1778        inner.set_exception(exc)
1779        test_utils.run_briefly(self.loop)
1780        self.assertIs(outer.exception(), exc)
1781
1782    def test_shield_cancel_inner(self):
1783        inner = self.new_future(self.loop)
1784        outer = asyncio.shield(inner)
1785        test_utils.run_briefly(self.loop)
1786        inner.cancel()
1787        test_utils.run_briefly(self.loop)
1788        self.assertTrue(outer.cancelled())
1789
1790    def test_shield_cancel_outer(self):
1791        inner = self.new_future(self.loop)
1792        outer = asyncio.shield(inner)
1793        test_utils.run_briefly(self.loop)
1794        outer.cancel()
1795        test_utils.run_briefly(self.loop)
1796        self.assertTrue(outer.cancelled())
1797        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
1798
1799    def test_shield_shortcut(self):
1800        fut = self.new_future(self.loop)
1801        fut.set_result(42)
1802        res = self.loop.run_until_complete(asyncio.shield(fut))
1803        self.assertEqual(res, 42)
1804
1805    def test_shield_effect(self):
1806        # Cancelling outer() does not affect inner().
1807        proof = 0
1808        waiter = self.new_future(self.loop)
1809
1810        async def inner():
1811            nonlocal proof
1812            await waiter
1813            proof += 1
1814
1815        async def outer():
1816            nonlocal proof
1817            await asyncio.shield(inner())
1818            proof += 100
1819
1820        f = asyncio.ensure_future(outer(), loop=self.loop)
1821        test_utils.run_briefly(self.loop)
1822        f.cancel()
1823        with self.assertRaises(asyncio.CancelledError):
1824            self.loop.run_until_complete(f)
1825        waiter.set_result(None)
1826        test_utils.run_briefly(self.loop)
1827        self.assertEqual(proof, 1)
1828
1829    def test_shield_gather(self):
1830        child1 = self.new_future(self.loop)
1831        child2 = self.new_future(self.loop)
1832        parent = asyncio.gather(child1, child2)
1833        outer = asyncio.shield(parent)
1834        test_utils.run_briefly(self.loop)
1835        outer.cancel()
1836        test_utils.run_briefly(self.loop)
1837        self.assertTrue(outer.cancelled())
1838        child1.set_result(1)
1839        child2.set_result(2)
1840        test_utils.run_briefly(self.loop)
1841        self.assertEqual(parent.result(), [1, 2])
1842
1843    def test_gather_shield(self):
1844        child1 = self.new_future(self.loop)
1845        child2 = self.new_future(self.loop)
1846        inner1 = asyncio.shield(child1)
1847        inner2 = asyncio.shield(child2)
1848        parent = asyncio.gather(inner1, inner2)
1849        test_utils.run_briefly(self.loop)
1850        parent.cancel()
1851        # This should cancel inner1 and inner2 but bot child1 and child2.
1852        test_utils.run_briefly(self.loop)
1853        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
1854        self.assertTrue(inner1.cancelled())
1855        self.assertTrue(inner2.cancelled())
1856        child1.set_result(1)
1857        child2.set_result(2)
1858        test_utils.run_briefly(self.loop)
1859
1860    def test_as_completed_invalid_args(self):
1861        fut = self.new_future(self.loop)
1862
1863        # as_completed() expects a list of futures, not a future instance
1864        self.assertRaises(TypeError, self.loop.run_until_complete,
1865            asyncio.as_completed(fut, loop=self.loop))
1866        coro = coroutine_function()
1867        self.assertRaises(TypeError, self.loop.run_until_complete,
1868            asyncio.as_completed(coro, loop=self.loop))
1869        coro.close()
1870
1871    def test_wait_invalid_args(self):
1872        fut = self.new_future(self.loop)
1873
1874        # wait() expects a list of futures, not a future instance
1875        self.assertRaises(TypeError, self.loop.run_until_complete,
1876            asyncio.wait(fut))
1877        coro = coroutine_function()
1878        self.assertRaises(TypeError, self.loop.run_until_complete,
1879            asyncio.wait(coro))
1880        coro.close()
1881
1882        # wait() expects at least a future
1883        self.assertRaises(ValueError, self.loop.run_until_complete,
1884            asyncio.wait([]))
1885
1886    def test_corowrapper_mocks_generator(self):
1887
1888        def check():
1889            # A function that asserts various things.
1890            # Called twice, with different debug flag values.
1891
1892            with self.assertWarns(DeprecationWarning):
1893                @asyncio.coroutine
1894                def coro():
1895                    # The actual coroutine.
1896                    self.assertTrue(gen.gi_running)
1897                    yield from fut
1898
1899            # A completed Future used to run the coroutine.
1900            fut = self.new_future(self.loop)
1901            fut.set_result(None)
1902
1903            # Call the coroutine.
1904            gen = coro()
1905
1906            # Check some properties.
1907            self.assertTrue(asyncio.iscoroutine(gen))
1908            self.assertIsInstance(gen.gi_frame, types.FrameType)
1909            self.assertFalse(gen.gi_running)
1910            self.assertIsInstance(gen.gi_code, types.CodeType)
1911
1912            # Run it.
1913            self.loop.run_until_complete(gen)
1914
1915            # The frame should have changed.
1916            self.assertIsNone(gen.gi_frame)
1917
1918        # Test with debug flag cleared.
1919        with set_coroutine_debug(False):
1920            check()
1921
1922        # Test with debug flag set.
1923        with set_coroutine_debug(True):
1924            check()
1925
1926    def test_yield_from_corowrapper(self):
1927        with set_coroutine_debug(True):
1928            with self.assertWarns(DeprecationWarning):
1929                @asyncio.coroutine
1930                def t1():
1931                    return (yield from t2())
1932
1933            with self.assertWarns(DeprecationWarning):
1934                @asyncio.coroutine
1935                def t2():
1936                    f = self.new_future(self.loop)
1937                    self.new_task(self.loop, t3(f))
1938                    return (yield from f)
1939
1940            with self.assertWarns(DeprecationWarning):
1941                @asyncio.coroutine
1942                def t3(f):
1943                    f.set_result((1, 2, 3))
1944
1945            task = self.new_task(self.loop, t1())
1946            val = self.loop.run_until_complete(task)
1947            self.assertEqual(val, (1, 2, 3))
1948
1949    def test_yield_from_corowrapper_send(self):
1950        def foo():
1951            a = yield
1952            return a
1953
1954        def call(arg):
1955            cw = asyncio.coroutines.CoroWrapper(foo())
1956            cw.send(None)
1957            try:
1958                cw.send(arg)
1959            except StopIteration as ex:
1960                return ex.args[0]
1961            else:
1962                raise AssertionError('StopIteration was expected')
1963
1964        self.assertEqual(call((1, 2)), (1, 2))
1965        self.assertEqual(call('spam'), 'spam')
1966
1967    def test_corowrapper_weakref(self):
1968        wd = weakref.WeakValueDictionary()
1969        def foo(): yield from []
1970        cw = asyncio.coroutines.CoroWrapper(foo())
1971        wd['cw'] = cw  # Would fail without __weakref__ slot.
1972        cw.gen = None  # Suppress warning from __del__.
1973
1974    def test_corowrapper_throw(self):
1975        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
1976        def foo():
1977            value = None
1978            while True:
1979                try:
1980                    value = yield value
1981                except Exception as e:
1982                    value = e
1983
1984        exception = Exception("foo")
1985        cw = asyncio.coroutines.CoroWrapper(foo())
1986        cw.send(None)
1987        self.assertIs(exception, cw.throw(exception))
1988
1989        cw = asyncio.coroutines.CoroWrapper(foo())
1990        cw.send(None)
1991        self.assertIs(exception, cw.throw(Exception, exception))
1992
1993        cw = asyncio.coroutines.CoroWrapper(foo())
1994        cw.send(None)
1995        exception = cw.throw(Exception, "foo")
1996        self.assertIsInstance(exception, Exception)
1997        self.assertEqual(exception.args, ("foo", ))
1998
1999        cw = asyncio.coroutines.CoroWrapper(foo())
2000        cw.send(None)
2001        exception = cw.throw(Exception, "foo", None)
2002        self.assertIsInstance(exception, Exception)
2003        self.assertEqual(exception.args, ("foo", ))
2004
2005    def test_all_tasks_deprecated(self):
2006        Task = self.__class__.Task
2007
2008        async def coro():
2009            with self.assertWarns(DeprecationWarning):
2010                assert Task.all_tasks(self.loop) == {t}
2011
2012        t = self.new_task(self.loop, coro())
2013        self.loop.run_until_complete(t)
2014
    def test_log_destroyed_pending_task(self):
        # Destroying a task that is still pending must be reported to the
        # loop's exception handler with the message
        # 'Task was destroyed but it is pending!'.
        #
        # NOTE: this test depends on which references to the task and its
        # coroutine are alive at each step; keep the statement order intact.
        Task = self.__class__.Task

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def kill_me(loop):
                future = self.new_future(loop)
                yield from future
                # at this point, the only reference to kill_me() task is
                # the Task._wakeup() method in future._callbacks
                raise Exception("code never reached")

        # Route loop error reports to a mock so the call can be inspected.
        mock_handler = mock.Mock()
        self.loop.set_debug(True)
        self.loop.set_exception_handler(mock_handler)

        # schedule the task
        coro = kill_me(self.loop)
        task = asyncio.ensure_future(coro, loop=self.loop)

        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})

        # See http://bugs.python.org/issue29271 for details:
        # the deprecated Task.all_tasks() is checked both without an
        # argument and with an explicit None while self.loop is current.
        asyncio.set_event_loop(self.loop)
        try:
            with self.assertWarns(DeprecationWarning):
                self.assertEqual(Task.all_tasks(), {task})
            with self.assertWarns(DeprecationWarning):
                self.assertEqual(Task.all_tasks(None), {task})
        finally:
            asyncio.set_event_loop(None)

        # execute the task so it waits for future
        self.loop._run_once()
        self.assertEqual(len(self.loop._ready), 0)

        # remove the future used in kill_me(), and references to the task
        del coro.gi_frame.f_locals['future']
        coro = None
        # Save the traceback first: it is needed for the final assertion.
        source_traceback = task._source_traceback
        task = None

        # no more reference to kill_me() task: the task is destroyed by the GC
        support.gc_collect()

        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())

        # The handler must have received the loop and a context dict
        # describing the destroyed task.
        mock_handler.assert_called_with(self.loop, {
            'message': 'Task was destroyed but it is pending!',
            'task': mock.ANY,
            'source_traceback': source_traceback,
        })
        mock_handler.reset_mock()
2068
2069    @mock.patch('asyncio.base_events.logger')
2070    def test_tb_logger_not_called_after_cancel(self, m_log):
2071        loop = asyncio.new_event_loop()
2072        self.set_event_loop(loop)
2073
2074        async def coro():
2075            raise TypeError
2076
2077        async def runner():
2078            task = self.new_task(loop, coro())
2079            await asyncio.sleep(0.05)
2080            task.cancel()
2081            task = None
2082
2083        loop.run_until_complete(runner())
2084        self.assertFalse(m_log.error.called)
2085
    @mock.patch('asyncio.coroutines.logger')
    def test_coroutine_never_yielded(self, m_log):
        # A coroutine object created by an @asyncio.coroutine function
        # (decorated while the coroutine debug flag is set) and then
        # discarded unused must log a "was never yielded from" error that
        # includes where the object was created.
        with set_coroutine_debug(True):
            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro_noop():
                    pass

        # NOTE: do not insert lines between the tb_lineno assignment and the
        # coro_noop() call: the "+ 2" offset and the regex below rely on the
        # exact layout and on the call being written as ``coro_noop()``.
        tb_filename = __file__
        tb_lineno = sys._getframe().f_lineno + 2
        # create a coroutine object but don't use it
        coro_noop()
        support.gc_collect()

        self.assertTrue(m_log.error.called)
        # First positional argument of the last logger.error() call.
        message = m_log.error.call_args[0][0]
        func_filename, func_lineno = test_utils.get_function_source(coro_noop)

        # Expected message: the wrapper repr with the function's source
        # location, then the creation traceback ending at the call above.
        regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
                    r'was never yielded from\n'
                 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
                 r'.*\n'
                 r'  File "%s", line %s, in test_coroutine_never_yielded\n'
                 r'    coro_noop\(\)$'
                 % (re.escape(coro_noop.__qualname__),
                    re.escape(func_filename), func_lineno,
                    re.escape(tb_filename), tb_lineno))

        self.assertRegex(message, re.compile(regex, re.DOTALL))
2115
2116    def test_return_coroutine_from_coroutine(self):
2117        """Return of @asyncio.coroutine()-wrapped function generator object
2118        from @asyncio.coroutine()-wrapped function should have same effect as
2119        returning generator object or Future."""
2120        def check():
2121            with self.assertWarns(DeprecationWarning):
2122                @asyncio.coroutine
2123                def outer_coro():
2124                    with self.assertWarns(DeprecationWarning):
2125                        @asyncio.coroutine
2126                        def inner_coro():
2127                            return 1
2128
2129                    return inner_coro()
2130
2131            result = self.loop.run_until_complete(outer_coro())
2132            self.assertEqual(result, 1)
2133
2134        # Test with debug flag cleared.
2135        with set_coroutine_debug(False):
2136            check()
2137
2138        # Test with debug flag set.
2139        with set_coroutine_debug(True):
2140            check()
2141
    def test_task_source_traceback(self):
        # With loop debug enabled, a task must record a source traceback
        # that points at the line which created it.
        self.loop.set_debug(True)

        # NOTE: the next two statements must stay on adjacent lines:
        # ``lineno`` is computed as "current line - 1" so that it names
        # the line of the new_task() call.
        task = self.new_task(self.loop, coroutine_function())
        lineno = sys._getframe().f_lineno - 1
        self.assertIsInstance(task._source_traceback, list)
        # The second-to-last entry is expected to be this test's frame
        # (presumably the last one belongs to the new_task() helper).
        self.assertEqual(task._source_traceback[-2][:3],
                         (__file__,
                          lineno,
                          'test_task_source_traceback'))
        self.loop.run_until_complete(task)
2153
2154    def _test_cancel_wait_for(self, timeout):
2155        loop = asyncio.new_event_loop()
2156        self.addCleanup(loop.close)
2157
2158        async def blocking_coroutine():
2159            fut = self.new_future(loop)
2160            # Block: fut result is never set
2161            await fut
2162
2163        task = loop.create_task(blocking_coroutine())
2164
2165        wait = loop.create_task(asyncio.wait_for(task, timeout))
2166        loop.call_soon(wait.cancel)
2167
2168        self.assertRaises(asyncio.CancelledError,
2169                          loop.run_until_complete, wait)
2170
2171        # Python issue #23219: cancelling the wait must also cancel the task
2172        self.assertTrue(task.cancelled())
2173
2174    def test_cancel_blocking_wait_for(self):
2175        self._test_cancel_wait_for(None)
2176
2177    def test_cancel_wait_for(self):
2178        self._test_cancel_wait_for(60.0)
2179
2180    def test_cancel_gather_1(self):
2181        """Ensure that a gathering future refuses to be cancelled once all
2182        children are done"""
2183        loop = asyncio.new_event_loop()
2184        self.addCleanup(loop.close)
2185
2186        fut = self.new_future(loop)
2187        # The indirection fut->child_coro is needed since otherwise the
2188        # gathering task is done at the same time as the child future
2189        def child_coro():
2190            return (yield from fut)
2191        gather_future = asyncio.gather(child_coro(), loop=loop)
2192        gather_task = asyncio.ensure_future(gather_future, loop=loop)
2193
2194        cancel_result = None
2195        def cancelling_callback(_):
2196            nonlocal cancel_result
2197            cancel_result = gather_task.cancel()
2198        fut.add_done_callback(cancelling_callback)
2199
2200        fut.set_result(42) # calls the cancelling_callback after fut is done()
2201
2202        # At this point the task should complete.
2203        loop.run_until_complete(gather_task)
2204
2205        # Python issue #26923: asyncio.gather drops cancellation
2206        self.assertEqual(cancel_result, False)
2207        self.assertFalse(gather_task.cancelled())
2208        self.assertEqual(gather_task.result(), [42])
2209
2210    def test_cancel_gather_2(self):
2211        loop = asyncio.new_event_loop()
2212        self.addCleanup(loop.close)
2213
2214        async def test():
2215            time = 0
2216            while True:
2217                time += 0.05
2218                await asyncio.gather(asyncio.sleep(0.05),
2219                                     return_exceptions=True,
2220                                     loop=loop)
2221                if time > 1:
2222                    return
2223
2224        async def main():
2225            qwe = self.new_task(loop, test())
2226            await asyncio.sleep(0.2)
2227            qwe.cancel()
2228            try:
2229                await qwe
2230            except asyncio.CancelledError:
2231                pass
2232            else:
2233                self.fail('gather did not propagate the cancellation request')
2234
2235        loop.run_until_complete(main())
2236
2237    def test_exception_traceback(self):
2238        # See http://bugs.python.org/issue28843
2239
2240        async def foo():
2241            1 / 0
2242
2243        async def main():
2244            task = self.new_task(self.loop, foo())
2245            await asyncio.sleep(0)  # skip one loop iteration
2246            self.assertIsNotNone(task.exception().__traceback__)
2247
2248        self.loop.run_until_complete(main())
2249
    @mock.patch('asyncio.base_events.logger')
    def test_error_in_call_soon(self, m_log):
        # If loop.call_soon() raises while a task is being created, the
        # error must propagate, the abandoned task must be logged as
        # destroyed while pending, and no task may remain registered.
        def call_soon(callback, *args, **kwargs):
            raise ValueError
        # Replace the loop's scheduler with one that always fails.
        self.loop.call_soon = call_soon

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                pass

        # Nothing has been logged before the task creation attempt.
        self.assertFalse(m_log.error.called)

        with self.assertRaises(ValueError):
            gen = coro()
            try:
                self.new_task(self.loop, gen)
            finally:
                # Close the generator whether or not task creation failed.
                gen.close()

        self.assertTrue(m_log.error.called)
        # First positional argument of the last logger.error() call.
        message = m_log.error.call_args[0][0]
        self.assertIn('Task was destroyed but it is pending', message)

        self.assertEqual(asyncio.all_tasks(self.loop), set())
2275
2276    def test_create_task_with_noncoroutine(self):
2277        with self.assertRaisesRegex(TypeError,
2278                                    "a coroutine was expected, got 123"):
2279            self.new_task(self.loop, 123)
2280
2281        # test it for the second time to ensure that caching
2282        # in asyncio.iscoroutine() doesn't break things.
2283        with self.assertRaisesRegex(TypeError,
2284                                    "a coroutine was expected, got 123"):
2285            self.new_task(self.loop, 123)
2286
2287    def test_create_task_with_oldstyle_coroutine(self):
2288
2289        with self.assertWarns(DeprecationWarning):
2290            @asyncio.coroutine
2291            def coro():
2292                pass
2293
2294        task = self.new_task(self.loop, coro())
2295        self.assertIsInstance(task, self.Task)
2296        self.loop.run_until_complete(task)
2297
2298        # test it for the second time to ensure that caching
2299        # in asyncio.iscoroutine() doesn't break things.
2300        task = self.new_task(self.loop, coro())
2301        self.assertIsInstance(task, self.Task)
2302        self.loop.run_until_complete(task)
2303
2304    def test_create_task_with_async_function(self):
2305
2306        async def coro():
2307            pass
2308
2309        task = self.new_task(self.loop, coro())
2310        self.assertIsInstance(task, self.Task)
2311        self.loop.run_until_complete(task)
2312
2313        # test it for the second time to ensure that caching
2314        # in asyncio.iscoroutine() doesn't break things.
2315        task = self.new_task(self.loop, coro())
2316        self.assertIsInstance(task, self.Task)
2317        self.loop.run_until_complete(task)
2318
2319    def test_create_task_with_asynclike_function(self):
2320        task = self.new_task(self.loop, CoroLikeObject())
2321        self.assertIsInstance(task, self.Task)
2322        self.assertEqual(self.loop.run_until_complete(task), 42)
2323
2324        # test it for the second time to ensure that caching
2325        # in asyncio.iscoroutine() doesn't break things.
2326        task = self.new_task(self.loop, CoroLikeObject())
2327        self.assertIsInstance(task, self.Task)
2328        self.assertEqual(self.loop.run_until_complete(task), 42)
2329
2330    def test_bare_create_task(self):
2331
2332        async def inner():
2333            return 1
2334
2335        async def coro():
2336            task = asyncio.create_task(inner())
2337            self.assertIsInstance(task, self.Task)
2338            ret = await task
2339            self.assertEqual(1, ret)
2340
2341        self.loop.run_until_complete(coro())
2342
2343    def test_bare_create_named_task(self):
2344
2345        async def coro_noop():
2346            pass
2347
2348        async def coro():
2349            task = asyncio.create_task(coro_noop(), name='No-op')
2350            self.assertEqual(task.get_name(), 'No-op')
2351            await task
2352
2353        self.loop.run_until_complete(coro())
2354
2355    def test_context_1(self):
2356        cvar = contextvars.ContextVar('cvar', default='nope')
2357
2358        async def sub():
2359            await asyncio.sleep(0.01)
2360            self.assertEqual(cvar.get(), 'nope')
2361            cvar.set('something else')
2362
2363        async def main():
2364            self.assertEqual(cvar.get(), 'nope')
2365            subtask = self.new_task(loop, sub())
2366            cvar.set('yes')
2367            self.assertEqual(cvar.get(), 'yes')
2368            await subtask
2369            self.assertEqual(cvar.get(), 'yes')
2370
2371        loop = asyncio.new_event_loop()
2372        try:
2373            task = self.new_task(loop, main())
2374            loop.run_until_complete(task)
2375        finally:
2376            loop.close()
2377
2378    def test_context_2(self):
2379        cvar = contextvars.ContextVar('cvar', default='nope')
2380
2381        async def main():
2382            def fut_on_done(fut):
2383                # This change must not pollute the context
2384                # of the "main()" task.
2385                cvar.set('something else')
2386
2387            self.assertEqual(cvar.get(), 'nope')
2388
2389            for j in range(2):
2390                fut = self.new_future(loop)
2391                fut.add_done_callback(fut_on_done)
2392                cvar.set(f'yes{j}')
2393                loop.call_soon(fut.set_result, None)
2394                await fut
2395                self.assertEqual(cvar.get(), f'yes{j}')
2396
2397                for i in range(3):
2398                    # Test that task passed its context to add_done_callback:
2399                    cvar.set(f'yes{i}-{j}')
2400                    await asyncio.sleep(0.001)
2401                    self.assertEqual(cvar.get(), f'yes{i}-{j}')
2402
2403        loop = asyncio.new_event_loop()
2404        try:
2405            task = self.new_task(loop, main())
2406            loop.run_until_complete(task)
2407        finally:
2408            loop.close()
2409
2410        self.assertEqual(cvar.get(), 'nope')
2411
2412    def test_context_3(self):
2413        # Run 100 Tasks in parallel, each modifying cvar.
2414
2415        cvar = contextvars.ContextVar('cvar', default=-1)
2416
2417        async def sub(num):
2418            for i in range(10):
2419                cvar.set(num + i)
2420                await asyncio.sleep(random.uniform(0.001, 0.05))
2421                self.assertEqual(cvar.get(), num + i)
2422
2423        async def main():
2424            tasks = []
2425            for i in range(100):
2426                task = loop.create_task(sub(random.randint(0, 10)))
2427                tasks.append(task)
2428
2429            await asyncio.gather(*tasks, loop=loop)
2430
2431        loop = asyncio.new_event_loop()
2432        try:
2433            loop.run_until_complete(main())
2434        finally:
2435            loop.close()
2436
2437        self.assertEqual(cvar.get(), -1)
2438
2439    def test_get_coro(self):
2440        loop = asyncio.new_event_loop()
2441        coro = coroutine_function()
2442        try:
2443            task = self.new_task(loop, coro)
2444            loop.run_until_complete(task)
2445            self.assertIs(task.get_coro(), coro)
2446        finally:
2447            loop.close()
2448
2449
2450def add_subclass_tests(cls):
2451    BaseTask = cls.Task
2452    BaseFuture = cls.Future
2453
2454    if BaseTask is None or BaseFuture is None:
2455        return cls
2456
2457    class CommonFuture:
2458        def __init__(self, *args, **kwargs):
2459            self.calls = collections.defaultdict(lambda: 0)
2460            super().__init__(*args, **kwargs)
2461
2462        def add_done_callback(self, *args, **kwargs):
2463            self.calls['add_done_callback'] += 1
2464            return super().add_done_callback(*args, **kwargs)
2465
2466    class Task(CommonFuture, BaseTask):
2467        pass
2468
2469    class Future(CommonFuture, BaseFuture):
2470        pass
2471
2472    def test_subclasses_ctask_cfuture(self):
2473        fut = self.Future(loop=self.loop)
2474
2475        async def func():
2476            self.loop.call_soon(lambda: fut.set_result('spam'))
2477            return await fut
2478
2479        task = self.Task(func(), loop=self.loop)
2480
2481        result = self.loop.run_until_complete(task)
2482
2483        self.assertEqual(result, 'spam')
2484
2485        self.assertEqual(
2486            dict(task.calls),
2487            {'add_done_callback': 1})
2488
2489        self.assertEqual(
2490            dict(fut.calls),
2491            {'add_done_callback': 1})
2492
2493    # Add patched Task & Future back to the test case
2494    cls.Task = Task
2495    cls.Future = Future
2496
2497    # Add an extra unit-test
2498    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture
2499
2500    # Disable the "test_task_source_traceback" test
2501    # (the test is hardcoded for a particular call stack, which
2502    # is slightly different for Task subclasses)
2503    cls.test_task_source_traceback = None
2504
2505    return cls
2506
2507
class SetMethodsTest:
    # Mixin: calls the Future implementation's set_result() and
    # set_exception() directly on a pending task and checks that the
    # loop's exception handler is then given an InvalidStateError.

    def _check_invalid_state_reported(self, exc_handler):
        # The handler must have been called exactly once, with a context
        # whose 'exception' is the expected InvalidStateError.
        exc_handler.assert_called_once()
        exc = exc_handler.call_args[0][0]['exception']
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise exc

    def test_set_result_causes_invalid_state(self):
        Future = type(self).Future
        exc_handler = mock.Mock()
        self.loop.call_exception_handler = exc_handler

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        Future.set_result(task, 'spam')

        outcome = self.loop.run_until_complete(task)
        self.assertEqual(outcome, 'spam')

        self._check_invalid_state_reported(exc_handler)

        coro.close()

    def test_set_exception_causes_invalid_state(self):
        class MyExc(Exception):
            pass

        Future = type(self).Future
        exc_handler = mock.Mock()
        self.loop.call_exception_handler = exc_handler

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        Future.set_exception(task, MyExc())

        with self.assertRaises(MyExc):
            self.loop.run_until_complete(task)

        self._check_invalid_state_reported(exc_handler)

        coro.close()
2559
2560
# Runs the shared task tests (and the set_result/set_exception tests)
# against tasks._CTask and futures._CFuture; skipped when either
# attribute is missing.
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
                          test_utils.TestCase):

    # getattr() with a default keeps the class body importable even when
    # the attributes do not exist (the class is skipped in that case).
    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)

    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        # Calling __init__() repeatedly on the same task object must not
        # grow the total reference count by more than a small delta.
        # NOTE: the measurement is sensitive to any extra objects created
        # between the two gettotalrefcount() calls.
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        refs_before = gettotalrefcount()
        for i in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        # Deleting the _log_destroy_pending attribute must raise
        # AttributeError (per the test name, presumably this guards
        # against a crash -- confirm against the referenced issue).
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        with self.assertRaises(AttributeError):
            del task._log_destroy_pending
2590
2591
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests run against subclasses (created by
    # add_subclass_tests) of tasks._CTask and futures._CFuture.

    Future = getattr(futures, '_CFuture', None)
    Task = getattr(tasks, '_CTask', None)
2600
2601
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests run against subclasses (created by
    # add_subclass_tests) of tasks._CTask and futures._PyFuture.

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2609
2610
@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests run against subclasses (created by
    # add_subclass_tests) of tasks._PyTask and futures._CFuture.

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)
2618
2619
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests run against tasks._CTask with futures._PyFuture.

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2626
2627
@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests run against tasks._PyTask with futures._CFuture.

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask
2634
2635
class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
                            test_utils.TestCase):
    # Shared task tests (and the set_result/set_exception tests) run
    # against tasks._PyTask with futures._PyFuture.

    Future = futures._PyFuture
    Task = tasks._PyTask
2641
2642
@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests run against subclasses (created by
    # add_subclass_tests) of tasks._PyTask and futures._PyFuture.

    Future = futures._PyFuture
    Task = tasks._PyTask
2647
2648
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):

    def test_foobar(self):
        # A task must be able to await a Future subclass whose
        # ``get_loop`` attribute raises AttributeError when accessed.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await fut
            return 'spam'

        self.loop = asyncio.new_event_loop()
        # closing() guarantees loop.close() even if the run fails.
        with contextlib.closing(self.loop):
            fut = Fut(loop=self.loop)
            self.loop.call_later(0.1, fut.set_result, 1)
            res = self.loop.run_until_complete(
                self.loop.create_task(coro()))

        self.assertEqual(res, 'spam')
2673
2674
class BaseTaskIntrospectionTests:
    # Hooks for the functions under test; concrete test classes override
    # them with a specific implementation.
    _register_task = None
    _unregister_task = None
    _enter_task = None
    _leave_task = None

    def test__register_task_1(self):
        # The task-like object exposes its loop through a ``_loop``
        # property.
        loop = mock.Mock()

        class TaskLike:
            @property
            def _loop(self):
                return loop

            def done(self):
                return False

        fake = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(fake)
        self.assertEqual(asyncio.all_tasks(loop), {fake})
        self._unregister_task(fake)

    def test__register_task_2(self):
        # The task-like object exposes its loop through ``get_loop()``.
        loop = mock.Mock()

        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return False

        fake = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(fake)
        self.assertEqual(asyncio.all_tasks(loop), {fake})
        self._unregister_task(fake)

    def test__register_task_3(self):
        # A task-like object reporting done() is left out by
        # asyncio.all_tasks() but still returned by the deprecated
        # asyncio.Task.all_tasks().
        loop = mock.Mock()

        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return True

        fake = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(fake)
        self.assertEqual(asyncio.all_tasks(loop), set())
        with self.assertWarns(DeprecationWarning):
            self.assertEqual(asyncio.Task.all_tasks(loop), {fake})
        self._unregister_task(fake)

    def test__enter_task(self):
        loop, task = mock.Mock(), mock.Mock()
        self.assertIsNone(asyncio.current_task(loop))
        self._enter_task(loop, task)
        self.assertIs(asyncio.current_task(loop), task)
        self._leave_task(loop, task)

    def test__enter_task_failure(self):
        # Entering a second task while one is current must fail and
        # leave the first task current.
        loop = mock.Mock()
        current = mock.Mock()
        intruder = mock.Mock()
        self._enter_task(loop, current)
        with self.assertRaises(RuntimeError):
            self._enter_task(loop, intruder)
        self.assertIs(asyncio.current_task(loop), current)
        self._leave_task(loop, current)

    def test__leave_task(self):
        loop, task = mock.Mock(), mock.Mock()
        self._enter_task(loop, task)
        self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__leave_task_failure1(self):
        # Leaving a task other than the current one must fail and leave
        # the current task in place.
        loop = mock.Mock()
        current = mock.Mock()
        other = mock.Mock()
        self._enter_task(loop, current)
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, other)
        self.assertIs(asyncio.current_task(loop), current)
        self._leave_task(loop, current)

    def test__leave_task_failure2(self):
        # Leaving a task when none was entered must fail.
        loop, task = mock.Mock(), mock.Mock()
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__unregister_task(self):
        loop = mock.Mock()
        task = mock.Mock(get_loop=lambda: loop)
        self._register_task(task)
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())

    def test__unregister_task_not_registered(self):
        # Unregistering a task that was never registered must not raise.
        loop, task = mock.Mock(), mock.Mock()
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
2787
2788
class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    """Run the introspection tests against the tasks._py_* helpers."""

    # staticmethod() keeps the plain functions from being bound to the
    # test instance when accessed as self._enter_task(...), etc.
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
2794
2795
@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
                     'requires the C _asyncio module')
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    """Run the introspection tests against the tasks._c_* helpers."""

    # The class body is executed even when the class ends up skipped, so
    # start from None placeholders and only bind the C helpers if present.
    _register_task = _unregister_task = _enter_task = _leave_task = None
    if hasattr(tasks, '_c_register_task'):
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
2806
2807
class BaseCurrentLoopTests:
    """Shared asyncio.current_task() checks.

    Concrete subclasses provide new_task() to pick the Task class.
    """

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def new_task(self, coro):
        # Overridden by the concrete test classes.
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        # An explicit loop that is not running has no current task.
        current = asyncio.current_task(loop=self.loop)
        self.assertIsNone(current)

    def test_current_task_no_running_loop_implicit(self):
        # Without a loop argument and outside a running loop the lookup
        # fails with RuntimeError.
        self.assertRaises(RuntimeError, asyncio.current_task)

    def test_current_task_with_implicit_loop(self):
        async def check_current():
            # Explicit loop, explicit None and no argument at all must
            # agree on the running task.
            self.assertIs(asyncio.current_task(loop=self.loop), running)

            self.assertIs(asyncio.current_task(None), running)
            self.assertIs(asyncio.current_task(), running)

        running = self.new_task(check_current())
        self.loop.run_until_complete(running)
        # Once the task has finished nothing is current any more.
        self.assertIsNone(asyncio.current_task(loop=self.loop))
2835
2836
class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    """current_task() checks using tasks._PyTask."""

    def new_task(self, coro):
        task_cls = tasks._PyTask
        return task_cls(coro, loop=self.loop)
2841
2842
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    """current_task() checks using tasks._CTask."""

    def new_task(self, coro):
        # Resolved at call time; the class is skipped when _CTask is
        # missing, so the lookup only happens when it exists.
        task_cls = tasks._CTask
        return task_cls(coro, loop=self.loop)
2849
2850
class GenericTaskTests(test_utils.TestCase):
    """Checks that do not depend on a particular Task implementation."""

    def test_future_subclass(self):
        is_future_subclass = issubclass(asyncio.Task, asyncio.Future)
        self.assertTrue(is_future_subclass)

    def test_asyncio_module_compiled(self):
        # Circular imports can easily make the _asyncio module
        # non-importable.  _functools serves as a probe: if it imports,
        # C modules were built on this system, so a missing _asyncio is
        # reported as a failure.
        try:
            import _functools
        except ImportError:
            # No C extension modules available; nothing to verify.
            return
        try:
            import _asyncio
        except ImportError:
            self.fail('_asyncio module is missing')
2870
2871
class GatherTestsBase:
    """Shared asyncio.gather() checks.

    Subclasses supply wrap_futures(), which turns plain futures into the
    awaitables that are actually handed to gather().
    """

    def setUp(self):
        super().setUp()
        # Two separate test loops; only the first one is made current.
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        # Keep iterating for as long as the loop has ready callbacks.
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        # The outer future completes only after every child has a result,
        # and results follow argument order rather than completion order.
        first, second, third = (self.one_loop.create_future()
                                for _ in range(3))
        outer = asyncio.gather(*self.wrap_futures(first, second, third),
                               **kwargs)
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        second.set_result(1)
        first.set_result(2)
        self._run_loop(self.one_loop)
        self.assertFalse(done_cb.called)
        self.assertFalse(outer.done())
        third.set_result(3)
        self._run_loop(self.one_loop)
        done_cb.assert_called_once_with(outer)
        self.assertEqual(outer.result(), [2, 1, 3])

    def test_success(self):
        # Default arguments, then an explicit return_exceptions=False.
        for extra in ({}, {'return_exceptions': False}):
            self._check_success(**extra)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        # The first child exception finishes the outer future at once.
        ok, failing, late_ok, late_cancel, late_error = (
            self.one_loop.create_future() for _ in range(5))
        outer = asyncio.gather(*self.wrap_futures(
            ok, failing, late_ok, late_cancel, late_error))
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        error = ZeroDivisionError()
        ok.set_result(1)
        failing.set_exception(error)
        self._run_loop(self.one_loop)
        self.assertTrue(outer.done())
        done_cb.assert_called_once_with(outer)
        self.assertIs(outer.exception(), error)
        # Does nothing
        late_ok.set_result(3)
        late_cancel.cancel()
        late_error.set_exception(RuntimeError())
        late_error.exception()

    def test_return_exceptions(self):
        # With return_exceptions=True exceptions are collected as results
        # and the outer future waits for every child.
        fut_a, fut_b, fut_c, fut_d = (self.one_loop.create_future()
                                      for _ in range(4))
        outer = asyncio.gather(
            *self.wrap_futures(fut_a, fut_b, fut_c, fut_d),
            return_exceptions=True)
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        zero_div = ZeroDivisionError()
        runtime_err = RuntimeError()
        fut_b.set_result(1)
        fut_c.set_exception(zero_div)
        fut_a.set_result(3)
        self._run_loop(self.one_loop)
        self.assertFalse(outer.done())
        fut_d.set_exception(runtime_err)
        self._run_loop(self.one_loop)
        self.assertTrue(outer.done())
        done_cb.assert_called_once_with(outer)
        self.assertEqual(outer.result(), [3, 1, zero_div, runtime_err])

    def test_env_var_debug(self):
        code = ('import asyncio.coroutines\n'
                'print(asyncio.coroutines._DEBUG)')

        def debug_flag(*args, **env_vars):
            # Run a child interpreter and return what it printed.
            _, stdout, _ = assert_python_ok(*args, **env_vars)
            return stdout.rstrip()

        # Test with -E to not fail if the unit test was run with
        # PYTHONASYNCIODEBUG set to a non-empty string
        self.assertEqual(debug_flag('-E', '-c', code), b'False')

        self.assertEqual(
            debug_flag('-c', code,
                       PYTHONASYNCIODEBUG='', PYTHONDEVMODE=''),
            b'False')

        self.assertEqual(
            debug_flag('-c', code,
                       PYTHONASYNCIODEBUG='1', PYTHONDEVMODE=''),
            b'True')

        # The variable is set but -E is passed as well.
        self.assertEqual(
            debug_flag('-E', '-c', code,
                       PYTHONASYNCIODEBUG='1', PYTHONDEVMODE=''),
            b'False')

        # -X dev
        self.assertEqual(debug_flag('-E', '-X', 'dev', '-c', code),
                         b'True')
2972
2973
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
    """gather() checks where the futures are passed in directly."""

    def wrap_futures(self, *futures):
        # Futures are handed to gather() unchanged.
        return futures

    def _check_empty_sequence(self, seq_or_iter):
        # Without awaitables gather() yields a future resolving to [] on
        # the current loop, or on the loop passed via the deprecated
        # loop argument.
        asyncio.set_event_loop(self.one_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        implicit = asyncio.gather(*seq_or_iter)
        self.assertIsInstance(implicit, asyncio.Future)
        self.assertIs(implicit._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(implicit.done())
        self.assertEqual(implicit.result(), [])
        with self.assertWarns(DeprecationWarning):
            explicit = asyncio.gather(*seq_or_iter, loop=self.other_loop)
        self.assertIs(explicit._loop, self.other_loop)

    def test_constructor_empty_sequence(self):
        for empty in ([], (), set(), iter("")):
            self._check_empty_sequence(empty)

    def test_constructor_heterogenous_futures(self):
        # Futures from different loops, or a loop argument that does not
        # match the future's loop, are rejected with ValueError.
        mine = self.one_loop.create_future()
        foreign = self.other_loop.create_future()
        self.assertRaises(ValueError, asyncio.gather, mine, foreign)
        self.assertRaises(ValueError, asyncio.gather, mine,
                          loop=self.other_loop)

    def test_constructor_homogenous_futures(self):
        # The outer future adopts the children's loop, with and without
        # an explicit (matching) loop argument.
        children = [self.other_loop.create_future() for _ in range(3)]
        for extra in ({}, {'loop': self.other_loop}):
            outer = asyncio.gather(*children, **extra)
            self.assertIs(outer._loop, self.other_loop)
            self._run_loop(self.other_loop)
            self.assertFalse(outer.done())

    def test_one_cancellation(self):
        # A cancelled child finishes the outer future with a
        # CancelledError exception; the outer future is not itself
        # marked as cancelled.
        ok, doomed, late_ok, late_cancel, late_error = (
            self.one_loop.create_future() for _ in range(5))
        outer = asyncio.gather(ok, doomed, late_ok, late_cancel, late_error)
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        ok.set_result(1)
        doomed.cancel()
        self._run_loop(self.one_loop)
        self.assertTrue(outer.done())
        done_cb.assert_called_once_with(outer)
        self.assertFalse(outer.cancelled())
        self.assertIsInstance(outer.exception(), asyncio.CancelledError)
        # Does nothing
        late_ok.set_result(3)
        late_cancel.cancel()
        late_error.set_exception(RuntimeError())
        late_error.exception()

    def test_result_exception_one_cancellation(self):
        # With return_exceptions=True cancellations are reported as
        # CancelledError instances in the result list.
        children = [self.one_loop.create_future() for _ in range(6)]
        fut_a, fut_b, fut_c, fut_d, fut_e, fut_f = children
        outer = asyncio.gather(*children, return_exceptions=True)
        done_cb = test_utils.MockCallback()
        outer.add_done_callback(done_cb)
        fut_a.set_result(1)
        zero_div = ZeroDivisionError()
        fut_b.set_exception(zero_div)
        fut_c.cancel()
        self._run_loop(self.one_loop)
        self.assertFalse(outer.done())
        fut_d.set_result(3)
        fut_e.cancel()
        runtime_err = RuntimeError()
        fut_f.set_exception(runtime_err)
        results = self.one_loop.run_until_complete(outer)
        # Check the CancelledError slots by type, then blank them so the
        # remaining entries can be compared with a plain list.
        for idx in (2, 4):
            self.assertIsInstance(results[idx], asyncio.CancelledError)
            results[idx] = None
        self.assertEqual(results,
                         [1, zero_div, None, 3, None, runtime_err])
        done_cb.assert_called_once_with(outer)
3057
3058
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
    """gather() checks where each future is awaited by a coroutine."""

    def setUp(self):
        super().setUp()
        asyncio.set_event_loop(self.one_loop)

    def wrap_futures(self, *futures):
        # One coroutine object per future, each awaiting its own future.
        async def coro(fut):
            return await fut

        return [coro(fut) for fut in futures]

    def test_constructor_loop_selection(self):
        async def coro():
            return 'abc'

        # No loop argument: the current event loop is picked up.
        implicit = asyncio.gather(coro(), coro())
        self.assertIs(implicit._loop, self.one_loop)
        self.one_loop.run_until_complete(implicit)

        # Explicit loop argument.
        self.set_event_loop(self.other_loop, cleanup=False)
        explicit = asyncio.gather(coro(), coro(), loop=self.other_loop)
        self.assertIs(explicit._loop, self.other_loop)
        self.other_loop.run_until_complete(explicit)

    def test_duplicate_coroutines(self):
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s
        # The same coroutine object is passed three times; its result
        # must show up at every position.
        shared = coro('abc')
        outer = asyncio.gather(shared, shared, coro('def'), shared,
                               loop=self.one_loop)
        self._run_loop(self.one_loop)
        self.assertEqual(outer.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
        proof = 0
        waiter = self.one_loop.create_future()

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        first_child = asyncio.ensure_future(inner(), loop=self.one_loop)
        second_child = asyncio.ensure_future(inner(), loop=self.one_loop)
        gatherer = None

        async def outer():
            nonlocal proof, gatherer
            gatherer = asyncio.gather(first_child, second_child,
                                      loop=self.one_loop)
            await gatherer
            proof += 100

        outer_task = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        self.assertTrue(outer_task.cancel())
        with self.assertRaises(asyncio.CancelledError):
            self.one_loop.run_until_complete(outer_task)
        # cancel() returns False here: nothing is left to cancel.
        self.assertFalse(gatherer.cancel())
        self.assertTrue(waiter.cancelled())
        for child in (first_child, second_child):
            self.assertTrue(child.cancelled())
        test_utils.run_briefly(self.one_loop)
        # Neither inner() nor outer() got past its await.
        self.assertEqual(proof, 0)

    def test_exception_marking(self):
        # Test for the first line marked "Mark exception retrieved."

        async def inner(f):
            await f
            raise RuntimeError('should not be ignored')

        gate_a = self.one_loop.create_future()
        gate_b = self.one_loop.create_future()

        async def outer():
            await asyncio.gather(inner(gate_a), inner(gate_b),
                                 loop=self.one_loop)

        outer_task = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        # Release the two inner coroutines one after the other.
        for gate in (gate_a, gate_b):
            gate.set_result(None)
            test_utils.run_briefly(self.one_loop)
        self.assertIsInstance(outer_task.exception(), RuntimeError)
3151
3152
class RunCoroutineThreadsafeTests(test_utils.TestCase):
    """Test case for asyncio.run_coroutine_threadsafe."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)  # Will cleanup properly

    async def add(self, a, b, fail=False, cancel=False):
        """Wait 0.05 seconds and return a + b.

        With *fail* set, raise RuntimeError("Fail!") after the wait.
        With *cancel* set, cancel the current task of self.loop and
        yield to the loop once before returning.
        """
        await asyncio.sleep(0.05)
        if fail:
            raise RuntimeError("Fail!")
        if cancel:
            asyncio.current_task(self.loop).cancel()
            await asyncio.sleep(0)
        return a + b

    def target(self, fail=False, cancel=False, timeout=None,
               advance_coro=False):
        """Run add coroutine in the event loop.

        Submits add(1, 2) to self.loop via run_coroutine_threadsafe()
        and returns future.result(timeout).  The future is cancelled on
        the way out if it is not done yet.
        """
        coro = self.add(1, 2, fail=fail, cancel=cancel)
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        if advance_coro:
            # this is for test_run_coroutine_threadsafe_task_factory_exception;
            # otherwise it spills errors and breaks **other** unittests, since
            # 'target' is interacting with threads.

            # With this call, `coro` will be advanced, so that
            # CoroWrapper.__del__ won't do anything when asyncio tests run
            # in debug mode.
            self.loop.call_soon_threadsafe(coro.send, None)
        try:
            return future.result(timeout)
        finally:
            # Do not leave a pending future behind (e.g. after a timeout).
            if not future.done():
                future.cancel()

    def test_run_coroutine_threadsafe(self):
        """Test coroutine submission from a thread to an event loop."""
        future = self.loop.run_in_executor(None, self.target)
        result = self.loop.run_until_complete(future)
        self.assertEqual(result, 3)

    def test_run_coroutine_threadsafe_with_exception(self):
        """Test coroutine submission from a thread to an event loop
        when an exception is raised."""
        # The positional True is target()'s *fail* argument.
        future = self.loop.run_in_executor(None, self.target, True)
        with self.assertRaises(RuntimeError) as exc_context:
            self.loop.run_until_complete(future)
        self.assertIn("Fail!", exc_context.exception.args)

    def test_run_coroutine_threadsafe_with_timeout(self):
        """Test coroutine submission from a thread to an event loop
        when a timeout is raised."""
        # run_in_executor() forwards positional arguments only, so the
        # keyword argument is bound with functools.partial.
        future = self.loop.run_in_executor(
            None, functools.partial(self.target, timeout=0))
        with self.assertRaises(asyncio.TimeoutError):
            self.loop.run_until_complete(future)
        test_utils.run_briefly(self.loop)
        # Check that there's no pending task (add has been cancelled)
        for task in asyncio.all_tasks(self.loop):
            self.assertTrue(task.done())

    def test_run_coroutine_threadsafe_task_cancelled(self):
        """Test coroutine submission from a thread to an event loop
        when the task is cancelled."""
        future = self.loop.run_in_executor(
            None, functools.partial(self.target, cancel=True))
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(future)

    def test_run_coroutine_threadsafe_task_factory_exception(self):
        """Test coroutine submission from a thread to an event loop
        when the task factory raises an exception."""

        def task_factory(loop, coro):
            raise NameError

        run = self.loop.run_in_executor(
            None, functools.partial(self.target, advance_coro=True))

        # Set exception handler
        callback = test_utils.MockCallback()
        self.loop.set_exception_handler(callback)

        # Set corrupted task factory
        self.loop.set_task_factory(task_factory)

        # Run event loop
        with self.assertRaises(NameError) as exc_context:
            self.loop.run_until_complete(run)

        # Check exceptions
        self.assertEqual(len(callback.call_args_list), 1)
        # The handler is called as handler(loop, context); only the
        # context dict is inspected here.
        (_, context), _ = callback.call_args
        self.assertEqual(context['exception'], exc_context.exception)
3249
3250
class SleepTests(test_utils.TestCase):
    """Tests for asyncio.sleep()."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_sleep_zero(self):
        # sleep(0) gives callbacks scheduled with call_soon() a chance to
        # run and hands back the value passed as *result*.
        result = 0

        def inc_result(num):
            nonlocal result
            result += num

        async def coro():
            self.loop.call_soon(inc_result, 1)
            self.assertEqual(result, 0)
            num = await asyncio.sleep(0, result=10)
            self.assertEqual(result, 1)  # inc'ed by call_soon
            self.assertEqual(num, 10)  # value passed as sleep(result=...)
            inc_result(num)  # result becomes 1 + 10 == 11

        self.loop.run_until_complete(coro())
        self.assertEqual(result, 11)

    def test_loop_argument_is_deprecated(self):
        # Remove test when loop argument is removed in Python 3.10
        with self.assertWarns(DeprecationWarning):
            self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop))
3283
3284
class WaitTests(test_utils.TestCase):
    """Deprecation checks for the loop argument of wait()/wait_for()."""

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_loop_argument_is_deprecated_in_wait(self):
        # Remove test when loop argument is removed in Python 3.10
        with self.assertWarns(DeprecationWarning):
            waiter = asyncio.wait([coroutine_function()], loop=self.loop)
            self.loop.run_until_complete(waiter)

    def test_loop_argument_is_deprecated_in_wait_for(self):
        # Remove test when loop argument is removed in Python 3.10
        with self.assertWarns(DeprecationWarning):
            waiter = asyncio.wait_for(coroutine_function(), 0.01,
                                      loop=self.loop)
            self.loop.run_until_complete(waiter)
3307
3308
class CompatibilityTests(test_utils.TestCase):
    """Bridge between old-style coroutines and async/await syntax."""

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_yield_from_awaitable(self):
        # An old-style coroutine can "yield from" an awaitable.

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                yield from asyncio.sleep(0)
                return 'ok'

        outcome = self.loop.run_until_complete(coro())
        self.assertEqual('ok', outcome)

    def test_await_old_style_coro(self):
        # Old-style coroutines can be gathered and awaited from a
        # native coroutine.

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro1():
                return 'ok1'

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro2():
                yield from asyncio.sleep(0)
                return 'ok2'

        async def inner():
            gathered = asyncio.gather(coro1(), coro2(), loop=self.loop)
            return await gathered

        outcome = self.loop.run_until_complete(inner())
        self.assertEqual(['ok1', 'ok2'], outcome)

    def test_debug_mode_interop(self):
        # https://bugs.python.org/issue32636
        # The script runs in a child interpreter with PYTHONASYNCIODEBUG
        # set and must exit successfully.
        code = textwrap.dedent("""
            import asyncio

            async def native_coro():
                pass

            @asyncio.coroutine
            def old_style_coro():
                yield from native_coro()

            asyncio.run(old_style_coro())
        """)

        child_args = ("-Wignore::DeprecationWarning", "-c", code)
        assert_python_ok(*child_args, PYTHONASYNCIODEBUG="1")
3370
3371
# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
3374