1import os 2import subprocess 3import sys 4import threading 5import functools 6import contextlib 7import logging 8import re 9import time 10import gc 11import traceback 12from StringIO import StringIO 13from test import test_support 14 15from concurrent import futures 16from concurrent.futures._base import ( 17 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) 18from concurrent.futures.thread import cpu_count 19 20try: 21 import unittest2 as unittest 22except ImportError: 23 import unittest 24 25 26def reap_threads(func): 27 """Use this function when threads are being used. This will 28 ensure that the threads are cleaned up even when the test fails. 29 If threading is unavailable this function does nothing. 30 """ 31 @functools.wraps(func) 32 def decorator(*args): 33 key = test_support.threading_setup() 34 try: 35 return func(*args) 36 finally: 37 test_support.threading_cleanup(*key) 38 return decorator 39 40 41# Executing the interpreter in a subprocess 42def _assert_python(expected_success, *args, **env_vars): 43 cmd_line = [sys.executable] 44 if not env_vars: 45 cmd_line.append('-E') 46 # Need to preserve the original environment, for in-place testing of 47 # shared library builds. 48 env = os.environ.copy() 49 # But a special flag that can be set to override -- in this case, the 50 # caller is responsible to pass the full environment. 51 if env_vars.pop('__cleanenv', None): 52 env = {} 53 env.update(env_vars) 54 cmd_line.extend(args) 55 p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, 56 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 57 env=env) 58 try: 59 out, err = p.communicate() 60 finally: 61 subprocess._cleanup() 62 p.stdout.close() 63 p.stderr.close() 64 rc = p.returncode 65 err = strip_python_stderr(err) 66 if (rc and expected_success) or (not rc and not expected_success): 67 raise AssertionError( 68 "Process return code is %d, " 69 "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore'))) 70 return rc, out, err 71 72 73def assert_python_ok(*args, **env_vars): 74 """ 75 Assert that running the interpreter with `args` and optional environment 76 variables `env_vars` is ok and return a (return code, stdout, stderr) tuple. 77 """ 78 return _assert_python(True, *args, **env_vars) 79 80 81def strip_python_stderr(stderr): 82 """Strip the stderr of a Python process from potential debug output 83 emitted by the interpreter. 84 85 This will typically be run on the result of the communicate() method 86 of a subprocess.Popen object. 87 """ 88 stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip() 89 return stderr 90 91 92@contextlib.contextmanager 93def captured_stderr(): 94 """Return a context manager used by captured_stdout/stdin/stderr 95 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 96 logging_stream = StringIO() 97 handler = logging.StreamHandler(logging_stream) 98 logging.root.addHandler(handler) 99 100 try: 101 yield logging_stream 102 finally: 103 logging.root.removeHandler(handler) 104 105 106def create_future(state=PENDING, exception=None, result=None): 107 f = Future() 108 f._state = state 109 f._exception = exception 110 f._result = result 111 return f 112 113 114PENDING_FUTURE = create_future(state=PENDING) 115RUNNING_FUTURE = create_future(state=RUNNING) 116CANCELLED_FUTURE = create_future(state=CANCELLED) 117CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) 118EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) 119SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) 120 121 122def mul(x, y): 123 return x * y 124 125 126def sleep_and_raise(t): 127 time.sleep(t) 128 raise Exception('this is an exception') 129 130def sleep_and_print(t, msg): 131 time.sleep(t) 132 print(msg) 133 sys.stdout.flush() 134 135 136class ExecutorMixin: 137 worker_count = 5 138 139 def setUp(self): 140 self.t1 = time.time() 141 try: 142 self.executor = self.executor_type(max_workers=self.worker_count) 143 except NotImplementedError: 144 e = sys.exc_info()[1] 145 self.skipTest(str(e)) 146 self._prime_executor() 147 148 def tearDown(self): 149 self.executor.shutdown(wait=True) 150 dt = time.time() - self.t1 151 if test_support.verbose: 152 print("%.2fs" % dt) 153 self.assertLess(dt, 60, "synchronization issue: test lasted too long") 154 155 def _prime_executor(self): 156 # Make sure that the executor is ready to do work before running the 157 # tests. This should reduce the probability of timeouts in the tests. 158 futures = [self.executor.submit(time.sleep, 0.1) 159 for _ in range(self.worker_count)] 160 161 for f in futures: 162 f.result() 163 164 165class ThreadPoolMixin(ExecutorMixin): 166 executor_type = futures.ThreadPoolExecutor 167 168 169class ProcessPoolMixin(ExecutorMixin): 170 executor_type = futures.ProcessPoolExecutor 171 172 173class ExecutorShutdownTest(unittest.TestCase): 174 def test_run_after_shutdown(self): 175 self.executor.shutdown() 176 self.assertRaises(RuntimeError, 177 self.executor.submit, 178 pow, 2, 5) 179 180 def test_interpreter_shutdown(self): 181 # Test the atexit hook for shutdown of worker threads and processes 182 rc, out, err = assert_python_ok('-c', """if 1: 183 from concurrent.futures import %s 184 from time import sleep 185 from test_futures import sleep_and_print 186 t = %s(5) 187 t.submit(sleep_and_print, 1.0, "apple") 188 """ % (self.executor_type.__name__, self.executor_type.__name__)) 189 # Errors in atexit hooks don't change the process exit code, check 190 # stderr manually. 191 self.assertFalse(err) 192 self.assertEqual(out.strip(), "apple".encode()) 193 194 def test_hang_issue12364(self): 195 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] 196 self.executor.shutdown() 197 for f in fs: 198 f.result() 199 200 201class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): 202 def _prime_executor(self): 203 pass 204 205 def test_threads_terminate(self): 206 self.executor.submit(mul, 21, 2) 207 self.executor.submit(mul, 6, 7) 208 self.executor.submit(mul, 3, 14) 209 self.assertEqual(len(self.executor._threads), 3) 210 self.executor.shutdown() 211 for t in self.executor._threads: 212 t.join() 213 214 def test_context_manager_shutdown(self): 215 with futures.ThreadPoolExecutor(max_workers=5) as e: 216 executor = e 217 self.assertEqual(list(e.map(abs, range(-5, 5))), 218 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 219 220 for t in executor._threads: 221 t.join() 222 223 def test_del_shutdown(self): 224 executor = futures.ThreadPoolExecutor(max_workers=5) 225 executor.map(abs, range(-5, 5)) 226 threads = executor._threads 227 del executor 228 gc.collect() 229 230 for t in threads: 231 t.join() 232 233 def test_thread_names_assigned(self): 234 executor = futures.ThreadPoolExecutor( 235 max_workers=5, thread_name_prefix='SpecialPool') 236 executor.map(abs, range(-5, 5)) 237 threads = executor._threads 238 del executor 239 gc.collect() 240 241 for t in threads: 242 self.assertRegexpMatches(t.name, r'^SpecialPool_[0-4]$') 243 t.join() 244 245 def test_thread_names_default(self): 246 executor = futures.ThreadPoolExecutor(max_workers=5) 247 executor.map(abs, range(-5, 5)) 248 threads = executor._threads 249 del executor 250 gc.collect() 251 252 for t in threads: 253 # Ensure that our default name is reasonably sane and unique when 254 # no thread_name_prefix was supplied. 255 self.assertRegexpMatches(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') 256 t.join() 257 258 259class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): 260 def _prime_executor(self): 261 pass 262 263 def test_processes_terminate(self): 264 self.executor.submit(mul, 21, 2) 265 self.executor.submit(mul, 6, 7) 266 self.executor.submit(mul, 3, 14) 267 self.assertEqual(len(self.executor._processes), 5) 268 processes = self.executor._processes 269 self.executor.shutdown() 270 271 for p in processes: 272 p.join() 273 274 def test_context_manager_shutdown(self): 275 with futures.ProcessPoolExecutor(max_workers=5) as e: 276 processes = e._processes 277 self.assertEqual(list(e.map(abs, range(-5, 5))), 278 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 279 280 for p in processes: 281 p.join() 282 283 def test_del_shutdown(self): 284 executor = futures.ProcessPoolExecutor(max_workers=5) 285 list(executor.map(abs, range(-5, 5))) 286 queue_management_thread = executor._queue_management_thread 287 processes = executor._processes 288 del executor 289 gc.collect() 290 291 queue_management_thread.join() 292 for p in processes: 293 p.join() 294 295 296class WaitTests(unittest.TestCase): 297 298 def test_first_completed(self): 299 future1 = self.executor.submit(mul, 21, 2) 300 future2 = self.executor.submit(time.sleep, 1.5) 301 302 done, not_done = futures.wait( 303 [CANCELLED_FUTURE, future1, future2], 304 return_when=futures.FIRST_COMPLETED) 305 306 self.assertEqual(set([future1]), done) 307 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) 308 309 def test_first_completed_some_already_completed(self): 310 future1 = self.executor.submit(time.sleep, 1.5) 311 312 finished, pending = futures.wait( 313 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], 314 return_when=futures.FIRST_COMPLETED) 315 316 self.assertEqual( 317 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), 318 finished) 319 self.assertEqual(set([future1]), pending) 320 321 def test_first_exception(self): 322 future1 = self.executor.submit(mul, 2, 21) 323 future2 = self.executor.submit(sleep_and_raise, 1.5) 324 future3 = self.executor.submit(time.sleep, 3) 325 326 finished, pending = futures.wait( 327 [future1, future2, future3], 328 return_when=futures.FIRST_EXCEPTION) 329 330 self.assertEqual(set([future1, future2]), finished) 331 self.assertEqual(set([future3]), pending) 332 333 def test_first_exception_some_already_complete(self): 334 future1 = self.executor.submit(divmod, 21, 0) 335 future2 = self.executor.submit(time.sleep, 1.5) 336 337 finished, pending = futures.wait( 338 [SUCCESSFUL_FUTURE, 339 CANCELLED_FUTURE, 340 CANCELLED_AND_NOTIFIED_FUTURE, 341 future1, future2], 342 return_when=futures.FIRST_EXCEPTION) 343 344 self.assertEqual(set([SUCCESSFUL_FUTURE, 345 CANCELLED_AND_NOTIFIED_FUTURE, 346 future1]), finished) 347 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) 348 349 def test_first_exception_one_already_failed(self): 350 future1 = self.executor.submit(time.sleep, 2) 351 352 finished, pending = futures.wait( 353 [EXCEPTION_FUTURE, future1], 354 return_when=futures.FIRST_EXCEPTION) 355 356 self.assertEqual(set([EXCEPTION_FUTURE]), finished) 357 self.assertEqual(set([future1]), pending) 358 359 def test_all_completed(self): 360 future1 = self.executor.submit(divmod, 2, 0) 361 future2 = self.executor.submit(mul, 2, 21) 362 363 finished, pending = futures.wait( 364 [SUCCESSFUL_FUTURE, 365 CANCELLED_AND_NOTIFIED_FUTURE, 366 EXCEPTION_FUTURE, 367 future1, 368 future2], 369 return_when=futures.ALL_COMPLETED) 370 371 self.assertEqual(set([SUCCESSFUL_FUTURE, 372 CANCELLED_AND_NOTIFIED_FUTURE, 373 EXCEPTION_FUTURE, 374 future1, 375 future2]), finished) 376 self.assertEqual(set(), pending) 377 378 def test_timeout(self): 379 future1 = self.executor.submit(mul, 6, 7) 380 future2 = self.executor.submit(time.sleep, 3) 381 382 finished, pending = futures.wait( 383 [CANCELLED_AND_NOTIFIED_FUTURE, 384 EXCEPTION_FUTURE, 385 SUCCESSFUL_FUTURE, 386 future1, future2], 387 timeout=1.5, 388 return_when=futures.ALL_COMPLETED) 389 390 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 391 EXCEPTION_FUTURE, 392 SUCCESSFUL_FUTURE, 393 future1]), finished) 394 self.assertEqual(set([future2]), pending) 395 396 397class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): 398 399 def test_pending_calls_race(self): 400 # Issue #14406: multi-threaded race condition when waiting on all 401 # futures. 402 event = threading.Event() 403 def future_func(): 404 event.wait() 405 oldswitchinterval = sys.getcheckinterval() 406 sys.setcheckinterval(1) 407 try: 408 fs = set(self.executor.submit(future_func) for i in range(100)) 409 event.set() 410 futures.wait(fs, return_when=futures.ALL_COMPLETED) 411 finally: 412 sys.setcheckinterval(oldswitchinterval) 413 414 415class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests): 416 pass 417 418 419class AsCompletedTests(unittest.TestCase): 420 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. 421 def test_no_timeout(self): 422 future1 = self.executor.submit(mul, 2, 21) 423 future2 = self.executor.submit(mul, 7, 6) 424 425 completed = set(futures.as_completed( 426 [CANCELLED_AND_NOTIFIED_FUTURE, 427 EXCEPTION_FUTURE, 428 SUCCESSFUL_FUTURE, 429 future1, future2])) 430 self.assertEqual(set( 431 [CANCELLED_AND_NOTIFIED_FUTURE, 432 EXCEPTION_FUTURE, 433 SUCCESSFUL_FUTURE, 434 future1, future2]), 435 completed) 436 437 def test_zero_timeout(self): 438 future1 = self.executor.submit(time.sleep, 2) 439 completed_futures = set() 440 try: 441 for future in futures.as_completed( 442 [CANCELLED_AND_NOTIFIED_FUTURE, 443 EXCEPTION_FUTURE, 444 SUCCESSFUL_FUTURE, 445 future1], 446 timeout=0): 447 completed_futures.add(future) 448 except futures.TimeoutError: 449 pass 450 451 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 452 EXCEPTION_FUTURE, 453 SUCCESSFUL_FUTURE]), 454 completed_futures) 455 456 def test_duplicate_futures(self): 457 # Issue 20367. Duplicate futures should not raise exceptions or give 458 # duplicate responses. 459 future1 = self.executor.submit(time.sleep, 2) 460 completed = [f for f in futures.as_completed([future1,future1])] 461 self.assertEqual(len(completed), 1) 462 463 464class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests): 465 pass 466 467 468class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests): 469 pass 470 471 472class ExecutorTest(unittest.TestCase): 473 # Executor.shutdown() and context manager usage is tested by 474 # ExecutorShutdownTest. 475 def test_submit(self): 476 future = self.executor.submit(pow, 2, 8) 477 self.assertEqual(256, future.result()) 478 479 def test_submit_keyword(self): 480 future = self.executor.submit(mul, 2, y=8) 481 self.assertEqual(16, future.result()) 482 483 def test_map(self): 484 self.assertEqual( 485 list(self.executor.map(pow, range(10), range(10))), 486 list(map(pow, range(10), range(10)))) 487 488 def test_map_exception(self): 489 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) 490 self.assertEqual(next(i), (0, 1)) 491 self.assertEqual(next(i), (0, 1)) 492 self.assertRaises(ZeroDivisionError, next, i) 493 494 def test_map_timeout(self): 495 results = [] 496 try: 497 for i in self.executor.map(time.sleep, 498 [0, 0, 3], 499 timeout=1.5): 500 results.append(i) 501 except futures.TimeoutError: 502 pass 503 else: 504 self.fail('expected TimeoutError') 505 506 self.assertEqual([None, None], results) 507 508 def test_max_workers_negative(self): 509 for number in (0, -1): 510 with self.assertRaises(ValueError) as cm: 511 self.executor_type(max_workers=number) 512 513 assert str(cm.exception) == "max_workers must be greater than 0" 514 515 516class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): 517 def test_map_submits_without_iteration(self): 518 """Tests verifying issue 11777.""" 519 finished = [] 520 def record_finished(n): 521 finished.append(n) 522 523 self.executor.map(record_finished, range(10)) 524 self.executor.shutdown(wait=True) 525 self.assertEqual(len(finished), 10) 526 527 def test_default_workers(self): 528 executor = self.executor_type() 529 self.assertEqual(executor._max_workers, 530 (cpu_count() or 1) * 5) 531 532 533class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): 534 pass 535 536 537class FutureTests(unittest.TestCase): 538 def test_done_callback_with_result(self): 539 callback_result = [None] 540 def fn(callback_future): 541 callback_result[0] = callback_future.result() 542 543 f = Future() 544 f.add_done_callback(fn) 545 f.set_result(5) 546 self.assertEqual(5, callback_result[0]) 547 548 def test_done_callback_with_exception(self): 549 callback_exception = [None] 550 def fn(callback_future): 551 callback_exception[0] = callback_future.exception() 552 553 f = Future() 554 f.add_done_callback(fn) 555 f.set_exception(Exception('test')) 556 self.assertEqual(('test',), callback_exception[0].args) 557 558 def test_done_callback_with_cancel(self): 559 was_cancelled = [None] 560 def fn(callback_future): 561 was_cancelled[0] = callback_future.cancelled() 562 563 f = Future() 564 f.add_done_callback(fn) 565 self.assertTrue(f.cancel()) 566 self.assertTrue(was_cancelled[0]) 567 568 def test_done_callback_raises(self): 569 with captured_stderr() as stderr: 570 raising_was_called = [False] 571 raising_old_style_was_called = [False] 572 fn_was_called = [False] 573 574 def raising_fn(callback_future): 575 raising_was_called[0] = True 576 raise Exception('doh!') 577 578 def raising_old_style_fn(callback_future): 579 raising_old_style_was_called[0] = True 580 class OldStyle: # Does not inherit from object 581 def __str__(self): 582 return 'doh!' 583 raise OldStyle() 584 585 def fn(callback_future): 586 fn_was_called[0] = True 587 588 f = Future() 589 f.add_done_callback(raising_fn) 590 f.add_done_callback(raising_old_style_fn) 591 f.add_done_callback(fn) 592 f.set_result(5) 593 self.assertTrue(raising_was_called) 594 self.assertTrue(raising_old_style_was_called) 595 self.assertTrue(fn_was_called) 596 self.assertIn('Exception: doh!', stderr.getvalue()) 597 self.assertIn('OldStyle: doh!', stderr.getvalue()) 598 599 def test_done_callback_already_successful(self): 600 callback_result = [None] 601 def fn(callback_future): 602 callback_result[0] = callback_future.result() 603 604 f = Future() 605 f.set_result(5) 606 f.add_done_callback(fn) 607 self.assertEqual(5, callback_result[0]) 608 609 def test_done_callback_already_failed(self): 610 callback_exception = [None] 611 def fn(callback_future): 612 callback_exception[0] = callback_future.exception() 613 614 f = Future() 615 f.set_exception(Exception('test')) 616 f.add_done_callback(fn) 617 self.assertEqual(('test',), callback_exception[0].args) 618 619 def test_done_callback_already_cancelled(self): 620 was_cancelled = [None] 621 def fn(callback_future): 622 was_cancelled[0] = callback_future.cancelled() 623 624 f = Future() 625 self.assertTrue(f.cancel()) 626 f.add_done_callback(fn) 627 self.assertTrue(was_cancelled[0]) 628 629 def test_repr(self): 630 self.assertRegexpMatches(repr(PENDING_FUTURE), 631 '<Future at 0x[0-9a-f]+L? state=pending>') 632 self.assertRegexpMatches(repr(RUNNING_FUTURE), 633 '<Future at 0x[0-9a-f]+L? state=running>') 634 self.assertRegexpMatches(repr(CANCELLED_FUTURE), 635 '<Future at 0x[0-9a-f]+L? state=cancelled>') 636 self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE), 637 '<Future at 0x[0-9a-f]+L? state=cancelled>') 638 self.assertRegexpMatches( 639 repr(EXCEPTION_FUTURE), 640 '<Future at 0x[0-9a-f]+L? state=finished raised IOError>') 641 self.assertRegexpMatches( 642 repr(SUCCESSFUL_FUTURE), 643 '<Future at 0x[0-9a-f]+L? state=finished returned int>') 644 645 def test_cancel(self): 646 f1 = create_future(state=PENDING) 647 f2 = create_future(state=RUNNING) 648 f3 = create_future(state=CANCELLED) 649 f4 = create_future(state=CANCELLED_AND_NOTIFIED) 650 f5 = create_future(state=FINISHED, exception=IOError()) 651 f6 = create_future(state=FINISHED, result=5) 652 653 self.assertTrue(f1.cancel()) 654 self.assertEqual(f1._state, CANCELLED) 655 656 self.assertFalse(f2.cancel()) 657 self.assertEqual(f2._state, RUNNING) 658 659 self.assertTrue(f3.cancel()) 660 self.assertEqual(f3._state, CANCELLED) 661 662 self.assertTrue(f4.cancel()) 663 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) 664 665 self.assertFalse(f5.cancel()) 666 self.assertEqual(f5._state, FINISHED) 667 668 self.assertFalse(f6.cancel()) 669 self.assertEqual(f6._state, FINISHED) 670 671 def test_cancelled(self): 672 self.assertFalse(PENDING_FUTURE.cancelled()) 673 self.assertFalse(RUNNING_FUTURE.cancelled()) 674 self.assertTrue(CANCELLED_FUTURE.cancelled()) 675 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) 676 self.assertFalse(EXCEPTION_FUTURE.cancelled()) 677 self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) 678 679 def test_done(self): 680 self.assertFalse(PENDING_FUTURE.done()) 681 self.assertFalse(RUNNING_FUTURE.done()) 682 self.assertTrue(CANCELLED_FUTURE.done()) 683 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) 684 self.assertTrue(EXCEPTION_FUTURE.done()) 685 self.assertTrue(SUCCESSFUL_FUTURE.done()) 686 687 def test_running(self): 688 self.assertFalse(PENDING_FUTURE.running()) 689 self.assertTrue(RUNNING_FUTURE.running()) 690 self.assertFalse(CANCELLED_FUTURE.running()) 691 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) 692 self.assertFalse(EXCEPTION_FUTURE.running()) 693 self.assertFalse(SUCCESSFUL_FUTURE.running()) 694 695 def test_result_with_timeout(self): 696 self.assertRaises(futures.TimeoutError, 697 PENDING_FUTURE.result, timeout=0) 698 self.assertRaises(futures.TimeoutError, 699 RUNNING_FUTURE.result, timeout=0) 700 self.assertRaises(futures.CancelledError, 701 CANCELLED_FUTURE.result, timeout=0) 702 self.assertRaises(futures.CancelledError, 703 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) 704 self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) 705 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) 706 707 def test_result_with_success(self): 708 # TODO(brian@sweetapp.com): This test is timing dependant. 709 def notification(): 710 # Wait until the main thread is waiting for the result. 711 time.sleep(1) 712 f1.set_result(42) 713 714 f1 = create_future(state=PENDING) 715 t = threading.Thread(target=notification) 716 t.start() 717 718 self.assertEqual(f1.result(timeout=5), 42) 719 720 def test_result_with_cancel(self): 721 # TODO(brian@sweetapp.com): This test is timing dependant. 722 def notification(): 723 # Wait until the main thread is waiting for the result. 724 time.sleep(1) 725 f1.cancel() 726 727 f1 = create_future(state=PENDING) 728 t = threading.Thread(target=notification) 729 t.start() 730 731 self.assertRaises(futures.CancelledError, f1.result, timeout=5) 732 733 def test_exception_with_timeout(self): 734 self.assertRaises(futures.TimeoutError, 735 PENDING_FUTURE.exception, timeout=0) 736 self.assertRaises(futures.TimeoutError, 737 RUNNING_FUTURE.exception, timeout=0) 738 self.assertRaises(futures.CancelledError, 739 CANCELLED_FUTURE.exception, timeout=0) 740 self.assertRaises(futures.CancelledError, 741 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) 742 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), 743 IOError)) 744 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) 745 746 def test_exception_with_success(self): 747 def notification(): 748 # Wait until the main thread is waiting for the exception. 749 time.sleep(1) 750 with f1._condition: 751 f1._state = FINISHED 752 f1._exception = IOError() 753 f1._condition.notify_all() 754 755 f1 = create_future(state=PENDING) 756 t = threading.Thread(target=notification) 757 t.start() 758 759 self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) 760 761 def test_old_style_exception(self): 762 class OldStyle: # Does not inherit from object 763 def __str__(self): 764 return 'doh!' 765 callback_exc_info = [None] 766 def fn(callback_future): 767 callback_exc_info[0] = callback_future.exception_info() 768 f = Future() 769 f.add_done_callback(fn) 770 try: 771 raise OldStyle() 772 except OldStyle: 773 want_exc_info = sys.exc_info() 774 f.set_exception_info(*want_exc_info[1:]) 775 self.assertEqual(f.exception_info(), want_exc_info[1:]) 776 self.assertEqual(callback_exc_info[0], want_exc_info[1:]) 777 try: 778 f.result() 779 except OldStyle: 780 got_exc_info = sys.exc_info() 781 else: 782 self.fail('OldStyle exception not raised') 783 self.assertEqual(got_exc_info[:2], want_exc_info[:2]) 784 got_tb = traceback.extract_tb(got_exc_info[2]) 785 want_tb = traceback.extract_tb(want_exc_info[2]) 786 self.assertEqual(got_tb[-len(want_tb):], want_tb) 787 788@reap_threads 789def test_main(): 790 try: 791 test_support.run_unittest(ProcessPoolExecutorTest, 792 ThreadPoolExecutorTest, 793 ProcessPoolWaitTests, 794 ThreadPoolWaitTests, 795 ProcessPoolAsCompletedTests, 796 ThreadPoolAsCompletedTests, 797 FutureTests, 798 ProcessPoolShutdownTest, 799 ThreadPoolShutdownTest) 800 finally: 801 test_support.reap_children() 802 803if __name__ == "__main__": 804 test_main() 805