import unittest
import select
import os
import socket
import sys
import time
import errno
import struct
import threading

from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper
from io import BytesIO

if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

import warnings
# Silence any DeprecationWarning raised while importing asyncore.
with warnings.catch_warnings():
    warnings.simplefilter('ignore', DeprecationWarning)
    import asyncore


HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')


class dummysocket:
    """Socket stand-in that records whether close() was called."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def fileno(self):
        # Fixed, arbitrary descriptor number.
        return 42


class dummychannel:
    """Channel stand-in owning a dummysocket; close() closes that socket."""

    def __init__(self):
        self.socket = dummysocket()

    def close(self):
        self.socket.close()


class exitingdummy:
    """Object whose every event handler raises asyncore.ExitNow."""

    def __init__(self):
        pass

    def handle_read_event(self):
        raise asyncore.ExitNow()

    handle_write_event = handle_read_event
    handle_close = handle_read_event
    handle_expt_event = handle_read_event


class crashingdummy:
    """Object whose event handlers raise Exception.

    handle_error() records that it ran by setting ``error_handled``.
    """

    def __init__(self):
        self.error_handled = False

    def handle_read_event(self):
        raise Exception()

    handle_write_event = handle_read_event
    handle_close = handle_read_event
    handle_expt_event = handle_read_event

    def handle_error(self):
        self.error_handled = True


# used when testing senders; just collects what it gets until newline is sent
def capture_server(evt, buf, serv):
    """Accept one connection on *serv* and copy what it sends into *buf*.

    Reading stops at the first chunk containing a newline, after 200
    non-empty polls, or after 3 seconds, whichever comes first.  Newline
    bytes are not written to *buf*.  *serv* is always closed and *evt* is
    always set before returning, including when accept() times out.
    """
    try:
        serv.listen()
        conn, _ = serv.accept()
    except TimeoutError:
        pass
    else:
        n = 200
        start = time.monotonic()
        while n > 0 and time.monotonic() - start < 3.0:
            readable, _, _ = select.select([conn], [], [], 0.1)
            if readable:
                n -= 1
                data = conn.recv(10)
                # keep everything except for the newline terminator
                buf.write(data.replace(b'\n', b''))
                if b'\n' in data:
                    break
            time.sleep(0.01)

        conn.close()
    finally:
        serv.close()
        evt.set()


def bind_af_aware(sock, addr):
    """Helper function to bind a socket according to its family."""
    if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
        # Make sure the path doesn't exist.
        os_helper.unlink(addr)
        socket_helper.bind_unix_socket(sock, addr)
    else:
        sock.bind(addr)


class HelperFunctionTests(unittest.TestCase):
    """Tests for the module-level helper functions of asyncore."""

    def test_readwriteexc(self):
        # Check exception handling behavior of read, write and _exception

        # check that ExitNow exceptions in the object handler method
        # bubbles all the way up through asyncore read/write/_exception calls
        tr1 = exitingdummy()
        self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
        self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
        self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)

        # check that an exception other than ExitNow in the object handler
        # method causes the handle_error method to get called
        tr2 = crashingdummy()
        asyncore.read(tr2)
        self.assertEqual(tr2.error_handled, True)

        tr2 = crashingdummy()
        asyncore.write(tr2)
        self.assertEqual(tr2.error_handled, True)

        tr2 = crashingdummy()
        asyncore._exception(tr2)
        self.assertEqual(tr2.error_handled, True)

    # asyncore.readwrite uses constants in the select module that
    # are not present in Windows systems (see this thread:
    # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
    # These constants should be present as long as poll is available

    @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
    def test_readwrite(self):
        # Check that correct methods are called by readwrite()

        attributes = ('read', 'expt', 'write', 'closed', 'error_handled')

        # (poll flag, name of the testobj attribute expected to flip to True)
        expected = (
            (select.POLLIN, 'read'),
            (select.POLLPRI, 'expt'),
            (select.POLLOUT, 'write'),
            (select.POLLERR, 'closed'),
            (select.POLLHUP, 'closed'),
            (select.POLLNVAL, 'closed'),
            )

        class testobj:
            def __init__(self):
                self.read = False
                self.write = False
                self.closed = False
                self.expt = False
                self.error_handled = False

            def handle_read_event(self):
                self.read = True

            def handle_write_event(self):
                self.write = True

            def handle_close(self):
                self.closed = True

            def handle_expt_event(self):
                self.expt = True

            def handle_error(self):
                self.error_handled = True

        for flag, expectedattr in expected:
            tobj = testobj()
            self.assertEqual(getattr(tobj, expectedattr), False)
            asyncore.readwrite(tobj, flag)

            # Only the attribute modified by the routine we expect to be
            # called should be True.
            for attr in attributes:
                self.assertEqual(getattr(tobj, attr), attr == expectedattr)

            # check that ExitNow exceptions in the object handler method
            # bubbles all the way up through asyncore readwrite call
            tr1 = exitingdummy()
            self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)

            # check that an exception other than ExitNow in the object handler
            # method causes the handle_error method to get called
            tr2 = crashingdummy()
            self.assertEqual(tr2.error_handled, False)
            asyncore.readwrite(tr2, flag)
            self.assertEqual(tr2.error_handled, True)

    def test_closeall(self):
        self.closeall_check(False)

    def test_closeall_default(self):
        self.closeall_check(True)

    def closeall_check(self, usedefault):
        # Check that close_all() closes everything in a given map

        l = []
        testmap = {}
        for i in range(10):
            c = dummychannel()
            l.append(c)
            self.assertEqual(c.socket.closed, False)
            testmap[i] = c

        if usedefault:
            # Temporarily install testmap as the global map so that the
            # argument-less close_all() operates on it, then restore.
            socketmap = asyncore.socket_map
            try:
                asyncore.socket_map = testmap
                asyncore.close_all()
            finally:
                testmap, asyncore.socket_map = asyncore.socket_map, socketmap
        else:
            asyncore.close_all(testmap)

        self.assertEqual(len(testmap), 0)

        for c in l:
            self.assertEqual(c.socket.closed, True)

    def test_compact_traceback(self):
        try:
            raise Exception("I don't like spam!")
        except Exception:
            # compact_traceback() must be called while the exception is
            # still being handled.
            real_t, real_v = sys.exc_info()[:2]
            r = asyncore.compact_traceback()
        else:
            self.fail("Expected exception")

        (f, function, line), t, v, info = r
        self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
        self.assertEqual(function, 'test_compact_traceback')
        self.assertEqual(t, real_t)
        self.assertEqual(v, real_v)
        self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))


class DispatcherTests(unittest.TestCase):
    """Tests for asyncore.dispatcher that do not need a connected socket."""

    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    def test_basic(self):
        d = asyncore.dispatcher()
        self.assertEqual(d.readable(), True)
        self.assertEqual(d.writable(), True)

    def test_repr(self):
        d = asyncore.dispatcher()
        self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))

    def test_log(self):
        d = asyncore.dispatcher()

        # capture output of dispatcher.log() (to stderr)
        l1 = "Lovely spam! Wonderful spam!"
        l2 = "I don't like spam!"
        with support.captured_stderr() as stderr:
            d.log(l1)
            d.log(l2)

        lines = stderr.getvalue().splitlines()
        self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])

    def test_log_info(self):
        d = asyncore.dispatcher()

        # capture output of dispatcher.log_info() (to stdout via print)
        l1 = "Have you got anything without spam?"
        l2 = "Why can't she have egg bacon spam and sausage?"
        l3 = "THAT'S got spam in it!"
        with support.captured_stdout() as stdout:
            d.log_info(l1, 'EGGS')
            d.log_info(l2)
            d.log_info(l3, 'SPAM')

        lines = stdout.getvalue().splitlines()
        expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
        self.assertEqual(lines, expected)

    def test_unhandled(self):
        d = asyncore.dispatcher()
        d.ignore_log_types = ()

        # capture output of dispatcher.log_info() (to stdout via print)
        with support.captured_stdout() as stdout:
            d.handle_expt()
            d.handle_read()
            d.handle_write()
            d.handle_connect()

        lines = stdout.getvalue().splitlines()
        expected = ['warning: unhandled incoming priority event',
                    'warning: unhandled read event',
                    'warning: unhandled write event',
                    'warning: unhandled connect event']
        self.assertEqual(lines, expected)

    def test_strerror(self):
        # refers to bug #8573
        err = asyncore._strerror(errno.EPERM)
        if hasattr(os, 'strerror'):
            self.assertEqual(err, os.strerror(errno.EPERM))
        err = asyncore._strerror(-1)
        self.assertNotEqual(err, "")


class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
    """dispatcher_with_send that is never readable and ignores connect."""

    def readable(self):
        return False

    def handle_connect(self):
        pass


class DispatcherWithSendTests(unittest.TestCase):
    """Tests for asyncore.dispatcher_with_send against capture_server."""

    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    @threading_helper.reap_threads
    def test_send(self):
        evt = threading.Event()
        sock = socket.socket()
        sock.settimeout(3)
        port = socket_helper.bind_port(sock)

        cap = BytesIO()
        args = (evt, cap, sock)
        t = threading.Thread(target=capture_server, args=args)
        t.start()
        try:
            # wait a little longer for the server to initialize (it sometimes
            # refuses connections on slow machines without this wait)
            time.sleep(0.2)

            data = b"Suppose there isn't a 16-ton weight?"
            d = dispatcherwithsend_noread()
            d.create_socket()
            d.connect((socket_helper.HOST, port))

            # give time for socket to connect
            time.sleep(0.1)

            d.send(data)
            d.send(data)
            d.send(b'\n')

            # Poll until the output buffer drains, bounded to 1000 rounds.
            n = 1000
            while d.out_buffer and n > 0:
                asyncore.poll()
                n -= 1

            evt.wait()

            self.assertEqual(cap.getvalue(), data*2)
        finally:
            threading_helper.join_thread(t)


@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
                     'asyncore.file_wrapper required')
class FileWrapperTest(unittest.TestCase):
    """Tests for asyncore.file_wrapper / file_dispatcher on a temp file."""

    def setUp(self):
        self.d = b"It's not dead, it's sleeping!"
        with open(os_helper.TESTFN, 'wb') as file:
            file.write(self.d)

    def tearDown(self):
        os_helper.unlink(os_helper.TESTFN)

    def test_recv(self):
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        # The wrapper must hold its own descriptor, distinct from fd.
        self.assertNotEqual(w.fd, fd)
        self.assertNotEqual(w.fileno(), fd)
        self.assertEqual(w.recv(13), b"It's not dead")
        self.assertEqual(w.read(6), b", it's")
        w.close()
        self.assertRaises(OSError, w.read, 1)

    def test_send(self):
        d1 = b"Come again?"
        d2 = b"I want to buy some cheese."
        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_APPEND)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        w.write(d1)
        w.send(d2)
        w.close()
        with open(os_helper.TESTFN, 'rb') as file:
            self.assertEqual(file.read(), self.d + d1 + d2)

    @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
                         'asyncore.file_dispatcher required')
    def test_dispatcher(self):
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        data = []
        class FileDispatcher(asyncore.file_dispatcher):
            def handle_read(self):
                data.append(self.recv(29))
        s = FileDispatcher(fd)
        os.close(fd)
        asyncore.loop(timeout=0.01, use_poll=True, count=2)
        self.assertEqual(b"".join(data), self.d)

    def test_resource_warning(self):
        # Issue #11453
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = asyncore.file_wrapper(fd)

        os.close(fd)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            # Drop the only reference without closing the wrapper.
            f = None
            support.gc_collect()

    def test_close_twice(self):
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = asyncore.file_wrapper(fd)
        os.close(fd)

        os.close(f.fd)  # file_wrapper dupped fd
        with self.assertRaises(OSError):
            f.close()

        self.assertEqual(f.fd, -1)
        # calling close twice should not fail
        f.close()


class BaseTestHandler(asyncore.dispatcher):
    """Dispatcher whose event handlers raise unless a subclass overrides them.

    ``flag`` starts False; tests set it from the handler they expect to run.
    handle_error() re-raises the active exception instead of logging it.
    """

    def __init__(self, sock=None):
        asyncore.dispatcher.__init__(self, sock)
        self.flag = False

    def handle_accept(self):
        raise Exception("handle_accept not supposed to be called")

    def handle_accepted(self):
        raise Exception("handle_accepted not supposed to be called")

    def handle_connect(self):
        raise Exception("handle_connect not supposed to be called")

    def handle_expt(self):
        raise Exception("handle_expt not supposed to be called")

    def handle_close(self):
        raise Exception("handle_close not supposed to be called")

    def handle_error(self):
        raise


class BaseServer(asyncore.dispatcher):
    """A server which listens on an address and dispatches the
    connection to a handler.
    """

    def __init__(self, family, addr, handler=BaseTestHandler):
        asyncore.dispatcher.__init__(self)
        self.create_socket(family)
        self.set_reuse_addr()
        bind_af_aware(self.socket, addr)
        self.listen(5)
        self.handler = handler

    @property
    def address(self):
        # Address the listening socket is actually bound to.
        return self.socket.getsockname()

    def handle_accepted(self, sock, addr):
        # Wrap the accepted socket in the configured handler class.
        self.handler(sock)

    def handle_error(self):
        # Re-raise the active exception instead of logging it.
        raise


class BaseClient(BaseTestHandler):
    """Test handler that creates a socket and connects to *address*."""

    def __init__(self, family, address):
        BaseTestHandler.__init__(self)
        self.create_socket(family)
        self.connect(address)

    def handle_connect(self):
        pass


class BaseTestAPI:
    """Mixin holding the dispatcher API tests.

    Concrete test classes below combine it with unittest.TestCase and
    provide the ``family``, ``addr`` and ``use_poll`` class attributes
    that the tests read.
    """

    def tearDown(self):
        asyncore.close_all(ignore_all=True)

    def loop_waiting_for_flag(self, instance, timeout=5):
        # Run single loop iterations (at most 100) until instance.flag is
        # set, sleeping timeout/100 seconds between iterations; fail if the
        # flag is never set or the socket map empties first.
        timeout = float(timeout) / 100
        count = 100
        while asyncore.socket_map and count > 0:
            asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
            if instance.flag:
                return
            count -= 1
            time.sleep(timeout)
        self.fail("flag not set")

    def test_handle_connect(self):
        # make sure handle_connect is called on connect()

        class TestClient(BaseClient):
            def handle_connect(self):
                self.flag = True

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_accept(self):
        # make sure handle_accept() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self, family, addr):
                BaseTestHandler.__init__(self)
                self.create_socket(family)
                bind_af_aware(self.socket, addr)
                self.listen(5)
                self.address = self.socket.getsockname()

            def handle_accept(self):
                self.flag = True

        server = TestListener(self.family, self.addr)
        client = BaseClient(self.family, server.address)
        self.loop_waiting_for_flag(server)

    def test_handle_accepted(self):
        # make sure handle_accepted() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self, family, addr):
                BaseTestHandler.__init__(self)
                self.create_socket(family)
                bind_af_aware(self.socket, addr)
                self.listen(5)
                self.address = self.socket.getsockname()

            def handle_accept(self):
                asyncore.dispatcher.handle_accept(self)

            def handle_accepted(self, sock, addr):
                sock.close()
                self.flag = True

        server = TestListener(self.family, self.addr)
        client = BaseClient(self.family, server.address)
        self.loop_waiting_for_flag(server)


    def test_handle_read(self):
        # make sure handle_read is called on data received

        class TestClient(BaseClient):
            def handle_read(self):
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.send(b'x' * 1024)

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_write(self):
        # make sure handle_write is called

        class TestClient(BaseClient):
            def handle_write(self):
                self.flag = True

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close(self):
        # make sure handle_close is called when the other end closes
        # the connection

        class TestClient(BaseClient):

            def handle_read(self):
                # in order to make handle_close be called we are supposed
                # to make at least one recv() call
                self.recv(1024)

            def handle_close(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.close()

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close_after_conn_broken(self):
        # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
        # #11265).

        data = b'\0' * 128

        class TestClient(BaseClient):

            def handle_write(self):
                self.send(data)

            def handle_close(self):
                self.flag = True
                self.close()

            def handle_expt(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):

            def handle_read(self):
                self.recv(len(data))
                self.close()

            def writable(self):
                return False

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    @unittest.skipIf(sys.platform.startswith("sunos"),
                     "OOB support is broken on Solaris")
    def test_handle_expt(self):
        # Make sure handle_expt is called on OOB data received.
        # Note: this might fail on some platforms as OOB data is
        # tenuously supported and rarely used.
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        if sys.platform == "darwin" and self.use_poll:
            self.skipTest("poll may fail on macOS; see issue #28087")

        class TestClient(BaseClient):
            def handle_expt(self):
                self.socket.recv(1024, socket.MSG_OOB)
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_error(self):

        class TestClient(BaseClient):
            def handle_write(self):
                1.0 / 0
            def handle_error(self):
                self.flag = True
                # Re-raise the active exception to check that it is the
                # ZeroDivisionError produced by handle_write().
                try:
                    raise
                except ZeroDivisionError:
                    pass
                else:
                    raise Exception("exception not raised")

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_connection_attributes(self):
        server = BaseServer(self.family, self.addr)
        client = BaseClient(self.family, server.address)

        # we start disconnected
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        # this can't be taken for granted across all platforms
        #self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # execute some loops so that client connects to server
        asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertTrue(client.connected)
        self.assertFalse(client.accepting)

        # disconnect the client
        client.close()
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # stop serving
        server.close()
        self.assertFalse(server.connected)
        self.assertFalse(server.accepting)

    def test_create_socket(self):
        s = asyncore.dispatcher()
        s.create_socket(self.family)
        self.assertEqual(s.socket.type, socket.SOCK_STREAM)
        self.assertEqual(s.socket.family, self.family)
        self.assertEqual(s.socket.gettimeout(), 0)
        self.assertFalse(s.socket.get_inheritable())

    def test_bind(self):
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")
        s1 = asyncore.dispatcher()
        s1.create_socket(self.family)
        s1.bind(self.addr)
        s1.listen(5)
        port = s1.socket.getsockname()[1]

        s2 = asyncore.dispatcher()
        s2.create_socket(self.family)
        # EADDRINUSE indicates the socket was correctly bound
        self.assertRaises(OSError, s2.bind, (self.addr[0], port))

    def test_set_reuse_addr(self):
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        with socket.socket(self.family) as sock:
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            except OSError:
                # skipTest() raises SkipTest so the test is reported as
                # skipped; a bare unittest.skip(...) call only builds a
                # decorator and would let the test pass silently.
                self.skipTest("SO_REUSEADDR not supported on this platform")
            else:
                # if SO_REUSEADDR succeeded for sock we expect asyncore
                # to do the same
                s = asyncore.dispatcher(socket.socket(self.family))
                self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
                                                     socket.SO_REUSEADDR))
                s.socket.close()
                s.create_socket(self.family)
                s.set_reuse_addr()
                self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
                                                     socket.SO_REUSEADDR))

    @threading_helper.reap_threads
    def test_quick_connect(self):
        # see: http://bugs.python.org/issue10340
        if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
            self.skipTest("test specific to AF_INET and AF_INET6")

        server = BaseServer(self.family, self.addr)
        # run the thread 500 ms: the socket should be connected in 200 ms
        t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
                                                          count=5))
        t.start()
        try:
            with socket.socket(self.family, socket.SOCK_STREAM) as s:
                s.settimeout(.2)
                s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                             struct.pack('ii', 1, 0))

                try:
                    s.connect(server.address)
                except OSError:
                    pass
        finally:
            threading_helper.join_thread(t)


class TestAPI_UseIPv4Sockets(BaseTestAPI):
    family = socket.AF_INET
    addr = (socket_helper.HOST, 0)


@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required')
class TestAPI_UseIPv6Sockets(BaseTestAPI):
    family = socket.AF_INET6
    addr = (socket_helper.HOSTv6, 0)


@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
class TestAPI_UseUnixSockets(BaseTestAPI):
    # AF_UNIX is only referenced when the socket module defines it, so the
    # class body still evaluates on platforms without Unix sockets.
    if HAS_UNIX_SOCKETS:
        family = socket.AF_UNIX
    addr = os_helper.TESTFN

    def tearDown(self):
        os_helper.unlink(self.addr)
        BaseTestAPI.tearDown(self)


class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
    use_poll = False


@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase):
    use_poll = True


class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase):
    use_poll = False


@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase):
    use_poll = True


class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase):
    use_poll = False


@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase):
    use_poll = True


if __name__ == "__main__":
    unittest.main()