1# -*- coding: utf-8 -*- 2 3""" 4Test suite for PEP 380 implementation 5 6adapted from original tests written by Greg Ewing 7see <http://www.cosc.canterbury.ac.nz/greg.ewing/python/yield-from/YieldFrom-Python3.1.2-rev5.zip> 8""" 9 10import unittest 11import inspect 12 13from test.support import captured_stderr, disable_gc, gc_collect 14from test import support 15 16class TestPEP380Operation(unittest.TestCase): 17 """ 18 Test semantics. 19 """ 20 21 def test_delegation_of_initial_next_to_subgenerator(self): 22 """ 23 Test delegation of initial next() call to subgenerator 24 """ 25 trace = [] 26 def g1(): 27 trace.append("Starting g1") 28 yield from g2() 29 trace.append("Finishing g1") 30 def g2(): 31 trace.append("Starting g2") 32 yield 42 33 trace.append("Finishing g2") 34 for x in g1(): 35 trace.append("Yielded %s" % (x,)) 36 self.assertEqual(trace,[ 37 "Starting g1", 38 "Starting g2", 39 "Yielded 42", 40 "Finishing g2", 41 "Finishing g1", 42 ]) 43 44 def test_raising_exception_in_initial_next_call(self): 45 """ 46 Test raising exception in initial next() call 47 """ 48 trace = [] 49 def g1(): 50 try: 51 trace.append("Starting g1") 52 yield from g2() 53 finally: 54 trace.append("Finishing g1") 55 def g2(): 56 try: 57 trace.append("Starting g2") 58 raise ValueError("spanish inquisition occurred") 59 finally: 60 trace.append("Finishing g2") 61 try: 62 for x in g1(): 63 trace.append("Yielded %s" % (x,)) 64 except ValueError as e: 65 self.assertEqual(e.args[0], "spanish inquisition occurred") 66 else: 67 self.fail("subgenerator failed to raise ValueError") 68 self.assertEqual(trace,[ 69 "Starting g1", 70 "Starting g2", 71 "Finishing g2", 72 "Finishing g1", 73 ]) 74 75 def test_delegation_of_next_call_to_subgenerator(self): 76 """ 77 Test delegation of next() call to subgenerator 78 """ 79 trace = [] 80 def g1(): 81 trace.append("Starting g1") 82 yield "g1 ham" 83 yield from g2() 84 yield "g1 eggs" 85 trace.append("Finishing g1") 86 def g2(): 87 trace.append("Starting g2") 88 yield "g2 spam" 89 yield "g2 more spam" 90 trace.append("Finishing g2") 91 for x in g1(): 92 trace.append("Yielded %s" % (x,)) 93 self.assertEqual(trace,[ 94 "Starting g1", 95 "Yielded g1 ham", 96 "Starting g2", 97 "Yielded g2 spam", 98 "Yielded g2 more spam", 99 "Finishing g2", 100 "Yielded g1 eggs", 101 "Finishing g1", 102 ]) 103 104 def test_raising_exception_in_delegated_next_call(self): 105 """ 106 Test raising exception in delegated next() call 107 """ 108 trace = [] 109 def g1(): 110 try: 111 trace.append("Starting g1") 112 yield "g1 ham" 113 yield from g2() 114 yield "g1 eggs" 115 finally: 116 trace.append("Finishing g1") 117 def g2(): 118 try: 119 trace.append("Starting g2") 120 yield "g2 spam" 121 raise ValueError("hovercraft is full of eels") 122 yield "g2 more spam" 123 finally: 124 trace.append("Finishing g2") 125 try: 126 for x in g1(): 127 trace.append("Yielded %s" % (x,)) 128 except ValueError as e: 129 self.assertEqual(e.args[0], "hovercraft is full of eels") 130 else: 131 self.fail("subgenerator failed to raise ValueError") 132 self.assertEqual(trace,[ 133 "Starting g1", 134 "Yielded g1 ham", 135 "Starting g2", 136 "Yielded g2 spam", 137 "Finishing g2", 138 "Finishing g1", 139 ]) 140 141 def test_delegation_of_send(self): 142 """ 143 Test delegation of send() 144 """ 145 trace = [] 146 def g1(): 147 trace.append("Starting g1") 148 x = yield "g1 ham" 149 trace.append("g1 received %s" % (x,)) 150 yield from g2() 151 x = yield "g1 eggs" 152 trace.append("g1 received %s" % (x,)) 153 trace.append("Finishing g1") 154 def g2(): 155 trace.append("Starting g2") 156 x = yield "g2 spam" 157 trace.append("g2 received %s" % (x,)) 158 x = yield "g2 more spam" 159 trace.append("g2 received %s" % (x,)) 160 trace.append("Finishing g2") 161 g = g1() 162 y = next(g) 163 x = 1 164 try: 165 while 1: 166 y = g.send(x) 167 trace.append("Yielded %s" % (y,)) 168 x += 1 169 except StopIteration: 170 pass 171 self.assertEqual(trace,[ 172 "Starting g1", 173 "g1 received 1", 174 "Starting g2", 175 "Yielded g2 spam", 176 "g2 received 2", 177 "Yielded g2 more spam", 178 "g2 received 3", 179 "Finishing g2", 180 "Yielded g1 eggs", 181 "g1 received 4", 182 "Finishing g1", 183 ]) 184 185 def test_handling_exception_while_delegating_send(self): 186 """ 187 Test handling exception while delegating 'send' 188 """ 189 trace = [] 190 def g1(): 191 trace.append("Starting g1") 192 x = yield "g1 ham" 193 trace.append("g1 received %s" % (x,)) 194 yield from g2() 195 x = yield "g1 eggs" 196 trace.append("g1 received %s" % (x,)) 197 trace.append("Finishing g1") 198 def g2(): 199 trace.append("Starting g2") 200 x = yield "g2 spam" 201 trace.append("g2 received %s" % (x,)) 202 raise ValueError("hovercraft is full of eels") 203 x = yield "g2 more spam" 204 trace.append("g2 received %s" % (x,)) 205 trace.append("Finishing g2") 206 def run(): 207 g = g1() 208 y = next(g) 209 x = 1 210 try: 211 while 1: 212 y = g.send(x) 213 trace.append("Yielded %s" % (y,)) 214 x += 1 215 except StopIteration: 216 trace.append("StopIteration") 217 self.assertRaises(ValueError,run) 218 self.assertEqual(trace,[ 219 "Starting g1", 220 "g1 received 1", 221 "Starting g2", 222 "Yielded g2 spam", 223 "g2 received 2", 224 ]) 225 226 def test_delegating_close(self): 227 """ 228 Test delegating 'close' 229 """ 230 trace = [] 231 def g1(): 232 try: 233 trace.append("Starting g1") 234 yield "g1 ham" 235 yield from g2() 236 yield "g1 eggs" 237 finally: 238 trace.append("Finishing g1") 239 def g2(): 240 try: 241 trace.append("Starting g2") 242 yield "g2 spam" 243 yield "g2 more spam" 244 finally: 245 trace.append("Finishing g2") 246 g = g1() 247 for i in range(2): 248 x = next(g) 249 trace.append("Yielded %s" % (x,)) 250 g.close() 251 self.assertEqual(trace,[ 252 "Starting g1", 253 "Yielded g1 ham", 254 "Starting g2", 255 "Yielded g2 spam", 256 "Finishing g2", 257 "Finishing g1" 258 ]) 259 260 def test_handing_exception_while_delegating_close(self): 261 """ 262 Test handling exception while delegating 'close' 263 """ 264 trace = [] 265 def g1(): 266 try: 267 trace.append("Starting g1") 268 yield "g1 ham" 269 yield from g2() 270 yield "g1 eggs" 271 finally: 272 trace.append("Finishing g1") 273 def g2(): 274 try: 275 trace.append("Starting g2") 276 yield "g2 spam" 277 yield "g2 more spam" 278 finally: 279 trace.append("Finishing g2") 280 raise ValueError("nybbles have exploded with delight") 281 try: 282 g = g1() 283 for i in range(2): 284 x = next(g) 285 trace.append("Yielded %s" % (x,)) 286 g.close() 287 except ValueError as e: 288 self.assertEqual(e.args[0], "nybbles have exploded with delight") 289 self.assertIsInstance(e.__context__, GeneratorExit) 290 else: 291 self.fail("subgenerator failed to raise ValueError") 292 self.assertEqual(trace,[ 293 "Starting g1", 294 "Yielded g1 ham", 295 "Starting g2", 296 "Yielded g2 spam", 297 "Finishing g2", 298 "Finishing g1", 299 ]) 300 301 def test_delegating_throw(self): 302 """ 303 Test delegating 'throw' 304 """ 305 trace = [] 306 def g1(): 307 try: 308 trace.append("Starting g1") 309 yield "g1 ham" 310 yield from g2() 311 yield "g1 eggs" 312 finally: 313 trace.append("Finishing g1") 314 def g2(): 315 try: 316 trace.append("Starting g2") 317 yield "g2 spam" 318 yield "g2 more spam" 319 finally: 320 trace.append("Finishing g2") 321 try: 322 g = g1() 323 for i in range(2): 324 x = next(g) 325 trace.append("Yielded %s" % (x,)) 326 e = ValueError("tomato ejected") 327 g.throw(e) 328 except ValueError as e: 329 self.assertEqual(e.args[0], "tomato ejected") 330 else: 331 self.fail("subgenerator failed to raise ValueError") 332 self.assertEqual(trace,[ 333 "Starting g1", 334 "Yielded g1 ham", 335 "Starting g2", 336 "Yielded g2 spam", 337 "Finishing g2", 338 "Finishing g1", 339 ]) 340 341 def test_value_attribute_of_StopIteration_exception(self): 342 """ 343 Test 'value' attribute of StopIteration exception 344 """ 345 trace = [] 346 def pex(e): 347 trace.append("%s: %s" % (e.__class__.__name__, e)) 348 trace.append("value = %s" % (e.value,)) 349 e = StopIteration() 350 pex(e) 351 e = StopIteration("spam") 352 pex(e) 353 e.value = "eggs" 354 pex(e) 355 self.assertEqual(trace,[ 356 "StopIteration: ", 357 "value = None", 358 "StopIteration: spam", 359 "value = spam", 360 "StopIteration: spam", 361 "value = eggs", 362 ]) 363 364 365 def test_exception_value_crash(self): 366 # There used to be a refcount error when the return value 367 # stored in the StopIteration has a refcount of 1. 368 def g1(): 369 yield from g2() 370 def g2(): 371 yield "g2" 372 return [42] 373 self.assertEqual(list(g1()), ["g2"]) 374 375 376 def test_generator_return_value(self): 377 """ 378 Test generator return value 379 """ 380 trace = [] 381 def g1(): 382 trace.append("Starting g1") 383 yield "g1 ham" 384 ret = yield from g2() 385 trace.append("g2 returned %r" % (ret,)) 386 for v in 1, (2,), StopIteration(3): 387 ret = yield from g2(v) 388 trace.append("g2 returned %r" % (ret,)) 389 yield "g1 eggs" 390 trace.append("Finishing g1") 391 def g2(v = None): 392 trace.append("Starting g2") 393 yield "g2 spam" 394 yield "g2 more spam" 395 trace.append("Finishing g2") 396 if v: 397 return v 398 for x in g1(): 399 trace.append("Yielded %s" % (x,)) 400 self.assertEqual(trace,[ 401 "Starting g1", 402 "Yielded g1 ham", 403 "Starting g2", 404 "Yielded g2 spam", 405 "Yielded g2 more spam", 406 "Finishing g2", 407 "g2 returned None", 408 "Starting g2", 409 "Yielded g2 spam", 410 "Yielded g2 more spam", 411 "Finishing g2", 412 "g2 returned 1", 413 "Starting g2", 414 "Yielded g2 spam", 415 "Yielded g2 more spam", 416 "Finishing g2", 417 "g2 returned (2,)", 418 "Starting g2", 419 "Yielded g2 spam", 420 "Yielded g2 more spam", 421 "Finishing g2", 422 "g2 returned StopIteration(3)", 423 "Yielded g1 eggs", 424 "Finishing g1", 425 ]) 426 427 def test_delegation_of_next_to_non_generator(self): 428 """ 429 Test delegation of next() to non-generator 430 """ 431 trace = [] 432 def g(): 433 yield from range(3) 434 for x in g(): 435 trace.append("Yielded %s" % (x,)) 436 self.assertEqual(trace,[ 437 "Yielded 0", 438 "Yielded 1", 439 "Yielded 2", 440 ]) 441 442 443 def test_conversion_of_sendNone_to_next(self): 444 """ 445 Test conversion of send(None) to next() 446 """ 447 trace = [] 448 def g(): 449 yield from range(3) 450 gi = g() 451 for x in range(3): 452 y = gi.send(None) 453 trace.append("Yielded: %s" % (y,)) 454 self.assertEqual(trace,[ 455 "Yielded: 0", 456 "Yielded: 1", 457 "Yielded: 2", 458 ]) 459 460 def test_delegation_of_close_to_non_generator(self): 461 """ 462 Test delegation of close() to non-generator 463 """ 464 trace = [] 465 def g(): 466 try: 467 trace.append("starting g") 468 yield from range(3) 469 trace.append("g should not be here") 470 finally: 471 trace.append("finishing g") 472 gi = g() 473 next(gi) 474 with captured_stderr() as output: 475 gi.close() 476 self.assertEqual(output.getvalue(), '') 477 self.assertEqual(trace,[ 478 "starting g", 479 "finishing g", 480 ]) 481 482 def test_delegating_throw_to_non_generator(self): 483 """ 484 Test delegating 'throw' to non-generator 485 """ 486 trace = [] 487 def g(): 488 try: 489 trace.append("Starting g") 490 yield from range(10) 491 finally: 492 trace.append("Finishing g") 493 try: 494 gi = g() 495 for i in range(5): 496 x = next(gi) 497 trace.append("Yielded %s" % (x,)) 498 e = ValueError("tomato ejected") 499 gi.throw(e) 500 except ValueError as e: 501 self.assertEqual(e.args[0],"tomato ejected") 502 else: 503 self.fail("subgenerator failed to raise ValueError") 504 self.assertEqual(trace,[ 505 "Starting g", 506 "Yielded 0", 507 "Yielded 1", 508 "Yielded 2", 509 "Yielded 3", 510 "Yielded 4", 511 "Finishing g", 512 ]) 513 514 def test_attempting_to_send_to_non_generator(self): 515 """ 516 Test attempting to send to non-generator 517 """ 518 trace = [] 519 def g(): 520 try: 521 trace.append("starting g") 522 yield from range(3) 523 trace.append("g should not be here") 524 finally: 525 trace.append("finishing g") 526 try: 527 gi = g() 528 next(gi) 529 for x in range(3): 530 y = gi.send(42) 531 trace.append("Should not have yielded: %s" % (y,)) 532 except AttributeError as e: 533 self.assertIn("send", e.args[0]) 534 else: 535 self.fail("was able to send into non-generator") 536 self.assertEqual(trace,[ 537 "starting g", 538 "finishing g", 539 ]) 540 541 def test_broken_getattr_handling(self): 542 """ 543 Test subiterator with a broken getattr implementation 544 """ 545 class Broken: 546 def __iter__(self): 547 return self 548 def __next__(self): 549 return 1 550 def __getattr__(self, attr): 551 1/0 552 553 def g(): 554 yield from Broken() 555 556 with self.assertRaises(ZeroDivisionError): 557 gi = g() 558 self.assertEqual(next(gi), 1) 559 gi.send(1) 560 561 with self.assertRaises(ZeroDivisionError): 562 gi = g() 563 self.assertEqual(next(gi), 1) 564 gi.throw(AttributeError) 565 566 with support.catch_unraisable_exception() as cm: 567 gi = g() 568 self.assertEqual(next(gi), 1) 569 gi.close() 570 571 self.assertEqual(ZeroDivisionError, cm.unraisable.exc_type) 572 573 def test_exception_in_initial_next_call(self): 574 """ 575 Test exception in initial next() call 576 """ 577 trace = [] 578 def g1(): 579 trace.append("g1 about to yield from g2") 580 yield from g2() 581 trace.append("g1 should not be here") 582 def g2(): 583 yield 1/0 584 def run(): 585 gi = g1() 586 next(gi) 587 self.assertRaises(ZeroDivisionError,run) 588 self.assertEqual(trace,[ 589 "g1 about to yield from g2" 590 ]) 591 592 def test_attempted_yield_from_loop(self): 593 """ 594 Test attempted yield-from loop 595 """ 596 trace = [] 597 def g1(): 598 trace.append("g1: starting") 599 yield "y1" 600 trace.append("g1: about to yield from g2") 601 yield from g2() 602 trace.append("g1 should not be here") 603 604 def g2(): 605 trace.append("g2: starting") 606 yield "y2" 607 trace.append("g2: about to yield from g1") 608 yield from gi 609 trace.append("g2 should not be here") 610 try: 611 gi = g1() 612 for y in gi: 613 trace.append("Yielded: %s" % (y,)) 614 except ValueError as e: 615 self.assertEqual(e.args[0],"generator already executing") 616 else: 617 self.fail("subgenerator didn't raise ValueError") 618 self.assertEqual(trace,[ 619 "g1: starting", 620 "Yielded: y1", 621 "g1: about to yield from g2", 622 "g2: starting", 623 "Yielded: y2", 624 "g2: about to yield from g1", 625 ]) 626 627 def test_returning_value_from_delegated_throw(self): 628 """ 629 Test returning value from delegated 'throw' 630 """ 631 trace = [] 632 def g1(): 633 try: 634 trace.append("Starting g1") 635 yield "g1 ham" 636 yield from g2() 637 yield "g1 eggs" 638 finally: 639 trace.append("Finishing g1") 640 def g2(): 641 try: 642 trace.append("Starting g2") 643 yield "g2 spam" 644 yield "g2 more spam" 645 except LunchError: 646 trace.append("Caught LunchError in g2") 647 yield "g2 lunch saved" 648 yield "g2 yet more spam" 649 class LunchError(Exception): 650 pass 651 g = g1() 652 for i in range(2): 653 x = next(g) 654 trace.append("Yielded %s" % (x,)) 655 e = LunchError("tomato ejected") 656 g.throw(e) 657 for x in g: 658 trace.append("Yielded %s" % (x,)) 659 self.assertEqual(trace,[ 660 "Starting g1", 661 "Yielded g1 ham", 662 "Starting g2", 663 "Yielded g2 spam", 664 "Caught LunchError in g2", 665 "Yielded g2 yet more spam", 666 "Yielded g1 eggs", 667 "Finishing g1", 668 ]) 669 670 def test_next_and_return_with_value(self): 671 """ 672 Test next and return with value 673 """ 674 trace = [] 675 def f(r): 676 gi = g(r) 677 next(gi) 678 try: 679 trace.append("f resuming g") 680 next(gi) 681 trace.append("f SHOULD NOT BE HERE") 682 except StopIteration as e: 683 trace.append("f caught %r" % (e,)) 684 def g(r): 685 trace.append("g starting") 686 yield 687 trace.append("g returning %r" % (r,)) 688 return r 689 f(None) 690 f(1) 691 f((2,)) 692 f(StopIteration(3)) 693 self.assertEqual(trace,[ 694 "g starting", 695 "f resuming g", 696 "g returning None", 697 "f caught StopIteration()", 698 "g starting", 699 "f resuming g", 700 "g returning 1", 701 "f caught StopIteration(1)", 702 "g starting", 703 "f resuming g", 704 "g returning (2,)", 705 "f caught StopIteration((2,))", 706 "g starting", 707 "f resuming g", 708 "g returning StopIteration(3)", 709 "f caught StopIteration(StopIteration(3))", 710 ]) 711 712 def test_send_and_return_with_value(self): 713 """ 714 Test send and return with value 715 """ 716 trace = [] 717 def f(r): 718 gi = g(r) 719 next(gi) 720 try: 721 trace.append("f sending spam to g") 722 gi.send("spam") 723 trace.append("f SHOULD NOT BE HERE") 724 except StopIteration as e: 725 trace.append("f caught %r" % (e,)) 726 def g(r): 727 trace.append("g starting") 728 x = yield 729 trace.append("g received %r" % (x,)) 730 trace.append("g returning %r" % (r,)) 731 return r 732 f(None) 733 f(1) 734 f((2,)) 735 f(StopIteration(3)) 736 self.assertEqual(trace, [ 737 "g starting", 738 "f sending spam to g", 739 "g received 'spam'", 740 "g returning None", 741 "f caught StopIteration()", 742 "g starting", 743 "f sending spam to g", 744 "g received 'spam'", 745 "g returning 1", 746 'f caught StopIteration(1)', 747 'g starting', 748 'f sending spam to g', 749 "g received 'spam'", 750 'g returning (2,)', 751 'f caught StopIteration((2,))', 752 'g starting', 753 'f sending spam to g', 754 "g received 'spam'", 755 'g returning StopIteration(3)', 756 'f caught StopIteration(StopIteration(3))' 757 ]) 758 759 def test_catching_exception_from_subgen_and_returning(self): 760 """ 761 Test catching an exception thrown into a 762 subgenerator and returning a value 763 """ 764 def inner(): 765 try: 766 yield 1 767 except ValueError: 768 trace.append("inner caught ValueError") 769 return value 770 771 def outer(): 772 v = yield from inner() 773 trace.append("inner returned %r to outer" % (v,)) 774 yield v 775 776 for value in 2, (2,), StopIteration(2): 777 trace = [] 778 g = outer() 779 trace.append(next(g)) 780 trace.append(repr(g.throw(ValueError))) 781 self.assertEqual(trace, [ 782 1, 783 "inner caught ValueError", 784 "inner returned %r to outer" % (value,), 785 repr(value), 786 ]) 787 788 def test_throwing_GeneratorExit_into_subgen_that_returns(self): 789 """ 790 Test throwing GeneratorExit into a subgenerator that 791 catches it and returns normally. 792 """ 793 trace = [] 794 def f(): 795 try: 796 trace.append("Enter f") 797 yield 798 trace.append("Exit f") 799 except GeneratorExit: 800 return 801 def g(): 802 trace.append("Enter g") 803 yield from f() 804 trace.append("Exit g") 805 try: 806 gi = g() 807 next(gi) 808 gi.throw(GeneratorExit) 809 except GeneratorExit: 810 pass 811 else: 812 self.fail("subgenerator failed to raise GeneratorExit") 813 self.assertEqual(trace,[ 814 "Enter g", 815 "Enter f", 816 ]) 817 818 def test_throwing_GeneratorExit_into_subgenerator_that_yields(self): 819 """ 820 Test throwing GeneratorExit into a subgenerator that 821 catches it and yields. 822 """ 823 trace = [] 824 def f(): 825 try: 826 trace.append("Enter f") 827 yield 828 trace.append("Exit f") 829 except GeneratorExit: 830 yield 831 def g(): 832 trace.append("Enter g") 833 yield from f() 834 trace.append("Exit g") 835 try: 836 gi = g() 837 next(gi) 838 gi.throw(GeneratorExit) 839 except RuntimeError as e: 840 self.assertEqual(e.args[0], "generator ignored GeneratorExit") 841 else: 842 self.fail("subgenerator failed to raise GeneratorExit") 843 self.assertEqual(trace,[ 844 "Enter g", 845 "Enter f", 846 ]) 847 848 def test_throwing_GeneratorExit_into_subgen_that_raises(self): 849 """ 850 Test throwing GeneratorExit into a subgenerator that 851 catches it and raises a different exception. 852 """ 853 trace = [] 854 def f(): 855 try: 856 trace.append("Enter f") 857 yield 858 trace.append("Exit f") 859 except GeneratorExit: 860 raise ValueError("Vorpal bunny encountered") 861 def g(): 862 trace.append("Enter g") 863 yield from f() 864 trace.append("Exit g") 865 try: 866 gi = g() 867 next(gi) 868 gi.throw(GeneratorExit) 869 except ValueError as e: 870 self.assertEqual(e.args[0], "Vorpal bunny encountered") 871 self.assertIsInstance(e.__context__, GeneratorExit) 872 else: 873 self.fail("subgenerator failed to raise ValueError") 874 self.assertEqual(trace,[ 875 "Enter g", 876 "Enter f", 877 ]) 878 879 def test_yield_from_empty(self): 880 def g(): 881 yield from () 882 self.assertRaises(StopIteration, next, g()) 883 884 def test_delegating_generators_claim_to_be_running(self): 885 # Check with basic iteration 886 def one(): 887 yield 0 888 yield from two() 889 yield 3 890 def two(): 891 yield 1 892 try: 893 yield from g1 894 except ValueError: 895 pass 896 yield 2 897 g1 = one() 898 self.assertEqual(list(g1), [0, 1, 2, 3]) 899 # Check with send 900 g1 = one() 901 res = [next(g1)] 902 try: 903 while True: 904 res.append(g1.send(42)) 905 except StopIteration: 906 pass 907 self.assertEqual(res, [0, 1, 2, 3]) 908 # Check with throw 909 class MyErr(Exception): 910 pass 911 def one(): 912 try: 913 yield 0 914 except MyErr: 915 pass 916 yield from two() 917 try: 918 yield 3 919 except MyErr: 920 pass 921 def two(): 922 try: 923 yield 1 924 except MyErr: 925 pass 926 try: 927 yield from g1 928 except ValueError: 929 pass 930 try: 931 yield 2 932 except MyErr: 933 pass 934 g1 = one() 935 res = [next(g1)] 936 try: 937 while True: 938 res.append(g1.throw(MyErr)) 939 except StopIteration: 940 pass 941 except: 942 self.assertEqual(res, [0, 1, 2, 3]) 943 raise 944 # Check with close 945 class MyIt(object): 946 def __iter__(self): 947 return self 948 def __next__(self): 949 return 42 950 def close(self_): 951 self.assertTrue(g1.gi_running) 952 self.assertRaises(ValueError, next, g1) 953 def one(): 954 yield from MyIt() 955 g1 = one() 956 next(g1) 957 g1.close() 958 959 def test_delegator_is_visible_to_debugger(self): 960 def call_stack(): 961 return [f[3] for f in inspect.stack()] 962 963 def gen(): 964 yield call_stack() 965 yield call_stack() 966 yield call_stack() 967 968 def spam(g): 969 yield from g 970 971 def eggs(g): 972 yield from g 973 974 for stack in spam(gen()): 975 self.assertTrue('spam' in stack) 976 977 for stack in spam(eggs(gen())): 978 self.assertTrue('spam' in stack and 'eggs' in stack) 979 980 def test_custom_iterator_return(self): 981 # See issue #15568 982 class MyIter: 983 def __iter__(self): 984 return self 985 def __next__(self): 986 raise StopIteration(42) 987 def gen(): 988 nonlocal ret 989 ret = yield from MyIter() 990 ret = None 991 list(gen()) 992 self.assertEqual(ret, 42) 993 994 def test_close_with_cleared_frame(self): 995 # See issue #17669. 996 # 997 # Create a stack of generators: outer() delegating to inner() 998 # delegating to innermost(). The key point is that the instance of 999 # inner is created first: this ensures that its frame appears before 1000 # the instance of outer in the GC linked list. 1001 # 1002 # At the gc.collect call: 1003 # - frame_clear is called on the inner_gen frame. 1004 # - gen_dealloc is called on the outer_gen generator (the only 1005 # reference is in the frame's locals). 1006 # - gen_close is called on the outer_gen generator. 1007 # - gen_close_iter is called to close the inner_gen generator, which 1008 # in turn calls gen_close, and gen_yf. 1009 # 1010 # Previously, gen_yf would crash since inner_gen's frame had been 1011 # cleared (and in particular f_stacktop was NULL). 1012 1013 def innermost(): 1014 yield 1015 def inner(): 1016 outer_gen = yield 1017 yield from innermost() 1018 def outer(): 1019 inner_gen = yield 1020 yield from inner_gen 1021 1022 with disable_gc(): 1023 inner_gen = inner() 1024 outer_gen = outer() 1025 outer_gen.send(None) 1026 outer_gen.send(inner_gen) 1027 outer_gen.send(outer_gen) 1028 1029 del outer_gen 1030 del inner_gen 1031 gc_collect() 1032 1033 def test_send_tuple_with_custom_generator(self): 1034 # See issue #21209. 1035 class MyGen: 1036 def __iter__(self): 1037 return self 1038 def __next__(self): 1039 return 42 1040 def send(self, what): 1041 nonlocal v 1042 v = what 1043 return None 1044 def outer(): 1045 v = yield from MyGen() 1046 g = outer() 1047 next(g) 1048 v = None 1049 g.send((1, 2, 3, 4)) 1050 self.assertEqual(v, (1, 2, 3, 4)) 1051 1052 1053if __name__ == '__main__': 1054 unittest.main() 1055