1import unittest 2from test import support 3 4import errno 5import io 6import itertools 7import socket 8import select 9import tempfile 10import time 11import traceback 12import queue 13import sys 14import os 15import platform 16import array 17import contextlib 18from weakref import proxy 19import signal 20import math 21import pickle 22import struct 23import random 24import shutil 25import string 26import _thread as thread 27import threading 28try: 29 import multiprocessing 30except ImportError: 31 multiprocessing = False 32try: 33 import fcntl 34except ImportError: 35 fcntl = None 36 37HOST = support.HOST 38# test unicode string and carriage return 39MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') 40MAIN_TIMEOUT = 60.0 41 42VSOCKPORT = 1234 43AIX = platform.system() == "AIX" 44 45try: 46 import _socket 47except ImportError: 48 _socket = None 49 50def get_cid(): 51 if fcntl is None: 52 return None 53 if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'): 54 return None 55 try: 56 with open("/dev/vsock", "rb") as f: 57 r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, " ") 58 except OSError: 59 return None 60 else: 61 return struct.unpack("I", r)[0] 62 63def _have_socket_can(): 64 """Check whether CAN sockets are supported on this host.""" 65 try: 66 s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 67 except (AttributeError, OSError): 68 return False 69 else: 70 s.close() 71 return True 72 73def _have_socket_can_isotp(): 74 """Check whether CAN ISOTP sockets are supported on this host.""" 75 try: 76 s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) 77 except (AttributeError, OSError): 78 return False 79 else: 80 s.close() 81 return True 82 83def _have_socket_rds(): 84 """Check whether RDS sockets are supported on this host.""" 85 try: 86 s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 87 except (AttributeError, OSError): 88 return False 89 else: 90 s.close() 91 return True 92 93def _have_socket_alg(): 94 """Check whether AF_ALG sockets are supported on this host.""" 95 try: 96 s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 97 except (AttributeError, OSError): 98 return False 99 else: 100 s.close() 101 return True 102 103def _have_socket_qipcrtr(): 104 """Check whether AF_QIPCRTR sockets are supported on this host.""" 105 try: 106 s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0) 107 except (AttributeError, OSError): 108 return False 109 else: 110 s.close() 111 return True 112 113def _have_socket_vsock(): 114 """Check whether AF_VSOCK sockets are supported on this host.""" 115 ret = get_cid() is not None 116 return ret 117 118 119@contextlib.contextmanager 120def socket_setdefaulttimeout(timeout): 121 old_timeout = socket.getdefaulttimeout() 122 try: 123 socket.setdefaulttimeout(timeout) 124 yield 125 finally: 126 socket.setdefaulttimeout(old_timeout) 127 128 129HAVE_SOCKET_CAN = _have_socket_can() 130 131HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp() 132 133HAVE_SOCKET_RDS = _have_socket_rds() 134 135HAVE_SOCKET_ALG = _have_socket_alg() 136 137HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr() 138 139HAVE_SOCKET_VSOCK = _have_socket_vsock() 140 141# Size in bytes of the int type 142SIZEOF_INT = array.array("i").itemsize 143 144class SocketTCPTest(unittest.TestCase): 145 146 def setUp(self): 147 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 148 self.port = support.bind_port(self.serv) 149 self.serv.listen() 150 151 def tearDown(self): 152 self.serv.close() 153 self.serv = None 154 155class SocketUDPTest(unittest.TestCase): 156 157 def setUp(self): 158 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 159 self.port = support.bind_port(self.serv) 160 161 def tearDown(self): 162 self.serv.close() 163 self.serv = None 164 165class ThreadSafeCleanupTestCase(unittest.TestCase): 166 """Subclass of unittest.TestCase with thread-safe cleanup methods. 167 168 This subclass protects the addCleanup() and doCleanups() methods 169 with a recursive lock. 170 """ 171 172 def __init__(self, *args, **kwargs): 173 super().__init__(*args, **kwargs) 174 self._cleanup_lock = threading.RLock() 175 176 def addCleanup(self, *args, **kwargs): 177 with self._cleanup_lock: 178 return super().addCleanup(*args, **kwargs) 179 180 def doCleanups(self, *args, **kwargs): 181 with self._cleanup_lock: 182 return super().doCleanups(*args, **kwargs) 183 184class SocketCANTest(unittest.TestCase): 185 186 """To be able to run this test, a `vcan0` CAN interface can be created with 187 the following commands: 188 # modprobe vcan 189 # ip link add dev vcan0 type vcan 190 # ifconfig vcan0 up 191 """ 192 interface = 'vcan0' 193 bufsize = 128 194 195 """The CAN frame structure is defined in <linux/can.h>: 196 197 struct can_frame { 198 canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ 199 __u8 can_dlc; /* data length code: 0 .. 8 */ 200 __u8 data[8] __attribute__((aligned(8))); 201 }; 202 """ 203 can_frame_fmt = "=IB3x8s" 204 can_frame_size = struct.calcsize(can_frame_fmt) 205 206 """The Broadcast Management Command frame structure is defined 207 in <linux/can/bcm.h>: 208 209 struct bcm_msg_head { 210 __u32 opcode; 211 __u32 flags; 212 __u32 count; 213 struct timeval ival1, ival2; 214 canid_t can_id; 215 __u32 nframes; 216 struct can_frame frames[0]; 217 } 218 219 `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see 220 `struct can_frame` definition). Must use native not standard types for packing. 221 """ 222 bcm_cmd_msg_fmt = "@3I4l2I" 223 bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8) 224 225 def setUp(self): 226 self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 227 self.addCleanup(self.s.close) 228 try: 229 self.s.bind((self.interface,)) 230 except OSError: 231 self.skipTest('network interface `%s` does not exist' % 232 self.interface) 233 234 235class SocketRDSTest(unittest.TestCase): 236 237 """To be able to run this test, the `rds` kernel module must be loaded: 238 # modprobe rds 239 """ 240 bufsize = 8192 241 242 def setUp(self): 243 self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 244 self.addCleanup(self.serv.close) 245 try: 246 self.port = support.bind_port(self.serv) 247 except OSError: 248 self.skipTest('unable to bind RDS socket') 249 250 251class ThreadableTest: 252 """Threadable Test class 253 254 The ThreadableTest class makes it easy to create a threaded 255 client/server pair from an existing unit test. To create a 256 new threaded class from an existing unit test, use multiple 257 inheritance: 258 259 class NewClass (OldClass, ThreadableTest): 260 pass 261 262 This class defines two new fixture functions with obvious 263 purposes for overriding: 264 265 clientSetUp () 266 clientTearDown () 267 268 Any new test functions within the class must then define 269 tests in pairs, where the test name is preceded with a 270 '_' to indicate the client portion of the test. Ex: 271 272 def testFoo(self): 273 # Server portion 274 275 def _testFoo(self): 276 # Client portion 277 278 Any exceptions raised by the clients during their tests 279 are caught and transferred to the main thread to alert 280 the testing framework. 281 282 Note, the server setup function cannot call any blocking 283 functions that rely on the client thread during setup, 284 unless serverExplicitReady() is called just before 285 the blocking call (such as in setting up a client/server 286 connection and performing the accept() in setUp(). 287 """ 288 289 def __init__(self): 290 # Swap the true setup function 291 self.__setUp = self.setUp 292 self.__tearDown = self.tearDown 293 self.setUp = self._setUp 294 self.tearDown = self._tearDown 295 296 def serverExplicitReady(self): 297 """This method allows the server to explicitly indicate that 298 it wants the client thread to proceed. This is useful if the 299 server is about to execute a blocking routine that is 300 dependent upon the client thread during its setup routine.""" 301 self.server_ready.set() 302 303 def _setUp(self): 304 self.wait_threads = support.wait_threads_exit() 305 self.wait_threads.__enter__() 306 307 self.server_ready = threading.Event() 308 self.client_ready = threading.Event() 309 self.done = threading.Event() 310 self.queue = queue.Queue(1) 311 self.server_crashed = False 312 313 # Do some munging to start the client test. 314 methodname = self.id() 315 i = methodname.rfind('.') 316 methodname = methodname[i+1:] 317 test_method = getattr(self, '_' + methodname) 318 self.client_thread = thread.start_new_thread( 319 self.clientRun, (test_method,)) 320 321 try: 322 self.__setUp() 323 except: 324 self.server_crashed = True 325 raise 326 finally: 327 self.server_ready.set() 328 self.client_ready.wait() 329 330 def _tearDown(self): 331 self.__tearDown() 332 self.done.wait() 333 self.wait_threads.__exit__(None, None, None) 334 335 if self.queue.qsize(): 336 exc = self.queue.get() 337 raise exc 338 339 def clientRun(self, test_func): 340 self.server_ready.wait() 341 try: 342 self.clientSetUp() 343 except BaseException as e: 344 self.queue.put(e) 345 self.clientTearDown() 346 return 347 finally: 348 self.client_ready.set() 349 if self.server_crashed: 350 self.clientTearDown() 351 return 352 if not hasattr(test_func, '__call__'): 353 raise TypeError("test_func must be a callable function") 354 try: 355 test_func() 356 except BaseException as e: 357 self.queue.put(e) 358 finally: 359 self.clientTearDown() 360 361 def clientSetUp(self): 362 raise NotImplementedError("clientSetUp must be implemented.") 363 364 def clientTearDown(self): 365 self.done.set() 366 thread.exit() 367 368class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): 369 370 def __init__(self, methodName='runTest'): 371 SocketTCPTest.__init__(self, methodName=methodName) 372 ThreadableTest.__init__(self) 373 374 def clientSetUp(self): 375 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 376 377 def clientTearDown(self): 378 self.cli.close() 379 self.cli = None 380 ThreadableTest.clientTearDown(self) 381 382class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): 383 384 def __init__(self, methodName='runTest'): 385 SocketUDPTest.__init__(self, methodName=methodName) 386 ThreadableTest.__init__(self) 387 388 def clientSetUp(self): 389 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 390 391 def clientTearDown(self): 392 self.cli.close() 393 self.cli = None 394 ThreadableTest.clientTearDown(self) 395 396class ThreadedCANSocketTest(SocketCANTest, ThreadableTest): 397 398 def __init__(self, methodName='runTest'): 399 SocketCANTest.__init__(self, methodName=methodName) 400 ThreadableTest.__init__(self) 401 402 def clientSetUp(self): 403 self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 404 try: 405 self.cli.bind((self.interface,)) 406 except OSError: 407 # skipTest should not be called here, and will be called in the 408 # server instead 409 pass 410 411 def clientTearDown(self): 412 self.cli.close() 413 self.cli = None 414 ThreadableTest.clientTearDown(self) 415 416class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest): 417 418 def __init__(self, methodName='runTest'): 419 SocketRDSTest.__init__(self, methodName=methodName) 420 ThreadableTest.__init__(self) 421 422 def clientSetUp(self): 423 self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 424 try: 425 # RDS sockets must be bound explicitly to send or receive data 426 self.cli.bind((HOST, 0)) 427 self.cli_addr = self.cli.getsockname() 428 except OSError: 429 # skipTest should not be called here, and will be called in the 430 # server instead 431 pass 432 433 def clientTearDown(self): 434 self.cli.close() 435 self.cli = None 436 ThreadableTest.clientTearDown(self) 437 438@unittest.skipIf(fcntl is None, "need fcntl") 439@unittest.skipUnless(HAVE_SOCKET_VSOCK, 440 'VSOCK sockets required for this test.') 441@unittest.skipUnless(get_cid() != 2, 442 "This test can only be run on a virtual guest.") 443class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest): 444 445 def __init__(self, methodName='runTest'): 446 unittest.TestCase.__init__(self, methodName=methodName) 447 ThreadableTest.__init__(self) 448 449 def setUp(self): 450 self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) 451 self.addCleanup(self.serv.close) 452 self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT)) 453 self.serv.listen() 454 self.serverExplicitReady() 455 self.conn, self.connaddr = self.serv.accept() 456 self.addCleanup(self.conn.close) 457 458 def clientSetUp(self): 459 time.sleep(0.1) 460 self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) 461 self.addCleanup(self.cli.close) 462 cid = get_cid() 463 self.cli.connect((cid, VSOCKPORT)) 464 465 def testStream(self): 466 msg = self.conn.recv(1024) 467 self.assertEqual(msg, MSG) 468 469 def _testStream(self): 470 self.cli.send(MSG) 471 self.cli.close() 472 473class SocketConnectedTest(ThreadedTCPSocketTest): 474 """Socket tests for client-server connection. 475 476 self.cli_conn is a client socket connected to the server. The 477 setUp() method guarantees that it is connected to the server. 478 """ 479 480 def __init__(self, methodName='runTest'): 481 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 482 483 def setUp(self): 484 ThreadedTCPSocketTest.setUp(self) 485 # Indicate explicitly we're ready for the client thread to 486 # proceed and then perform the blocking call to accept 487 self.serverExplicitReady() 488 conn, addr = self.serv.accept() 489 self.cli_conn = conn 490 491 def tearDown(self): 492 self.cli_conn.close() 493 self.cli_conn = None 494 ThreadedTCPSocketTest.tearDown(self) 495 496 def clientSetUp(self): 497 ThreadedTCPSocketTest.clientSetUp(self) 498 self.cli.connect((HOST, self.port)) 499 self.serv_conn = self.cli 500 501 def clientTearDown(self): 502 self.serv_conn.close() 503 self.serv_conn = None 504 ThreadedTCPSocketTest.clientTearDown(self) 505 506class SocketPairTest(unittest.TestCase, ThreadableTest): 507 508 def __init__(self, methodName='runTest'): 509 unittest.TestCase.__init__(self, methodName=methodName) 510 ThreadableTest.__init__(self) 511 512 def setUp(self): 513 self.serv, self.cli = socket.socketpair() 514 515 def tearDown(self): 516 self.serv.close() 517 self.serv = None 518 519 def clientSetUp(self): 520 pass 521 522 def clientTearDown(self): 523 self.cli.close() 524 self.cli = None 525 ThreadableTest.clientTearDown(self) 526 527 528# The following classes are used by the sendmsg()/recvmsg() tests. 529# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase 530# gives a drop-in replacement for SocketConnectedTest, but different 531# address families can be used, and the attributes serv_addr and 532# cli_addr will be set to the addresses of the endpoints. 533 534class SocketTestBase(unittest.TestCase): 535 """A base class for socket tests. 536 537 Subclasses must provide methods newSocket() to return a new socket 538 and bindSock(sock) to bind it to an unused address. 539 540 Creates a socket self.serv and sets self.serv_addr to its address. 541 """ 542 543 def setUp(self): 544 self.serv = self.newSocket() 545 self.bindServer() 546 547 def bindServer(self): 548 """Bind server socket and set self.serv_addr to its address.""" 549 self.bindSock(self.serv) 550 self.serv_addr = self.serv.getsockname() 551 552 def tearDown(self): 553 self.serv.close() 554 self.serv = None 555 556 557class SocketListeningTestMixin(SocketTestBase): 558 """Mixin to listen on the server socket.""" 559 560 def setUp(self): 561 super().setUp() 562 self.serv.listen() 563 564 565class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase, 566 ThreadableTest): 567 """Mixin to add client socket and allow client/server tests. 568 569 Client socket is self.cli and its address is self.cli_addr. See 570 ThreadableTest for usage information. 571 """ 572 573 def __init__(self, *args, **kwargs): 574 super().__init__(*args, **kwargs) 575 ThreadableTest.__init__(self) 576 577 def clientSetUp(self): 578 self.cli = self.newClientSocket() 579 self.bindClient() 580 581 def newClientSocket(self): 582 """Return a new socket for use as client.""" 583 return self.newSocket() 584 585 def bindClient(self): 586 """Bind client socket and set self.cli_addr to its address.""" 587 self.bindSock(self.cli) 588 self.cli_addr = self.cli.getsockname() 589 590 def clientTearDown(self): 591 self.cli.close() 592 self.cli = None 593 ThreadableTest.clientTearDown(self) 594 595 596class ConnectedStreamTestMixin(SocketListeningTestMixin, 597 ThreadedSocketTestMixin): 598 """Mixin to allow client/server stream tests with connected client. 599 600 Server's socket representing connection to client is self.cli_conn 601 and client's connection to server is self.serv_conn. (Based on 602 SocketConnectedTest.) 603 """ 604 605 def setUp(self): 606 super().setUp() 607 # Indicate explicitly we're ready for the client thread to 608 # proceed and then perform the blocking call to accept 609 self.serverExplicitReady() 610 conn, addr = self.serv.accept() 611 self.cli_conn = conn 612 613 def tearDown(self): 614 self.cli_conn.close() 615 self.cli_conn = None 616 super().tearDown() 617 618 def clientSetUp(self): 619 super().clientSetUp() 620 self.cli.connect(self.serv_addr) 621 self.serv_conn = self.cli 622 623 def clientTearDown(self): 624 try: 625 self.serv_conn.close() 626 self.serv_conn = None 627 except AttributeError: 628 pass 629 super().clientTearDown() 630 631 632class UnixSocketTestBase(SocketTestBase): 633 """Base class for Unix-domain socket tests.""" 634 635 # This class is used for file descriptor passing tests, so we 636 # create the sockets in a private directory so that other users 637 # can't send anything that might be problematic for a privileged 638 # user running the tests. 639 640 def setUp(self): 641 self.dir_path = tempfile.mkdtemp() 642 self.addCleanup(os.rmdir, self.dir_path) 643 super().setUp() 644 645 def bindSock(self, sock): 646 path = tempfile.mktemp(dir=self.dir_path) 647 support.bind_unix_socket(sock, path) 648 self.addCleanup(support.unlink, path) 649 650class UnixStreamBase(UnixSocketTestBase): 651 """Base class for Unix-domain SOCK_STREAM tests.""" 652 653 def newSocket(self): 654 return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 655 656 657class InetTestBase(SocketTestBase): 658 """Base class for IPv4 socket tests.""" 659 660 host = HOST 661 662 def setUp(self): 663 super().setUp() 664 self.port = self.serv_addr[1] 665 666 def bindSock(self, sock): 667 support.bind_port(sock, host=self.host) 668 669class TCPTestBase(InetTestBase): 670 """Base class for TCP-over-IPv4 tests.""" 671 672 def newSocket(self): 673 return socket.socket(socket.AF_INET, socket.SOCK_STREAM) 674 675class UDPTestBase(InetTestBase): 676 """Base class for UDP-over-IPv4 tests.""" 677 678 def newSocket(self): 679 return socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 680 681class SCTPStreamBase(InetTestBase): 682 """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode.""" 683 684 def newSocket(self): 685 return socket.socket(socket.AF_INET, socket.SOCK_STREAM, 686 socket.IPPROTO_SCTP) 687 688 689class Inet6TestBase(InetTestBase): 690 """Base class for IPv6 socket tests.""" 691 692 host = support.HOSTv6 693 694class UDP6TestBase(Inet6TestBase): 695 """Base class for UDP-over-IPv6 tests.""" 696 697 def newSocket(self): 698 return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 699 700 701# Test-skipping decorators for use with ThreadableTest. 702 703def skipWithClientIf(condition, reason): 704 """Skip decorated test if condition is true, add client_skip decorator. 705 706 If the decorated object is not a class, sets its attribute 707 "client_skip" to a decorator which will return an empty function 708 if the test is to be skipped, or the original function if it is 709 not. This can be used to avoid running the client part of a 710 skipped test when using ThreadableTest. 711 """ 712 def client_pass(*args, **kwargs): 713 pass 714 def skipdec(obj): 715 retval = unittest.skip(reason)(obj) 716 if not isinstance(obj, type): 717 retval.client_skip = lambda f: client_pass 718 return retval 719 def noskipdec(obj): 720 if not (isinstance(obj, type) or hasattr(obj, "client_skip")): 721 obj.client_skip = lambda f: f 722 return obj 723 return skipdec if condition else noskipdec 724 725 726def requireAttrs(obj, *attributes): 727 """Skip decorated test if obj is missing any of the given attributes. 728 729 Sets client_skip attribute as skipWithClientIf() does. 730 """ 731 missing = [name for name in attributes if not hasattr(obj, name)] 732 return skipWithClientIf( 733 missing, "don't have " + ", ".join(name for name in missing)) 734 735 736def requireSocket(*args): 737 """Skip decorated test if a socket cannot be created with given arguments. 738 739 When an argument is given as a string, will use the value of that 740 attribute of the socket module, or skip the test if it doesn't 741 exist. Sets client_skip attribute as skipWithClientIf() does. 742 """ 743 err = None 744 missing = [obj for obj in args if 745 isinstance(obj, str) and not hasattr(socket, obj)] 746 if missing: 747 err = "don't have " + ", ".join(name for name in missing) 748 else: 749 callargs = [getattr(socket, obj) if isinstance(obj, str) else obj 750 for obj in args] 751 try: 752 s = socket.socket(*callargs) 753 except OSError as e: 754 # XXX: check errno? 755 err = str(e) 756 else: 757 s.close() 758 return skipWithClientIf( 759 err is not None, 760 "can't create socket({0}): {1}".format( 761 ", ".join(str(o) for o in args), err)) 762 763 764####################################################################### 765## Begin Tests 766 767class GeneralModuleTests(unittest.TestCase): 768 769 def test_SocketType_is_socketobject(self): 770 import _socket 771 self.assertTrue(socket.SocketType is _socket.socket) 772 s = socket.socket() 773 self.assertIsInstance(s, socket.SocketType) 774 s.close() 775 776 def test_repr(self): 777 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 778 with s: 779 self.assertIn('fd=%i' % s.fileno(), repr(s)) 780 self.assertIn('family=%s' % socket.AF_INET, repr(s)) 781 self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s)) 782 self.assertIn('proto=0', repr(s)) 783 self.assertNotIn('raddr', repr(s)) 784 s.bind(('127.0.0.1', 0)) 785 self.assertIn('laddr', repr(s)) 786 self.assertIn(str(s.getsockname()), repr(s)) 787 self.assertIn('[closed]', repr(s)) 788 self.assertNotIn('laddr', repr(s)) 789 790 @unittest.skipUnless(_socket is not None, 'need _socket module') 791 def test_csocket_repr(self): 792 s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) 793 try: 794 expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>' 795 % (s.fileno(), s.family, s.type, s.proto)) 796 self.assertEqual(repr(s), expected) 797 finally: 798 s.close() 799 expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>' 800 % (s.family, s.type, s.proto)) 801 self.assertEqual(repr(s), expected) 802 803 def test_weakref(self): 804 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 805 p = proxy(s) 806 self.assertEqual(p.fileno(), s.fileno()) 807 s = None 808 try: 809 p.fileno() 810 except ReferenceError: 811 pass 812 else: 813 self.fail('Socket proxy still exists') 814 815 def testSocketError(self): 816 # Testing socket module exceptions 817 msg = "Error raising socket exception (%s)." 818 with self.assertRaises(OSError, msg=msg % 'OSError'): 819 raise OSError 820 with self.assertRaises(OSError, msg=msg % 'socket.herror'): 821 raise socket.herror 822 with self.assertRaises(OSError, msg=msg % 'socket.gaierror'): 823 raise socket.gaierror 824 825 def testSendtoErrors(self): 826 # Testing that sendto doesn't mask failures. See #10169. 827 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 828 self.addCleanup(s.close) 829 s.bind(('', 0)) 830 sockname = s.getsockname() 831 # 2 args 832 with self.assertRaises(TypeError) as cm: 833 s.sendto('\u2620', sockname) 834 self.assertEqual(str(cm.exception), 835 "a bytes-like object is required, not 'str'") 836 with self.assertRaises(TypeError) as cm: 837 s.sendto(5j, sockname) 838 self.assertEqual(str(cm.exception), 839 "a bytes-like object is required, not 'complex'") 840 with self.assertRaises(TypeError) as cm: 841 s.sendto(b'foo', None) 842 self.assertIn('not NoneType',str(cm.exception)) 843 # 3 args 844 with self.assertRaises(TypeError) as cm: 845 s.sendto('\u2620', 0, sockname) 846 self.assertEqual(str(cm.exception), 847 "a bytes-like object is required, not 'str'") 848 with self.assertRaises(TypeError) as cm: 849 s.sendto(5j, 0, sockname) 850 self.assertEqual(str(cm.exception), 851 "a bytes-like object is required, not 'complex'") 852 with self.assertRaises(TypeError) as cm: 853 s.sendto(b'foo', 0, None) 854 self.assertIn('not NoneType', str(cm.exception)) 855 with self.assertRaises(TypeError) as cm: 856 s.sendto(b'foo', 'bar', sockname) 857 self.assertIn('an integer is required', str(cm.exception)) 858 with self.assertRaises(TypeError) as cm: 859 s.sendto(b'foo', None, None) 860 self.assertIn('an integer is required', str(cm.exception)) 861 # wrong number of args 862 with self.assertRaises(TypeError) as cm: 863 s.sendto(b'foo') 864 self.assertIn('(1 given)', str(cm.exception)) 865 with self.assertRaises(TypeError) as cm: 866 s.sendto(b'foo', 0, sockname, 4) 867 self.assertIn('(4 given)', str(cm.exception)) 868 869 def testCrucialConstants(self): 870 # Testing for mission critical constants 871 socket.AF_INET 872 if socket.has_ipv6: 873 socket.AF_INET6 874 socket.SOCK_STREAM 875 socket.SOCK_DGRAM 876 socket.SOCK_RAW 877 socket.SOCK_RDM 878 socket.SOCK_SEQPACKET 879 socket.SOL_SOCKET 880 socket.SO_REUSEADDR 881 882 def testCrucialIpProtoConstants(self): 883 socket.IPPROTO_TCP 884 socket.IPPROTO_UDP 885 if socket.has_ipv6: 886 socket.IPPROTO_IPV6 887 888 @unittest.skipUnless(os.name == "nt", "Windows specific") 889 def testWindowsSpecificConstants(self): 890 socket.IPPROTO_ICLFXBM 891 socket.IPPROTO_ST 892 socket.IPPROTO_CBT 893 socket.IPPROTO_IGP 894 socket.IPPROTO_RDP 895 socket.IPPROTO_PGM 896 socket.IPPROTO_L2TP 897 socket.IPPROTO_SCTP 898 899 def testHostnameRes(self): 900 # Testing hostname resolution mechanisms 901 hostname = socket.gethostname() 902 try: 903 ip = socket.gethostbyname(hostname) 904 except OSError: 905 # Probably name lookup wasn't set up right; skip this test 906 self.skipTest('name lookup failure') 907 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") 908 try: 909 hname, aliases, ipaddrs = socket.gethostbyaddr(ip) 910 except OSError: 911 # Probably a similar problem as above; skip this test 912 self.skipTest('name lookup failure') 913 all_host_names = [hostname, hname] + aliases 914 fqhn = socket.getfqdn(ip) 915 if not fqhn in all_host_names: 916 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) 917 918 def test_host_resolution(self): 919 for addr in [support.HOSTv4, '10.0.0.1', '255.255.255.255']: 920 self.assertEqual(socket.gethostbyname(addr), addr) 921 922 # we don't test support.HOSTv6 because there's a chance it doesn't have 923 # a matching name entry (e.g. 'ip6-localhost') 924 for host in [support.HOSTv4]: 925 self.assertIn(host, socket.gethostbyaddr(host)[2]) 926 927 def test_host_resolution_bad_address(self): 928 # These are all malformed IP addresses and expected not to resolve to 929 # any result. But some ISPs, e.g. AWS, may successfully resolve these 930 # IPs. 931 explanation = ( 932 "resolving an invalid IP address did not raise OSError; " 933 "can be caused by a broken DNS server" 934 ) 935 for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2', 936 '1:1:1:1:1:1:1:1:1']: 937 with self.assertRaises(OSError, msg=addr): 938 socket.gethostbyname(addr) 939 with self.assertRaises(OSError, msg=explanation): 940 socket.gethostbyaddr(addr) 941 942 @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()") 943 @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()") 944 def test_sethostname(self): 945 oldhn = socket.gethostname() 946 try: 947 socket.sethostname('new') 948 except OSError as e: 949 if e.errno == errno.EPERM: 950 self.skipTest("test should be run as root") 951 else: 952 raise 953 try: 954 # running test as root! 955 self.assertEqual(socket.gethostname(), 'new') 956 # Should work with bytes objects too 957 socket.sethostname(b'bar') 958 self.assertEqual(socket.gethostname(), 'bar') 959 finally: 960 socket.sethostname(oldhn) 961 962 @unittest.skipUnless(hasattr(socket, 'if_nameindex'), 963 'socket.if_nameindex() not available.') 964 def testInterfaceNameIndex(self): 965 interfaces = socket.if_nameindex() 966 for index, name in interfaces: 967 self.assertIsInstance(index, int) 968 self.assertIsInstance(name, str) 969 # interface indices are non-zero integers 970 self.assertGreater(index, 0) 971 _index = socket.if_nametoindex(name) 972 self.assertIsInstance(_index, int) 973 self.assertEqual(index, _index) 974 _name = socket.if_indextoname(index) 975 self.assertIsInstance(_name, str) 976 self.assertEqual(name, _name) 977 978 @unittest.skipUnless(hasattr(socket, 'if_indextoname'), 979 'socket.if_indextoname() not available.') 980 def testInvalidInterfaceIndexToName(self): 981 self.assertRaises(OSError, socket.if_indextoname, 0) 982 self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') 983 984 @unittest.skipUnless(hasattr(socket, 'if_nametoindex'), 985 'socket.if_nametoindex() not available.') 986 def testInvalidInterfaceNameToIndex(self): 987 self.assertRaises(TypeError, socket.if_nametoindex, 0) 988 self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF') 989 990 @unittest.skipUnless(hasattr(sys, 'getrefcount'), 991 'test needs sys.getrefcount()') 992 def testRefCountGetNameInfo(self): 993 # Testing reference count for getnameinfo 994 try: 995 # On some versions, this loses a reference 996 orig = sys.getrefcount(__name__) 997 socket.getnameinfo(__name__,0) 998 except TypeError: 999 if sys.getrefcount(__name__) != orig: 1000 self.fail("socket.getnameinfo loses a reference") 1001 1002 def testInterpreterCrash(self): 1003 # Making sure getnameinfo doesn't crash the interpreter 1004 try: 1005 # On some versions, this crashes the interpreter. 1006 socket.getnameinfo(('x', 0, 0, 0), 0) 1007 except OSError: 1008 pass 1009 1010 def testNtoH(self): 1011 # This just checks that htons etc. are their own inverse, 1012 # when looking at the lower 16 or 32 bits. 1013 sizes = {socket.htonl: 32, socket.ntohl: 32, 1014 socket.htons: 16, socket.ntohs: 16} 1015 for func, size in sizes.items(): 1016 mask = (1<<size) - 1 1017 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): 1018 self.assertEqual(i & mask, func(func(i&mask)) & mask) 1019 1020 swapped = func(mask) 1021 self.assertEqual(swapped & mask, mask) 1022 self.assertRaises(OverflowError, func, 1<<34) 1023 1024 @support.cpython_only 1025 def testNtoHErrors(self): 1026 import _testcapi 1027 s_good_values = [0, 1, 2, 0xffff] 1028 l_good_values = s_good_values + [0xffffffff] 1029 l_bad_values = [-1, -2, 1<<32, 1<<1000] 1030 s_bad_values = l_bad_values + [_testcapi.INT_MIN - 1, 1031 _testcapi.INT_MAX + 1] 1032 s_deprecated_values = [1<<16, _testcapi.INT_MAX] 1033 for k in s_good_values: 1034 socket.ntohs(k) 1035 socket.htons(k) 1036 for k in l_good_values: 1037 socket.ntohl(k) 1038 socket.htonl(k) 1039 for k in s_bad_values: 1040 self.assertRaises(OverflowError, socket.ntohs, k) 1041 self.assertRaises(OverflowError, socket.htons, k) 1042 for k in l_bad_values: 1043 self.assertRaises(OverflowError, socket.ntohl, k) 1044 self.assertRaises(OverflowError, socket.htonl, k) 1045 for k in s_deprecated_values: 1046 self.assertWarns(DeprecationWarning, socket.ntohs, k) 1047 self.assertWarns(DeprecationWarning, socket.htons, k) 1048 1049 def testGetServBy(self): 1050 eq = self.assertEqual 1051 # Find one service that exists, then check all the related interfaces. 1052 # I've ordered this by protocols that have both a tcp and udp 1053 # protocol, at least for modern Linuxes. 1054 if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd')) 1055 or sys.platform in ('linux', 'darwin')): 1056 # avoid the 'echo' service on this platform, as there is an 1057 # assumption breaking non-standard port/protocol entry 1058 services = ('daytime', 'qotd', 'domain') 1059 else: 1060 services = ('echo', 'daytime', 'domain') 1061 for service in services: 1062 try: 1063 port = socket.getservbyname(service, 'tcp') 1064 break 1065 except OSError: 1066 pass 1067 else: 1068 raise OSError 1069 # Try same call with optional protocol omitted 1070 # Issue #26936: Android getservbyname() was broken before API 23. 1071 if (not hasattr(sys, 'getandroidapilevel') or 1072 sys.getandroidapilevel() >= 23): 1073 port2 = socket.getservbyname(service) 1074 eq(port, port2) 1075 # Try udp, but don't barf if it doesn't exist 1076 try: 1077 udpport = socket.getservbyname(service, 'udp') 1078 except OSError: 1079 udpport = None 1080 else: 1081 eq(udpport, port) 1082 # Now make sure the lookup by port returns the same service name 1083 # Issue #26936: Android getservbyport() is broken. 1084 if not support.is_android: 1085 eq(socket.getservbyport(port2), service) 1086 eq(socket.getservbyport(port, 'tcp'), service) 1087 if udpport is not None: 1088 eq(socket.getservbyport(udpport, 'udp'), service) 1089 # Make sure getservbyport does not accept out of range ports. 1090 self.assertRaises(OverflowError, socket.getservbyport, -1) 1091 self.assertRaises(OverflowError, socket.getservbyport, 65536) 1092 1093 def testDefaultTimeout(self): 1094 # Testing default timeout 1095 # The default timeout should initially be None 1096 self.assertEqual(socket.getdefaulttimeout(), None) 1097 with socket.socket() as s: 1098 self.assertEqual(s.gettimeout(), None) 1099 1100 # Set the default timeout to 10, and see if it propagates 1101 with socket_setdefaulttimeout(10): 1102 self.assertEqual(socket.getdefaulttimeout(), 10) 1103 with socket.socket() as sock: 1104 self.assertEqual(sock.gettimeout(), 10) 1105 1106 # Reset the default timeout to None, and see if it propagates 1107 socket.setdefaulttimeout(None) 1108 self.assertEqual(socket.getdefaulttimeout(), None) 1109 with socket.socket() as sock: 1110 self.assertEqual(sock.gettimeout(), None) 1111 1112 # Check that setting it to an invalid value raises ValueError 1113 self.assertRaises(ValueError, socket.setdefaulttimeout, -1) 1114 1115 # Check that setting it to an invalid type raises TypeError 1116 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") 1117 1118 @unittest.skipUnless(hasattr(socket, 'inet_aton'), 1119 'test needs socket.inet_aton()') 1120 def testIPv4_inet_aton_fourbytes(self): 1121 # Test that issue1008086 and issue767150 are fixed. 1122 # It must return 4 bytes. 1123 self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) 1124 self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) 1125 1126 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1127 'test needs socket.inet_pton()') 1128 def testIPv4toString(self): 1129 from socket import inet_aton as f, inet_pton, AF_INET 1130 g = lambda a: inet_pton(AF_INET, a) 1131 1132 assertInvalid = lambda func,a: self.assertRaises( 1133 (OSError, ValueError), func, a 1134 ) 1135 1136 self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) 1137 self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0')) 1138 self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170')) 1139 self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4')) 1140 self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255')) 1141 # bpo-29972: inet_pton() doesn't fail on AIX 1142 if not AIX: 1143 assertInvalid(f, '0.0.0.') 1144 assertInvalid(f, '300.0.0.0') 1145 assertInvalid(f, 'a.0.0.0') 1146 assertInvalid(f, '1.2.3.4.5') 1147 assertInvalid(f, '::1') 1148 1149 self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0')) 1150 self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0')) 1151 self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170')) 1152 self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255')) 1153 assertInvalid(g, '0.0.0.') 1154 assertInvalid(g, '300.0.0.0') 1155 assertInvalid(g, 'a.0.0.0') 1156 assertInvalid(g, '1.2.3.4.5') 1157 assertInvalid(g, '::1') 1158 1159 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1160 'test needs socket.inet_pton()') 1161 def testIPv6toString(self): 1162 try: 1163 from socket import inet_pton, AF_INET6, has_ipv6 1164 if not has_ipv6: 1165 self.skipTest('IPv6 not available') 1166 except ImportError: 1167 self.skipTest('could not import needed symbols from socket') 1168 1169 if sys.platform == "win32": 1170 try: 1171 inet_pton(AF_INET6, '::') 1172 except OSError as e: 1173 if e.winerror == 10022: 1174 self.skipTest('IPv6 might not be supported') 1175 1176 f = lambda a: inet_pton(AF_INET6, a) 1177 assertInvalid = lambda a: self.assertRaises( 1178 (OSError, ValueError), f, a 1179 ) 1180 1181 self.assertEqual(b'\x00' * 16, f('::')) 1182 self.assertEqual(b'\x00' * 16, f('0::0')) 1183 self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::')) 1184 self.assertEqual( 1185 b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', 1186 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') 1187 ) 1188 self.assertEqual( 1189 b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02', 1190 f('ad42:abc::127:0:254:2') 1191 ) 1192 self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::')) 1193 assertInvalid('0x20::') 1194 assertInvalid(':::') 1195 assertInvalid('::0::') 1196 assertInvalid('1::abc::') 1197 assertInvalid('1::abc::def') 1198 assertInvalid('1:2:3:4:5:6') 1199 assertInvalid('1:2:3:4:5:6:') 1200 assertInvalid('1:2:3:4:5:6:7:8:0') 1201 # bpo-29972: inet_pton() doesn't fail on AIX 1202 if not AIX: 1203 assertInvalid('1:2:3:4:5:6:7:8:') 1204 1205 self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40', 1206 f('::254.42.23.64') 1207 ) 1208 self.assertEqual( 1209 b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40', 1210 f('42::a29b:254.42.23.64') 1211 ) 1212 self.assertEqual( 1213 b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40', 1214 f('42:a8b9:0:2:ffff:a29b:254.42.23.64') 1215 ) 1216 assertInvalid('255.254.253.252') 1217 assertInvalid('1::260.2.3.0') 1218 assertInvalid('1::0.be.e.0') 1219 assertInvalid('1:2:3:4:5:6:7:1.2.3.4') 1220 assertInvalid('::1.2.3.4:0') 1221 assertInvalid('0.100.200.0:3:4:5:6:7:8') 1222 1223 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1224 'test needs socket.inet_ntop()') 1225 def testStringToIPv4(self): 1226 from socket import inet_ntoa as f, inet_ntop, AF_INET 1227 g = lambda a: inet_ntop(AF_INET, a) 1228 assertInvalid = lambda func,a: self.assertRaises( 1229 (OSError, ValueError), func, a 1230 ) 1231 1232 self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) 1233 self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55')) 1234 self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff')) 1235 self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04')) 1236 assertInvalid(f, b'\x00' * 3) 1237 assertInvalid(f, b'\x00' * 5) 1238 assertInvalid(f, b'\x00' * 16) 1239 self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55'))) 1240 1241 self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00')) 1242 self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55')) 1243 self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff')) 1244 assertInvalid(g, b'\x00' * 3) 1245 assertInvalid(g, b'\x00' * 5) 1246 assertInvalid(g, b'\x00' * 16) 1247 self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55'))) 1248 1249 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1250 'test needs socket.inet_ntop()') 1251 def testStringToIPv6(self): 1252 try: 1253 from socket import inet_ntop, AF_INET6, has_ipv6 1254 if not has_ipv6: 1255 self.skipTest('IPv6 not available') 1256 except ImportError: 1257 self.skipTest('could not import needed symbols from socket') 1258 1259 if sys.platform == "win32": 1260 try: 1261 inet_ntop(AF_INET6, b'\x00' * 16) 1262 except OSError as e: 1263 if e.winerror == 10022: 1264 self.skipTest('IPv6 might not be supported') 1265 1266 f = lambda a: inet_ntop(AF_INET6, a) 1267 assertInvalid = lambda a: self.assertRaises( 1268 (OSError, ValueError), f, a 1269 ) 1270 1271 self.assertEqual('::', f(b'\x00' * 16)) 1272 self.assertEqual('::1', f(b'\x00' * 15 + b'\x01')) 1273 self.assertEqual( 1274 'aef:b01:506:1001:ffff:9997:55:170', 1275 f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') 1276 ) 1277 self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01'))) 1278 1279 assertInvalid(b'\x12' * 15) 1280 assertInvalid(b'\x12' * 17) 1281 assertInvalid(b'\x12' * 4) 1282 1283 # XXX The following don't test module-level functionality... 1284 1285 def testSockName(self): 1286 # Testing getsockname() 1287 port = support.find_unused_port() 1288 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1289 self.addCleanup(sock.close) 1290 sock.bind(("0.0.0.0", port)) 1291 name = sock.getsockname() 1292 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate 1293 # it reasonable to get the host's addr in addition to 0.0.0.0. 1294 # At least for eCos. This is required for the S/390 to pass. 1295 try: 1296 my_ip_addr = socket.gethostbyname(socket.gethostname()) 1297 except OSError: 1298 # Probably name lookup wasn't set up right; skip this test 1299 self.skipTest('name lookup failure') 1300 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) 1301 self.assertEqual(name[1], port) 1302 1303 def testGetSockOpt(self): 1304 # Testing getsockopt() 1305 # We know a socket should start without reuse==0 1306 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1307 self.addCleanup(sock.close) 1308 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1309 self.assertFalse(reuse != 0, "initial mode is reuse") 1310 1311 def testSetSockOpt(self): 1312 # Testing setsockopt() 1313 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1314 self.addCleanup(sock.close) 1315 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1316 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1317 self.assertFalse(reuse == 0, "failed to set reuse mode") 1318 1319 def testSendAfterClose(self): 1320 # testing send() after close() with timeout 1321 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1322 sock.settimeout(1) 1323 self.assertRaises(OSError, sock.send, b"spam") 1324 1325 def testCloseException(self): 1326 sock = socket.socket() 1327 sock.bind((socket._LOCALHOST, 0)) 1328 socket.socket(fileno=sock.fileno()).close() 1329 try: 1330 sock.close() 1331 except OSError as err: 1332 # Winsock apparently raises ENOTSOCK 1333 self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK)) 1334 else: 1335 self.fail("close() should raise EBADF/ENOTSOCK") 1336 1337 def testNewAttributes(self): 1338 # testing .family, .type and .protocol 1339 1340 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1341 self.assertEqual(sock.family, socket.AF_INET) 1342 if hasattr(socket, 'SOCK_CLOEXEC'): 1343 self.assertIn(sock.type, 1344 (socket.SOCK_STREAM | socket.SOCK_CLOEXEC, 1345 socket.SOCK_STREAM)) 1346 else: 1347 self.assertEqual(sock.type, socket.SOCK_STREAM) 1348 self.assertEqual(sock.proto, 0) 1349 1350 def test_getsockaddrarg(self): 1351 sock = socket.socket() 1352 self.addCleanup(sock.close) 1353 port = support.find_unused_port() 1354 big_port = port + 65536 1355 neg_port = port - 65536 1356 self.assertRaises(OverflowError, sock.bind, (HOST, big_port)) 1357 self.assertRaises(OverflowError, sock.bind, (HOST, neg_port)) 1358 # Since find_unused_port() is inherently subject to race conditions, we 1359 # call it a couple times if necessary. 1360 for i in itertools.count(): 1361 port = support.find_unused_port() 1362 try: 1363 sock.bind((HOST, port)) 1364 except OSError as e: 1365 if e.errno != errno.EADDRINUSE or i == 5: 1366 raise 1367 else: 1368 break 1369 1370 @unittest.skipUnless(os.name == "nt", "Windows specific") 1371 def test_sock_ioctl(self): 1372 self.assertTrue(hasattr(socket.socket, 'ioctl')) 1373 self.assertTrue(hasattr(socket, 'SIO_RCVALL')) 1374 self.assertTrue(hasattr(socket, 'RCVALL_ON')) 1375 self.assertTrue(hasattr(socket, 'RCVALL_OFF')) 1376 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) 1377 s = socket.socket() 1378 self.addCleanup(s.close) 1379 self.assertRaises(ValueError, s.ioctl, -1, None) 1380 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) 1381 1382 @unittest.skipUnless(os.name == "nt", "Windows specific") 1383 @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'), 1384 'Loopback fast path support required for this test') 1385 def test_sio_loopback_fast_path(self): 1386 s = socket.socket() 1387 self.addCleanup(s.close) 1388 try: 1389 s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True) 1390 except OSError as exc: 1391 WSAEOPNOTSUPP = 10045 1392 if exc.winerror == WSAEOPNOTSUPP: 1393 self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but " 1394 "doesn't implemented in this Windows version") 1395 raise 1396 self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None) 1397 1398 def testGetaddrinfo(self): 1399 try: 1400 socket.getaddrinfo('localhost', 80) 1401 except socket.gaierror as err: 1402 if err.errno == socket.EAI_SERVICE: 1403 # see http://bugs.python.org/issue1282647 1404 self.skipTest("buggy libc version") 1405 raise 1406 # len of every sequence is supposed to be == 5 1407 for info in socket.getaddrinfo(HOST, None): 1408 self.assertEqual(len(info), 5) 1409 # host can be a domain name, a string representation of an 1410 # IPv4/v6 address or None 1411 socket.getaddrinfo('localhost', 80) 1412 socket.getaddrinfo('127.0.0.1', 80) 1413 socket.getaddrinfo(None, 80) 1414 if support.IPV6_ENABLED: 1415 socket.getaddrinfo('::1', 80) 1416 # port can be a string service name such as "http", a numeric 1417 # port number or None 1418 # Issue #26936: Android getaddrinfo() was broken before API level 23. 1419 if (not hasattr(sys, 'getandroidapilevel') or 1420 sys.getandroidapilevel() >= 23): 1421 socket.getaddrinfo(HOST, "http") 1422 socket.getaddrinfo(HOST, 80) 1423 socket.getaddrinfo(HOST, None) 1424 # test family and socktype filters 1425 infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM) 1426 for family, type, _, _, _ in infos: 1427 self.assertEqual(family, socket.AF_INET) 1428 self.assertEqual(str(family), 'AddressFamily.AF_INET') 1429 self.assertEqual(type, socket.SOCK_STREAM) 1430 self.assertEqual(str(type), 'SocketKind.SOCK_STREAM') 1431 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1432 for _, socktype, _, _, _ in infos: 1433 self.assertEqual(socktype, socket.SOCK_STREAM) 1434 # test proto and flags arguments 1435 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1436 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1437 # a server willing to support both IPv4 and IPv6 will 1438 # usually do this 1439 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1440 socket.AI_PASSIVE) 1441 # test keyword arguments 1442 a = socket.getaddrinfo(HOST, None) 1443 b = socket.getaddrinfo(host=HOST, port=None) 1444 self.assertEqual(a, b) 1445 a = socket.getaddrinfo(HOST, None, socket.AF_INET) 1446 b = socket.getaddrinfo(HOST, None, family=socket.AF_INET) 1447 self.assertEqual(a, b) 1448 a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1449 b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM) 1450 self.assertEqual(a, b) 1451 a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1452 b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP) 1453 self.assertEqual(a, b) 1454 a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1455 b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE) 1456 self.assertEqual(a, b) 1457 a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1458 socket.AI_PASSIVE) 1459 b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC, 1460 type=socket.SOCK_STREAM, proto=0, 1461 flags=socket.AI_PASSIVE) 1462 self.assertEqual(a, b) 1463 # Issue #6697. 1464 self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800') 1465 1466 # Issue 17269: test workaround for OS X platform bug segfault 1467 if hasattr(socket, 'AI_NUMERICSERV'): 1468 try: 1469 # The arguments here are undefined and the call may succeed 1470 # or fail. All we care here is that it doesn't segfault. 1471 socket.getaddrinfo("localhost", None, 0, 0, 0, 1472 socket.AI_NUMERICSERV) 1473 except socket.gaierror: 1474 pass 1475 1476 def test_getnameinfo(self): 1477 # only IP addresses are allowed 1478 self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0) 1479 1480 @unittest.skipUnless(support.is_resource_enabled('network'), 1481 'network is not enabled') 1482 def test_idna(self): 1483 # Check for internet access before running test 1484 # (issue #12804, issue #25138). 1485 with support.transient_internet('python.org'): 1486 socket.gethostbyname('python.org') 1487 1488 # these should all be successful 1489 domain = 'испытание.pythontest.net' 1490 socket.gethostbyname(domain) 1491 socket.gethostbyname_ex(domain) 1492 socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM) 1493 # this may not work if the forward lookup chooses the IPv6 address, as that doesn't 1494 # have a reverse entry yet 1495 # socket.gethostbyaddr('испытание.python.org') 1496 1497 def check_sendall_interrupted(self, with_timeout): 1498 # socketpair() is not strictly required, but it makes things easier. 1499 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): 1500 self.skipTest("signal.alarm and socket.socketpair required for this test") 1501 # Our signal handlers clobber the C errno by calling a math function 1502 # with an invalid domain value. 1503 def ok_handler(*args): 1504 self.assertRaises(ValueError, math.acosh, 0) 1505 def raising_handler(*args): 1506 self.assertRaises(ValueError, math.acosh, 0) 1507 1 // 0 1508 c, s = socket.socketpair() 1509 old_alarm = signal.signal(signal.SIGALRM, raising_handler) 1510 try: 1511 if with_timeout: 1512 # Just above the one second minimum for signal.alarm 1513 c.settimeout(1.5) 1514 with self.assertRaises(ZeroDivisionError): 1515 signal.alarm(1) 1516 c.sendall(b"x" * support.SOCK_MAX_SIZE) 1517 if with_timeout: 1518 signal.signal(signal.SIGALRM, ok_handler) 1519 signal.alarm(1) 1520 self.assertRaises(socket.timeout, c.sendall, 1521 b"x" * support.SOCK_MAX_SIZE) 1522 finally: 1523 signal.alarm(0) 1524 signal.signal(signal.SIGALRM, old_alarm) 1525 c.close() 1526 s.close() 1527 1528 def test_sendall_interrupted(self): 1529 self.check_sendall_interrupted(False) 1530 1531 def test_sendall_interrupted_with_timeout(self): 1532 self.check_sendall_interrupted(True) 1533 1534 def test_dealloc_warn(self): 1535 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1536 r = repr(sock) 1537 with self.assertWarns(ResourceWarning) as cm: 1538 sock = None 1539 support.gc_collect() 1540 self.assertIn(r, str(cm.warning.args[0])) 1541 # An open socket file object gets dereferenced after the socket 1542 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1543 f = sock.makefile('rb') 1544 r = repr(sock) 1545 sock = None 1546 support.gc_collect() 1547 with self.assertWarns(ResourceWarning): 1548 f = None 1549 support.gc_collect() 1550 1551 def test_name_closed_socketio(self): 1552 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1553 fp = sock.makefile("rb") 1554 fp.close() 1555 self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>") 1556 1557 def test_unusable_closed_socketio(self): 1558 with socket.socket() as sock: 1559 fp = sock.makefile("rb", buffering=0) 1560 self.assertTrue(fp.readable()) 1561 self.assertFalse(fp.writable()) 1562 self.assertFalse(fp.seekable()) 1563 fp.close() 1564 self.assertRaises(ValueError, fp.readable) 1565 self.assertRaises(ValueError, fp.writable) 1566 self.assertRaises(ValueError, fp.seekable) 1567 1568 def test_socket_close(self): 1569 sock = socket.socket() 1570 try: 1571 sock.bind((HOST, 0)) 1572 socket.close(sock.fileno()) 1573 with self.assertRaises(OSError): 1574 sock.listen(1) 1575 finally: 1576 with self.assertRaises(OSError): 1577 # sock.close() fails with EBADF 1578 sock.close() 1579 with self.assertRaises(TypeError): 1580 socket.close(None) 1581 with self.assertRaises(OSError): 1582 socket.close(-1) 1583 1584 def test_makefile_mode(self): 1585 for mode in 'r', 'rb', 'rw', 'w', 'wb': 1586 with self.subTest(mode=mode): 1587 with socket.socket() as sock: 1588 with sock.makefile(mode) as fp: 1589 self.assertEqual(fp.mode, mode) 1590 1591 def test_makefile_invalid_mode(self): 1592 for mode in 'rt', 'x', '+', 'a': 1593 with self.subTest(mode=mode): 1594 with socket.socket() as sock: 1595 with self.assertRaisesRegex(ValueError, 'invalid mode'): 1596 sock.makefile(mode) 1597 1598 def test_pickle(self): 1599 sock = socket.socket() 1600 with sock: 1601 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1602 self.assertRaises(TypeError, pickle.dumps, sock, protocol) 1603 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1604 family = pickle.loads(pickle.dumps(socket.AF_INET, protocol)) 1605 self.assertEqual(family, socket.AF_INET) 1606 type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol)) 1607 self.assertEqual(type, socket.SOCK_STREAM) 1608 1609 def test_listen_backlog(self): 1610 for backlog in 0, -1: 1611 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1612 srv.bind((HOST, 0)) 1613 srv.listen(backlog) 1614 1615 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1616 srv.bind((HOST, 0)) 1617 srv.listen() 1618 1619 @support.cpython_only 1620 def test_listen_backlog_overflow(self): 1621 # Issue 15989 1622 import _testcapi 1623 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1624 srv.bind((HOST, 0)) 1625 self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) 1626 1627 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 1628 def test_flowinfo(self): 1629 self.assertRaises(OverflowError, socket.getnameinfo, 1630 (support.HOSTv6, 0, 0xffffffff), 0) 1631 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: 1632 self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10)) 1633 1634 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 1635 def test_getaddrinfo_ipv6_basic(self): 1636 ((*_, sockaddr),) = socket.getaddrinfo( 1637 'ff02::1de:c0:face:8D', # Note capital letter `D`. 1638 1234, socket.AF_INET6, 1639 socket.SOCK_DGRAM, 1640 socket.IPPROTO_UDP 1641 ) 1642 self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0)) 1643 1644 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 1645 @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') 1646 @unittest.skipIf(AIX, 'Symbolic scope id does not work') 1647 def test_getaddrinfo_ipv6_scopeid_symbolic(self): 1648 # Just pick up any network interface (Linux, Mac OS X) 1649 (ifindex, test_interface) = socket.if_nameindex()[0] 1650 ((*_, sockaddr),) = socket.getaddrinfo( 1651 'ff02::1de:c0:face:8D%' + test_interface, 1652 1234, socket.AF_INET6, 1653 socket.SOCK_DGRAM, 1654 socket.IPPROTO_UDP 1655 ) 1656 # Note missing interface name part in IPv6 address 1657 self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) 1658 1659 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 1660 @unittest.skipUnless( 1661 sys.platform == 'win32', 1662 'Numeric scope id does not work or undocumented') 1663 def test_getaddrinfo_ipv6_scopeid_numeric(self): 1664 # Also works on Linux and Mac OS X, but is not documented (?) 1665 # Windows, Linux and Max OS X allow nonexistent interface numbers here. 1666 ifindex = 42 1667 ((*_, sockaddr),) = socket.getaddrinfo( 1668 'ff02::1de:c0:face:8D%' + str(ifindex), 1669 1234, socket.AF_INET6, 1670 socket.SOCK_DGRAM, 1671 socket.IPPROTO_UDP 1672 ) 1673 # Note missing interface name part in IPv6 address 1674 self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) 1675 1676 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 1677 @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') 1678 @unittest.skipIf(AIX, 'Symbolic scope id does not work') 1679 def test_getnameinfo_ipv6_scopeid_symbolic(self): 1680 # Just pick up any network interface. 1681 (ifindex, test_interface) = socket.if_nameindex()[0] 1682 sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`. 1683 nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) 1684 self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234')) 1685 1686 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 1687 @unittest.skipUnless( sys.platform == 'win32', 1688 'Numeric scope id does not work or undocumented') 1689 def test_getnameinfo_ipv6_scopeid_numeric(self): 1690 # Also works on Linux (undocumented), but does not work on Mac OS X 1691 # Windows and Linux allow nonexistent interface numbers here. 1692 ifindex = 42 1693 sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`. 1694 nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) 1695 self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234')) 1696 1697 def test_str_for_enums(self): 1698 # Make sure that the AF_* and SOCK_* constants have enum-like string 1699 # reprs. 1700 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 1701 self.assertEqual(str(s.family), 'AddressFamily.AF_INET') 1702 self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM') 1703 1704 def test_socket_consistent_sock_type(self): 1705 SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0) 1706 SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0) 1707 sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC 1708 1709 with socket.socket(socket.AF_INET, sock_type) as s: 1710 self.assertEqual(s.type, socket.SOCK_STREAM) 1711 s.settimeout(1) 1712 self.assertEqual(s.type, socket.SOCK_STREAM) 1713 s.settimeout(0) 1714 self.assertEqual(s.type, socket.SOCK_STREAM) 1715 s.setblocking(True) 1716 self.assertEqual(s.type, socket.SOCK_STREAM) 1717 s.setblocking(False) 1718 self.assertEqual(s.type, socket.SOCK_STREAM) 1719 1720 def test_unknown_socket_family_repr(self): 1721 # Test that when created with a family that's not one of the known 1722 # AF_*/SOCK_* constants, socket.family just returns the number. 1723 # 1724 # To do this we fool socket.socket into believing it already has an 1725 # open fd because on this path it doesn't actually verify the family and 1726 # type and populates the socket object. 1727 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1728 fd = sock.detach() 1729 unknown_family = max(socket.AddressFamily.__members__.values()) + 1 1730 1731 unknown_type = max( 1732 kind 1733 for name, kind in socket.SocketKind.__members__.items() 1734 if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'} 1735 ) + 1 1736 1737 with socket.socket( 1738 family=unknown_family, type=unknown_type, proto=23, 1739 fileno=fd) as s: 1740 self.assertEqual(s.family, unknown_family) 1741 self.assertEqual(s.type, unknown_type) 1742 # some OS like macOS ignore proto 1743 self.assertIn(s.proto, {0, 23}) 1744 1745 @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()') 1746 def test__sendfile_use_sendfile(self): 1747 class File: 1748 def __init__(self, fd): 1749 self.fd = fd 1750 1751 def fileno(self): 1752 return self.fd 1753 with socket.socket() as sock: 1754 fd = os.open(os.curdir, os.O_RDONLY) 1755 os.close(fd) 1756 with self.assertRaises(socket._GiveupOnSendfile): 1757 sock._sendfile_use_sendfile(File(fd)) 1758 with self.assertRaises(OverflowError): 1759 sock._sendfile_use_sendfile(File(2**1000)) 1760 with self.assertRaises(TypeError): 1761 sock._sendfile_use_sendfile(File(None)) 1762 1763 def _test_socket_fileno(self, s, family, stype): 1764 self.assertEqual(s.family, family) 1765 self.assertEqual(s.type, stype) 1766 1767 fd = s.fileno() 1768 s2 = socket.socket(fileno=fd) 1769 self.addCleanup(s2.close) 1770 # detach old fd to avoid double close 1771 s.detach() 1772 self.assertEqual(s2.family, family) 1773 self.assertEqual(s2.type, stype) 1774 self.assertEqual(s2.fileno(), fd) 1775 1776 def test_socket_fileno(self): 1777 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1778 self.addCleanup(s.close) 1779 s.bind((support.HOST, 0)) 1780 self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM) 1781 1782 if hasattr(socket, "SOCK_DGRAM"): 1783 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1784 self.addCleanup(s.close) 1785 s.bind((support.HOST, 0)) 1786 self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM) 1787 1788 if support.IPV6_ENABLED: 1789 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 1790 self.addCleanup(s.close) 1791 s.bind((support.HOSTv6, 0, 0, 0)) 1792 self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM) 1793 1794 if hasattr(socket, "AF_UNIX"): 1795 tmpdir = tempfile.mkdtemp() 1796 self.addCleanup(shutil.rmtree, tmpdir) 1797 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1798 self.addCleanup(s.close) 1799 try: 1800 s.bind(os.path.join(tmpdir, 'socket')) 1801 except PermissionError: 1802 pass 1803 else: 1804 self._test_socket_fileno(s, socket.AF_UNIX, 1805 socket.SOCK_STREAM) 1806 1807 def test_socket_fileno_rejects_float(self): 1808 with self.assertRaisesRegex(TypeError, "integer argument expected"): 1809 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5) 1810 1811 def test_socket_fileno_rejects_other_types(self): 1812 with self.assertRaisesRegex(TypeError, "integer is required"): 1813 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo") 1814 1815 def test_socket_fileno_rejects_invalid_socket(self): 1816 with self.assertRaisesRegex(ValueError, "negative file descriptor"): 1817 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1) 1818 1819 @unittest.skipIf(os.name == "nt", "Windows disallows -1 only") 1820 def test_socket_fileno_rejects_negative(self): 1821 with self.assertRaisesRegex(ValueError, "negative file descriptor"): 1822 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42) 1823 1824 def test_socket_fileno_requires_valid_fd(self): 1825 WSAENOTSOCK = 10038 1826 with self.assertRaises(OSError) as cm: 1827 socket.socket(fileno=support.make_bad_fd()) 1828 self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK)) 1829 1830 with self.assertRaises(OSError) as cm: 1831 socket.socket( 1832 socket.AF_INET, 1833 socket.SOCK_STREAM, 1834 fileno=support.make_bad_fd()) 1835 self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK)) 1836 1837 def test_socket_fileno_requires_socket_fd(self): 1838 with tempfile.NamedTemporaryFile() as afile: 1839 with self.assertRaises(OSError): 1840 socket.socket(fileno=afile.fileno()) 1841 1842 with self.assertRaises(OSError) as cm: 1843 socket.socket( 1844 socket.AF_INET, 1845 socket.SOCK_STREAM, 1846 fileno=afile.fileno()) 1847 self.assertEqual(cm.exception.errno, errno.ENOTSOCK) 1848 1849 1850@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 1851class BasicCANTest(unittest.TestCase): 1852 1853 def testCrucialConstants(self): 1854 socket.AF_CAN 1855 socket.PF_CAN 1856 socket.CAN_RAW 1857 1858 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1859 'socket.CAN_BCM required for this test.') 1860 def testBCMConstants(self): 1861 socket.CAN_BCM 1862 1863 # opcodes 1864 socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task 1865 socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task 1866 socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task 1867 socket.CAN_BCM_TX_SEND # send one CAN frame 1868 socket.CAN_BCM_RX_SETUP # create RX content filter subscription 1869 socket.CAN_BCM_RX_DELETE # remove RX content filter subscription 1870 socket.CAN_BCM_RX_READ # read properties of RX content filter subscription 1871 socket.CAN_BCM_TX_STATUS # reply to TX_READ request 1872 socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) 1873 socket.CAN_BCM_RX_STATUS # reply to RX_READ request 1874 socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent 1875 socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) 1876 1877 # flags 1878 socket.CAN_BCM_SETTIMER 1879 socket.CAN_BCM_STARTTIMER 1880 socket.CAN_BCM_TX_COUNTEVT 1881 socket.CAN_BCM_TX_ANNOUNCE 1882 socket.CAN_BCM_TX_CP_CAN_ID 1883 socket.CAN_BCM_RX_FILTER_ID 1884 socket.CAN_BCM_RX_CHECK_DLC 1885 socket.CAN_BCM_RX_NO_AUTOTIMER 1886 socket.CAN_BCM_RX_ANNOUNCE_RESUME 1887 socket.CAN_BCM_TX_RESET_MULTI_IDX 1888 socket.CAN_BCM_RX_RTR_FRAME 1889 1890 def testCreateSocket(self): 1891 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1892 pass 1893 1894 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1895 'socket.CAN_BCM required for this test.') 1896 def testCreateBCMSocket(self): 1897 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: 1898 pass 1899 1900 def testBindAny(self): 1901 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1902 address = ('', ) 1903 s.bind(address) 1904 self.assertEqual(s.getsockname(), address) 1905 1906 def testTooLongInterfaceName(self): 1907 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 1908 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1909 self.assertRaisesRegex(OSError, 'interface name too long', 1910 s.bind, ('x' * 1024,)) 1911 1912 @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), 1913 'socket.CAN_RAW_LOOPBACK required for this test.') 1914 def testLoopback(self): 1915 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1916 for loopback in (0, 1): 1917 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, 1918 loopback) 1919 self.assertEqual(loopback, 1920 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK)) 1921 1922 @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"), 1923 'socket.CAN_RAW_FILTER required for this test.') 1924 def testFilter(self): 1925 can_id, can_mask = 0x200, 0x700 1926 can_filter = struct.pack("=II", can_id, can_mask) 1927 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1928 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) 1929 self.assertEqual(can_filter, 1930 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)) 1931 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter)) 1932 1933 1934@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 1935class CANTest(ThreadedCANSocketTest): 1936 1937 def __init__(self, methodName='runTest'): 1938 ThreadedCANSocketTest.__init__(self, methodName=methodName) 1939 1940 @classmethod 1941 def build_can_frame(cls, can_id, data): 1942 """Build a CAN frame.""" 1943 can_dlc = len(data) 1944 data = data.ljust(8, b'\x00') 1945 return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data) 1946 1947 @classmethod 1948 def dissect_can_frame(cls, frame): 1949 """Dissect a CAN frame.""" 1950 can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame) 1951 return (can_id, can_dlc, data[:can_dlc]) 1952 1953 def testSendFrame(self): 1954 cf, addr = self.s.recvfrom(self.bufsize) 1955 self.assertEqual(self.cf, cf) 1956 self.assertEqual(addr[0], self.interface) 1957 self.assertEqual(addr[1], socket.AF_CAN) 1958 1959 def _testSendFrame(self): 1960 self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05') 1961 self.cli.send(self.cf) 1962 1963 def testSendMaxFrame(self): 1964 cf, addr = self.s.recvfrom(self.bufsize) 1965 self.assertEqual(self.cf, cf) 1966 1967 def _testSendMaxFrame(self): 1968 self.cf = self.build_can_frame(0x00, b'\x07' * 8) 1969 self.cli.send(self.cf) 1970 1971 def testSendMultiFrames(self): 1972 cf, addr = self.s.recvfrom(self.bufsize) 1973 self.assertEqual(self.cf1, cf) 1974 1975 cf, addr = self.s.recvfrom(self.bufsize) 1976 self.assertEqual(self.cf2, cf) 1977 1978 def _testSendMultiFrames(self): 1979 self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11') 1980 self.cli.send(self.cf1) 1981 1982 self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') 1983 self.cli.send(self.cf2) 1984 1985 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1986 'socket.CAN_BCM required for this test.') 1987 def _testBCM(self): 1988 cf, addr = self.cli.recvfrom(self.bufsize) 1989 self.assertEqual(self.cf, cf) 1990 can_id, can_dlc, data = self.dissect_can_frame(cf) 1991 self.assertEqual(self.can_id, can_id) 1992 self.assertEqual(self.data, data) 1993 1994 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1995 'socket.CAN_BCM required for this test.') 1996 def testBCM(self): 1997 bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) 1998 self.addCleanup(bcm.close) 1999 bcm.connect((self.interface,)) 2000 self.can_id = 0x123 2001 self.data = bytes([0xc0, 0xff, 0xee]) 2002 self.cf = self.build_can_frame(self.can_id, self.data) 2003 opcode = socket.CAN_BCM_TX_SEND 2004 flags = 0 2005 count = 0 2006 ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 2007 bcm_can_id = 0x0222 2008 nframes = 1 2009 assert len(self.cf) == 16 2010 header = struct.pack(self.bcm_cmd_msg_fmt, 2011 opcode, 2012 flags, 2013 count, 2014 ival1_seconds, 2015 ival1_usec, 2016 ival2_seconds, 2017 ival2_usec, 2018 bcm_can_id, 2019 nframes, 2020 ) 2021 header_plus_frame = header + self.cf 2022 bytes_sent = bcm.send(header_plus_frame) 2023 self.assertEqual(bytes_sent, len(header_plus_frame)) 2024 2025 2026@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.') 2027class ISOTPTest(unittest.TestCase): 2028 2029 def __init__(self, *args, **kwargs): 2030 super().__init__(*args, **kwargs) 2031 self.interface = "vcan0" 2032 2033 def testCrucialConstants(self): 2034 socket.AF_CAN 2035 socket.PF_CAN 2036 socket.CAN_ISOTP 2037 socket.SOCK_DGRAM 2038 2039 def testCreateSocket(self): 2040 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2041 pass 2042 2043 @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"), 2044 'socket.CAN_ISOTP required for this test.') 2045 def testCreateISOTPSocket(self): 2046 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2047 pass 2048 2049 def testTooLongInterfaceName(self): 2050 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 2051 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2052 with self.assertRaisesRegex(OSError, 'interface name too long'): 2053 s.bind(('x' * 1024, 1, 2)) 2054 2055 def testBind(self): 2056 try: 2057 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2058 addr = self.interface, 0x123, 0x456 2059 s.bind(addr) 2060 self.assertEqual(s.getsockname(), addr) 2061 except OSError as e: 2062 if e.errno == errno.ENODEV: 2063 self.skipTest('network interface `%s` does not exist' % 2064 self.interface) 2065 else: 2066 raise 2067 2068 2069@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 2070class BasicRDSTest(unittest.TestCase): 2071 2072 def testCrucialConstants(self): 2073 socket.AF_RDS 2074 socket.PF_RDS 2075 2076 def testCreateSocket(self): 2077 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2078 pass 2079 2080 def testSocketBufferSize(self): 2081 bufsize = 16384 2082 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2083 s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize) 2084 s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize) 2085 2086 2087@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 2088class RDSTest(ThreadedRDSSocketTest): 2089 2090 def __init__(self, methodName='runTest'): 2091 ThreadedRDSSocketTest.__init__(self, methodName=methodName) 2092 2093 def setUp(self): 2094 super().setUp() 2095 self.evt = threading.Event() 2096 2097 def testSendAndRecv(self): 2098 data, addr = self.serv.recvfrom(self.bufsize) 2099 self.assertEqual(self.data, data) 2100 self.assertEqual(self.cli_addr, addr) 2101 2102 def _testSendAndRecv(self): 2103 self.data = b'spam' 2104 self.cli.sendto(self.data, 0, (HOST, self.port)) 2105 2106 def testPeek(self): 2107 data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK) 2108 self.assertEqual(self.data, data) 2109 data, addr = self.serv.recvfrom(self.bufsize) 2110 self.assertEqual(self.data, data) 2111 2112 def _testPeek(self): 2113 self.data = b'spam' 2114 self.cli.sendto(self.data, 0, (HOST, self.port)) 2115 2116 @requireAttrs(socket.socket, 'recvmsg') 2117 def testSendAndRecvMsg(self): 2118 data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize) 2119 self.assertEqual(self.data, data) 2120 2121 @requireAttrs(socket.socket, 'sendmsg') 2122 def _testSendAndRecvMsg(self): 2123 self.data = b'hello ' * 10 2124 self.cli.sendmsg([self.data], (), 0, (HOST, self.port)) 2125 2126 def testSendAndRecvMulti(self): 2127 data, addr = self.serv.recvfrom(self.bufsize) 2128 self.assertEqual(self.data1, data) 2129 2130 data, addr = self.serv.recvfrom(self.bufsize) 2131 self.assertEqual(self.data2, data) 2132 2133 def _testSendAndRecvMulti(self): 2134 self.data1 = b'bacon' 2135 self.cli.sendto(self.data1, 0, (HOST, self.port)) 2136 2137 self.data2 = b'egg' 2138 self.cli.sendto(self.data2, 0, (HOST, self.port)) 2139 2140 def testSelect(self): 2141 r, w, x = select.select([self.serv], [], [], 3.0) 2142 self.assertIn(self.serv, r) 2143 data, addr = self.serv.recvfrom(self.bufsize) 2144 self.assertEqual(self.data, data) 2145 2146 def _testSelect(self): 2147 self.data = b'select' 2148 self.cli.sendto(self.data, 0, (HOST, self.port)) 2149 2150@unittest.skipUnless(HAVE_SOCKET_QIPCRTR, 2151 'QIPCRTR sockets required for this test.') 2152class BasicQIPCRTRTest(unittest.TestCase): 2153 2154 def testCrucialConstants(self): 2155 socket.AF_QIPCRTR 2156 2157 def testCreateSocket(self): 2158 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2159 pass 2160 2161 def testUnbound(self): 2162 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2163 self.assertEqual(s.getsockname()[1], 0) 2164 2165 def testBindSock(self): 2166 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2167 support.bind_port(s, host=s.getsockname()[0]) 2168 self.assertNotEqual(s.getsockname()[1], 0) 2169 2170 def testInvalidBindSock(self): 2171 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2172 self.assertRaises(OSError, support.bind_port, s, host=-2) 2173 2174 def testAutoBindSock(self): 2175 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2176 s.connect((123, 123)) 2177 self.assertNotEqual(s.getsockname()[1], 0) 2178 2179@unittest.skipIf(fcntl is None, "need fcntl") 2180@unittest.skipUnless(HAVE_SOCKET_VSOCK, 2181 'VSOCK sockets required for this test.') 2182class BasicVSOCKTest(unittest.TestCase): 2183 2184 def testCrucialConstants(self): 2185 socket.AF_VSOCK 2186 2187 def testVSOCKConstants(self): 2188 socket.SO_VM_SOCKETS_BUFFER_SIZE 2189 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE 2190 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE 2191 socket.VMADDR_CID_ANY 2192 socket.VMADDR_PORT_ANY 2193 socket.VMADDR_CID_HOST 2194 socket.VM_SOCKETS_INVALID_VERSION 2195 socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID 2196 2197 def testCreateSocket(self): 2198 with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: 2199 pass 2200 2201 def testSocketBufferSize(self): 2202 with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: 2203 orig_max = s.getsockopt(socket.AF_VSOCK, 2204 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE) 2205 orig = s.getsockopt(socket.AF_VSOCK, 2206 socket.SO_VM_SOCKETS_BUFFER_SIZE) 2207 orig_min = s.getsockopt(socket.AF_VSOCK, 2208 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE) 2209 2210 s.setsockopt(socket.AF_VSOCK, 2211 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2) 2212 s.setsockopt(socket.AF_VSOCK, 2213 socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2) 2214 s.setsockopt(socket.AF_VSOCK, 2215 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2) 2216 2217 self.assertEqual(orig_max * 2, 2218 s.getsockopt(socket.AF_VSOCK, 2219 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)) 2220 self.assertEqual(orig * 2, 2221 s.getsockopt(socket.AF_VSOCK, 2222 socket.SO_VM_SOCKETS_BUFFER_SIZE)) 2223 self.assertEqual(orig_min * 2, 2224 s.getsockopt(socket.AF_VSOCK, 2225 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)) 2226 2227 2228class BasicTCPTest(SocketConnectedTest): 2229 2230 def __init__(self, methodName='runTest'): 2231 SocketConnectedTest.__init__(self, methodName=methodName) 2232 2233 def testRecv(self): 2234 # Testing large receive over TCP 2235 msg = self.cli_conn.recv(1024) 2236 self.assertEqual(msg, MSG) 2237 2238 def _testRecv(self): 2239 self.serv_conn.send(MSG) 2240 2241 def testOverFlowRecv(self): 2242 # Testing receive in chunks over TCP 2243 seg1 = self.cli_conn.recv(len(MSG) - 3) 2244 seg2 = self.cli_conn.recv(1024) 2245 msg = seg1 + seg2 2246 self.assertEqual(msg, MSG) 2247 2248 def _testOverFlowRecv(self): 2249 self.serv_conn.send(MSG) 2250 2251 def testRecvFrom(self): 2252 # Testing large recvfrom() over TCP 2253 msg, addr = self.cli_conn.recvfrom(1024) 2254 self.assertEqual(msg, MSG) 2255 2256 def _testRecvFrom(self): 2257 self.serv_conn.send(MSG) 2258 2259 def testOverFlowRecvFrom(self): 2260 # Testing recvfrom() in chunks over TCP 2261 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) 2262 seg2, addr = self.cli_conn.recvfrom(1024) 2263 msg = seg1 + seg2 2264 self.assertEqual(msg, MSG) 2265 2266 def _testOverFlowRecvFrom(self): 2267 self.serv_conn.send(MSG) 2268 2269 def testSendAll(self): 2270 # Testing sendall() with a 2048 byte string over TCP 2271 msg = b'' 2272 while 1: 2273 read = self.cli_conn.recv(1024) 2274 if not read: 2275 break 2276 msg += read 2277 self.assertEqual(msg, b'f' * 2048) 2278 2279 def _testSendAll(self): 2280 big_chunk = b'f' * 2048 2281 self.serv_conn.sendall(big_chunk) 2282 2283 def testFromFd(self): 2284 # Testing fromfd() 2285 fd = self.cli_conn.fileno() 2286 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 2287 self.addCleanup(sock.close) 2288 self.assertIsInstance(sock, socket.socket) 2289 msg = sock.recv(1024) 2290 self.assertEqual(msg, MSG) 2291 2292 def _testFromFd(self): 2293 self.serv_conn.send(MSG) 2294 2295 def testDup(self): 2296 # Testing dup() 2297 sock = self.cli_conn.dup() 2298 self.addCleanup(sock.close) 2299 msg = sock.recv(1024) 2300 self.assertEqual(msg, MSG) 2301 2302 def _testDup(self): 2303 self.serv_conn.send(MSG) 2304 2305 def testShutdown(self): 2306 # Testing shutdown() 2307 msg = self.cli_conn.recv(1024) 2308 self.assertEqual(msg, MSG) 2309 # wait for _testShutdown to finish: on OS X, when the server 2310 # closes the connection the client also becomes disconnected, 2311 # and the client's shutdown call will fail. (Issue #4397.) 2312 self.done.wait() 2313 2314 def _testShutdown(self): 2315 self.serv_conn.send(MSG) 2316 self.serv_conn.shutdown(2) 2317 2318 testShutdown_overflow = support.cpython_only(testShutdown) 2319 2320 @support.cpython_only 2321 def _testShutdown_overflow(self): 2322 import _testcapi 2323 self.serv_conn.send(MSG) 2324 # Issue 15989 2325 self.assertRaises(OverflowError, self.serv_conn.shutdown, 2326 _testcapi.INT_MAX + 1) 2327 self.assertRaises(OverflowError, self.serv_conn.shutdown, 2328 2 + (_testcapi.UINT_MAX + 1)) 2329 self.serv_conn.shutdown(2) 2330 2331 def testDetach(self): 2332 # Testing detach() 2333 fileno = self.cli_conn.fileno() 2334 f = self.cli_conn.detach() 2335 self.assertEqual(f, fileno) 2336 # cli_conn cannot be used anymore... 2337 self.assertTrue(self.cli_conn._closed) 2338 self.assertRaises(OSError, self.cli_conn.recv, 1024) 2339 self.cli_conn.close() 2340 # ...but we can create another socket using the (still open) 2341 # file descriptor 2342 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f) 2343 self.addCleanup(sock.close) 2344 msg = sock.recv(1024) 2345 self.assertEqual(msg, MSG) 2346 2347 def _testDetach(self): 2348 self.serv_conn.send(MSG) 2349 2350 2351class BasicUDPTest(ThreadedUDPSocketTest): 2352 2353 def __init__(self, methodName='runTest'): 2354 ThreadedUDPSocketTest.__init__(self, methodName=methodName) 2355 2356 def testSendtoAndRecv(self): 2357 # Testing sendto() and Recv() over UDP 2358 msg = self.serv.recv(len(MSG)) 2359 self.assertEqual(msg, MSG) 2360 2361 def _testSendtoAndRecv(self): 2362 self.cli.sendto(MSG, 0, (HOST, self.port)) 2363 2364 def testRecvFrom(self): 2365 # Testing recvfrom() over UDP 2366 msg, addr = self.serv.recvfrom(len(MSG)) 2367 self.assertEqual(msg, MSG) 2368 2369 def _testRecvFrom(self): 2370 self.cli.sendto(MSG, 0, (HOST, self.port)) 2371 2372 def testRecvFromNegative(self): 2373 # Negative lengths passed to recvfrom should give ValueError. 2374 self.assertRaises(ValueError, self.serv.recvfrom, -1) 2375 2376 def _testRecvFromNegative(self): 2377 self.cli.sendto(MSG, 0, (HOST, self.port)) 2378 2379# Tests for the sendmsg()/recvmsg() interface. Where possible, the 2380# same test code is used with different families and types of socket 2381# (e.g. stream, datagram), and tests using recvmsg() are repeated 2382# using recvmsg_into(). 2383# 2384# The generic test classes such as SendmsgTests and 2385# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be 2386# supplied with sockets cli_sock and serv_sock representing the 2387# client's and the server's end of the connection respectively, and 2388# attributes cli_addr and serv_addr holding their (numeric where 2389# appropriate) addresses. 2390# 2391# The final concrete test classes combine these with subclasses of 2392# SocketTestBase which set up client and server sockets of a specific 2393# type, and with subclasses of SendrecvmsgBase such as 2394# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these 2395# sockets to cli_sock and serv_sock and override the methods and 2396# attributes of SendrecvmsgBase to fill in destination addresses if 2397# needed when sending, check for specific flags in msg_flags, etc. 2398# 2399# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using 2400# recvmsg_into(). 2401 2402# XXX: like the other datagram (UDP) tests in this module, the code 2403# here assumes that datagram delivery on the local machine will be 2404# reliable. 2405 2406class SendrecvmsgBase(ThreadSafeCleanupTestCase): 2407 # Base class for sendmsg()/recvmsg() tests. 2408 2409 # Time in seconds to wait before considering a test failed, or 2410 # None for no timeout. Not all tests actually set a timeout. 2411 fail_timeout = 3.0 2412 2413 def setUp(self): 2414 self.misc_event = threading.Event() 2415 super().setUp() 2416 2417 def sendToServer(self, msg): 2418 # Send msg to the server. 2419 return self.cli_sock.send(msg) 2420 2421 # Tuple of alternative default arguments for sendmsg() when called 2422 # via sendmsgToServer() (e.g. to include a destination address). 2423 sendmsg_to_server_defaults = () 2424 2425 def sendmsgToServer(self, *args): 2426 # Call sendmsg() on self.cli_sock with the given arguments, 2427 # filling in any arguments which are not supplied with the 2428 # corresponding items of self.sendmsg_to_server_defaults, if 2429 # any. 2430 return self.cli_sock.sendmsg( 2431 *(args + self.sendmsg_to_server_defaults[len(args):])) 2432 2433 def doRecvmsg(self, sock, bufsize, *args): 2434 # Call recvmsg() on sock with given arguments and return its 2435 # result. Should be used for tests which can use either 2436 # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides 2437 # this method with one which emulates it using recvmsg_into(), 2438 # thus allowing the same test to be used for both methods. 2439 result = sock.recvmsg(bufsize, *args) 2440 self.registerRecvmsgResult(result) 2441 return result 2442 2443 def registerRecvmsgResult(self, result): 2444 # Called by doRecvmsg() with the return value of recvmsg() or 2445 # recvmsg_into(). Can be overridden to arrange cleanup based 2446 # on the returned ancillary data, for instance. 2447 pass 2448 2449 def checkRecvmsgAddress(self, addr1, addr2): 2450 # Called to compare the received address with the address of 2451 # the peer. 2452 self.assertEqual(addr1, addr2) 2453 2454 # Flags that are normally unset in msg_flags 2455 msg_flags_common_unset = 0 2456 for name in ("MSG_CTRUNC", "MSG_OOB"): 2457 msg_flags_common_unset |= getattr(socket, name, 0) 2458 2459 # Flags that are normally set 2460 msg_flags_common_set = 0 2461 2462 # Flags set when a complete record has been received (e.g. MSG_EOR 2463 # for SCTP) 2464 msg_flags_eor_indicator = 0 2465 2466 # Flags set when a complete record has not been received 2467 # (e.g. MSG_TRUNC for datagram sockets) 2468 msg_flags_non_eor_indicator = 0 2469 2470 def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0): 2471 # Method to check the value of msg_flags returned by recvmsg[_into](). 2472 # 2473 # Checks that all bits in msg_flags_common_set attribute are 2474 # set in "flags" and all bits in msg_flags_common_unset are 2475 # unset. 2476 # 2477 # The "eor" argument specifies whether the flags should 2478 # indicate that a full record (or datagram) has been received. 2479 # If "eor" is None, no checks are done; otherwise, checks 2480 # that: 2481 # 2482 # * if "eor" is true, all bits in msg_flags_eor_indicator are 2483 # set and all bits in msg_flags_non_eor_indicator are unset 2484 # 2485 # * if "eor" is false, all bits in msg_flags_non_eor_indicator 2486 # are set and all bits in msg_flags_eor_indicator are unset 2487 # 2488 # If "checkset" and/or "checkunset" are supplied, they require 2489 # the given bits to be set or unset respectively, overriding 2490 # what the attributes require for those bits. 2491 # 2492 # If any bits are set in "ignore", they will not be checked, 2493 # regardless of the other inputs. 2494 # 2495 # Will raise Exception if the inputs require a bit to be both 2496 # set and unset, and it is not ignored. 2497 2498 defaultset = self.msg_flags_common_set 2499 defaultunset = self.msg_flags_common_unset 2500 2501 if eor: 2502 defaultset |= self.msg_flags_eor_indicator 2503 defaultunset |= self.msg_flags_non_eor_indicator 2504 elif eor is not None: 2505 defaultset |= self.msg_flags_non_eor_indicator 2506 defaultunset |= self.msg_flags_eor_indicator 2507 2508 # Function arguments override defaults 2509 defaultset &= ~checkunset 2510 defaultunset &= ~checkset 2511 2512 # Merge arguments with remaining defaults, and check for conflicts 2513 checkset |= defaultset 2514 checkunset |= defaultunset 2515 inboth = checkset & checkunset & ~ignore 2516 if inboth: 2517 raise Exception("contradictory set, unset requirements for flags " 2518 "{0:#x}".format(inboth)) 2519 2520 # Compare with given msg_flags value 2521 mask = (checkset | checkunset) & ~ignore 2522 self.assertEqual(flags & mask, checkset & mask) 2523 2524 2525class RecvmsgIntoMixin(SendrecvmsgBase): 2526 # Mixin to implement doRecvmsg() using recvmsg_into(). 2527 2528 def doRecvmsg(self, sock, bufsize, *args): 2529 buf = bytearray(bufsize) 2530 result = sock.recvmsg_into([buf], *args) 2531 self.registerRecvmsgResult(result) 2532 self.assertGreaterEqual(result[0], 0) 2533 self.assertLessEqual(result[0], bufsize) 2534 return (bytes(buf[:result[0]]),) + result[1:] 2535 2536 2537class SendrecvmsgDgramFlagsBase(SendrecvmsgBase): 2538 # Defines flags to be checked in msg_flags for datagram sockets. 2539 2540 @property 2541 def msg_flags_non_eor_indicator(self): 2542 return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC 2543 2544 2545class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase): 2546 # Defines flags to be checked in msg_flags for SCTP sockets. 2547 2548 @property 2549 def msg_flags_eor_indicator(self): 2550 return super().msg_flags_eor_indicator | socket.MSG_EOR 2551 2552 2553class SendrecvmsgConnectionlessBase(SendrecvmsgBase): 2554 # Base class for tests on connectionless-mode sockets. Users must 2555 # supply sockets on attributes cli and serv to be mapped to 2556 # cli_sock and serv_sock respectively. 2557 2558 @property 2559 def serv_sock(self): 2560 return self.serv 2561 2562 @property 2563 def cli_sock(self): 2564 return self.cli 2565 2566 @property 2567 def sendmsg_to_server_defaults(self): 2568 return ([], [], 0, self.serv_addr) 2569 2570 def sendToServer(self, msg): 2571 return self.cli_sock.sendto(msg, self.serv_addr) 2572 2573 2574class SendrecvmsgConnectedBase(SendrecvmsgBase): 2575 # Base class for tests on connected sockets. Users must supply 2576 # sockets on attributes serv_conn and cli_conn (representing the 2577 # connections *to* the server and the client), to be mapped to 2578 # cli_sock and serv_sock respectively. 2579 2580 @property 2581 def serv_sock(self): 2582 return self.cli_conn 2583 2584 @property 2585 def cli_sock(self): 2586 return self.serv_conn 2587 2588 def checkRecvmsgAddress(self, addr1, addr2): 2589 # Address is currently "unspecified" for a connected socket, 2590 # so we don't examine it 2591 pass 2592 2593 2594class SendrecvmsgServerTimeoutBase(SendrecvmsgBase): 2595 # Base class to set a timeout on server's socket. 2596 2597 def setUp(self): 2598 super().setUp() 2599 self.serv_sock.settimeout(self.fail_timeout) 2600 2601 2602class SendmsgTests(SendrecvmsgServerTimeoutBase): 2603 # Tests for sendmsg() which can use any socket type and do not 2604 # involve recvmsg() or recvmsg_into(). 2605 2606 def testSendmsg(self): 2607 # Send a simple message with sendmsg(). 2608 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2609 2610 def _testSendmsg(self): 2611 self.assertEqual(self.sendmsgToServer([MSG]), len(MSG)) 2612 2613 def testSendmsgDataGenerator(self): 2614 # Send from buffer obtained from a generator (not a sequence). 2615 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2616 2617 def _testSendmsgDataGenerator(self): 2618 self.assertEqual(self.sendmsgToServer((o for o in [MSG])), 2619 len(MSG)) 2620 2621 def testSendmsgAncillaryGenerator(self): 2622 # Gather (empty) ancillary data from a generator. 2623 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2624 2625 def _testSendmsgAncillaryGenerator(self): 2626 self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])), 2627 len(MSG)) 2628 2629 def testSendmsgArray(self): 2630 # Send data from an array instead of the usual bytes object. 2631 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2632 2633 def _testSendmsgArray(self): 2634 self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]), 2635 len(MSG)) 2636 2637 def testSendmsgGather(self): 2638 # Send message data from more than one buffer (gather write). 2639 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2640 2641 def _testSendmsgGather(self): 2642 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 2643 2644 def testSendmsgBadArgs(self): 2645 # Check that sendmsg() rejects invalid arguments. 2646 self.assertEqual(self.serv_sock.recv(1000), b"done") 2647 2648 def _testSendmsgBadArgs(self): 2649 self.assertRaises(TypeError, self.cli_sock.sendmsg) 2650 self.assertRaises(TypeError, self.sendmsgToServer, 2651 b"not in an iterable") 2652 self.assertRaises(TypeError, self.sendmsgToServer, 2653 object()) 2654 self.assertRaises(TypeError, self.sendmsgToServer, 2655 [object()]) 2656 self.assertRaises(TypeError, self.sendmsgToServer, 2657 [MSG, object()]) 2658 self.assertRaises(TypeError, self.sendmsgToServer, 2659 [MSG], object()) 2660 self.assertRaises(TypeError, self.sendmsgToServer, 2661 [MSG], [], object()) 2662 self.assertRaises(TypeError, self.sendmsgToServer, 2663 [MSG], [], 0, object()) 2664 self.sendToServer(b"done") 2665 2666 def testSendmsgBadCmsg(self): 2667 # Check that invalid ancillary data items are rejected. 2668 self.assertEqual(self.serv_sock.recv(1000), b"done") 2669 2670 def _testSendmsgBadCmsg(self): 2671 self.assertRaises(TypeError, self.sendmsgToServer, 2672 [MSG], [object()]) 2673 self.assertRaises(TypeError, self.sendmsgToServer, 2674 [MSG], [(object(), 0, b"data")]) 2675 self.assertRaises(TypeError, self.sendmsgToServer, 2676 [MSG], [(0, object(), b"data")]) 2677 self.assertRaises(TypeError, self.sendmsgToServer, 2678 [MSG], [(0, 0, object())]) 2679 self.assertRaises(TypeError, self.sendmsgToServer, 2680 [MSG], [(0, 0)]) 2681 self.assertRaises(TypeError, self.sendmsgToServer, 2682 [MSG], [(0, 0, b"data", 42)]) 2683 self.sendToServer(b"done") 2684 2685 @requireAttrs(socket, "CMSG_SPACE") 2686 def testSendmsgBadMultiCmsg(self): 2687 # Check that invalid ancillary data items are rejected when 2688 # more than one item is present. 2689 self.assertEqual(self.serv_sock.recv(1000), b"done") 2690 2691 @testSendmsgBadMultiCmsg.client_skip 2692 def _testSendmsgBadMultiCmsg(self): 2693 self.assertRaises(TypeError, self.sendmsgToServer, 2694 [MSG], [0, 0, b""]) 2695 self.assertRaises(TypeError, self.sendmsgToServer, 2696 [MSG], [(0, 0, b""), object()]) 2697 self.sendToServer(b"done") 2698 2699 def testSendmsgExcessCmsgReject(self): 2700 # Check that sendmsg() rejects excess ancillary data items 2701 # when the number that can be sent is limited. 2702 self.assertEqual(self.serv_sock.recv(1000), b"done") 2703 2704 def _testSendmsgExcessCmsgReject(self): 2705 if not hasattr(socket, "CMSG_SPACE"): 2706 # Can only send one item 2707 with self.assertRaises(OSError) as cm: 2708 self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) 2709 self.assertIsNone(cm.exception.errno) 2710 self.sendToServer(b"done") 2711 2712 def testSendmsgAfterClose(self): 2713 # Check that sendmsg() fails on a closed socket. 2714 pass 2715 2716 def _testSendmsgAfterClose(self): 2717 self.cli_sock.close() 2718 self.assertRaises(OSError, self.sendmsgToServer, [MSG]) 2719 2720 2721class SendmsgStreamTests(SendmsgTests): 2722 # Tests for sendmsg() which require a stream socket and do not 2723 # involve recvmsg() or recvmsg_into(). 2724 2725 def testSendmsgExplicitNoneAddr(self): 2726 # Check that peer address can be specified as None. 2727 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2728 2729 def _testSendmsgExplicitNoneAddr(self): 2730 self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG)) 2731 2732 def testSendmsgTimeout(self): 2733 # Check that timeout works with sendmsg(). 2734 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2735 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2736 2737 def _testSendmsgTimeout(self): 2738 try: 2739 self.cli_sock.settimeout(0.03) 2740 try: 2741 while True: 2742 self.sendmsgToServer([b"a"*512]) 2743 except socket.timeout: 2744 pass 2745 except OSError as exc: 2746 if exc.errno != errno.ENOMEM: 2747 raise 2748 # bpo-33937 the test randomly fails on Travis CI with 2749 # "OSError: [Errno 12] Cannot allocate memory" 2750 else: 2751 self.fail("socket.timeout not raised") 2752 finally: 2753 self.misc_event.set() 2754 2755 # XXX: would be nice to have more tests for sendmsg flags argument. 2756 2757 # Linux supports MSG_DONTWAIT when sending, but in general, it 2758 # only works when receiving. Could add other platforms if they 2759 # support it too. 2760 @skipWithClientIf(sys.platform not in {"linux"}, 2761 "MSG_DONTWAIT not known to work on this platform when " 2762 "sending") 2763 def testSendmsgDontWait(self): 2764 # Check that MSG_DONTWAIT in flags causes non-blocking behaviour. 2765 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2766 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2767 2768 @testSendmsgDontWait.client_skip 2769 def _testSendmsgDontWait(self): 2770 try: 2771 with self.assertRaises(OSError) as cm: 2772 while True: 2773 self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT) 2774 # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI 2775 # with "OSError: [Errno 12] Cannot allocate memory" 2776 self.assertIn(cm.exception.errno, 2777 (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM)) 2778 finally: 2779 self.misc_event.set() 2780 2781 2782class SendmsgConnectionlessTests(SendmsgTests): 2783 # Tests for sendmsg() which require a connectionless-mode 2784 # (e.g. datagram) socket, and do not involve recvmsg() or 2785 # recvmsg_into(). 2786 2787 def testSendmsgNoDestAddr(self): 2788 # Check that sendmsg() fails when no destination address is 2789 # given for unconnected socket. 2790 pass 2791 2792 def _testSendmsgNoDestAddr(self): 2793 self.assertRaises(OSError, self.cli_sock.sendmsg, 2794 [MSG]) 2795 self.assertRaises(OSError, self.cli_sock.sendmsg, 2796 [MSG], [], 0, None) 2797 2798 2799class RecvmsgGenericTests(SendrecvmsgBase): 2800 # Tests for recvmsg() which can also be emulated using 2801 # recvmsg_into(), and can use any socket type. 2802 2803 def testRecvmsg(self): 2804 # Receive a simple message with recvmsg[_into](). 2805 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 2806 self.assertEqual(msg, MSG) 2807 self.checkRecvmsgAddress(addr, self.cli_addr) 2808 self.assertEqual(ancdata, []) 2809 self.checkFlags(flags, eor=True) 2810 2811 def _testRecvmsg(self): 2812 self.sendToServer(MSG) 2813 2814 def testRecvmsgExplicitDefaults(self): 2815 # Test recvmsg[_into]() with default arguments provided explicitly. 2816 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2817 len(MSG), 0, 0) 2818 self.assertEqual(msg, MSG) 2819 self.checkRecvmsgAddress(addr, self.cli_addr) 2820 self.assertEqual(ancdata, []) 2821 self.checkFlags(flags, eor=True) 2822 2823 def _testRecvmsgExplicitDefaults(self): 2824 self.sendToServer(MSG) 2825 2826 def testRecvmsgShorter(self): 2827 # Receive a message smaller than buffer. 2828 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2829 len(MSG) + 42) 2830 self.assertEqual(msg, MSG) 2831 self.checkRecvmsgAddress(addr, self.cli_addr) 2832 self.assertEqual(ancdata, []) 2833 self.checkFlags(flags, eor=True) 2834 2835 def _testRecvmsgShorter(self): 2836 self.sendToServer(MSG) 2837 2838 def testRecvmsgTrunc(self): 2839 # Receive part of message, check for truncation indicators. 2840 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2841 len(MSG) - 3) 2842 self.assertEqual(msg, MSG[:-3]) 2843 self.checkRecvmsgAddress(addr, self.cli_addr) 2844 self.assertEqual(ancdata, []) 2845 self.checkFlags(flags, eor=False) 2846 2847 def _testRecvmsgTrunc(self): 2848 self.sendToServer(MSG) 2849 2850 def testRecvmsgShortAncillaryBuf(self): 2851 # Test ancillary data buffer too small to hold any ancillary data. 2852 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2853 len(MSG), 1) 2854 self.assertEqual(msg, MSG) 2855 self.checkRecvmsgAddress(addr, self.cli_addr) 2856 self.assertEqual(ancdata, []) 2857 self.checkFlags(flags, eor=True) 2858 2859 def _testRecvmsgShortAncillaryBuf(self): 2860 self.sendToServer(MSG) 2861 2862 def testRecvmsgLongAncillaryBuf(self): 2863 # Test large ancillary data buffer. 2864 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2865 len(MSG), 10240) 2866 self.assertEqual(msg, MSG) 2867 self.checkRecvmsgAddress(addr, self.cli_addr) 2868 self.assertEqual(ancdata, []) 2869 self.checkFlags(flags, eor=True) 2870 2871 def _testRecvmsgLongAncillaryBuf(self): 2872 self.sendToServer(MSG) 2873 2874 def testRecvmsgAfterClose(self): 2875 # Check that recvmsg[_into]() fails on a closed socket. 2876 self.serv_sock.close() 2877 self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024) 2878 2879 def _testRecvmsgAfterClose(self): 2880 pass 2881 2882 def testRecvmsgTimeout(self): 2883 # Check that timeout works. 2884 try: 2885 self.serv_sock.settimeout(0.03) 2886 self.assertRaises(socket.timeout, 2887 self.doRecvmsg, self.serv_sock, len(MSG)) 2888 finally: 2889 self.misc_event.set() 2890 2891 def _testRecvmsgTimeout(self): 2892 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2893 2894 @requireAttrs(socket, "MSG_PEEK") 2895 def testRecvmsgPeek(self): 2896 # Check that MSG_PEEK in flags enables examination of pending 2897 # data without consuming it. 2898 2899 # Receive part of data with MSG_PEEK. 2900 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2901 len(MSG) - 3, 0, 2902 socket.MSG_PEEK) 2903 self.assertEqual(msg, MSG[:-3]) 2904 self.checkRecvmsgAddress(addr, self.cli_addr) 2905 self.assertEqual(ancdata, []) 2906 # Ignoring MSG_TRUNC here (so this test is the same for stream 2907 # and datagram sockets). Some wording in POSIX seems to 2908 # suggest that it needn't be set when peeking, but that may 2909 # just be a slip. 2910 self.checkFlags(flags, eor=False, 2911 ignore=getattr(socket, "MSG_TRUNC", 0)) 2912 2913 # Receive all data with MSG_PEEK. 2914 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2915 len(MSG), 0, 2916 socket.MSG_PEEK) 2917 self.assertEqual(msg, MSG) 2918 self.checkRecvmsgAddress(addr, self.cli_addr) 2919 self.assertEqual(ancdata, []) 2920 self.checkFlags(flags, eor=True) 2921 2922 # Check that the same data can still be received normally. 2923 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 2924 self.assertEqual(msg, MSG) 2925 self.checkRecvmsgAddress(addr, self.cli_addr) 2926 self.assertEqual(ancdata, []) 2927 self.checkFlags(flags, eor=True) 2928 2929 @testRecvmsgPeek.client_skip 2930 def _testRecvmsgPeek(self): 2931 self.sendToServer(MSG) 2932 2933 @requireAttrs(socket.socket, "sendmsg") 2934 def testRecvmsgFromSendmsg(self): 2935 # Test receiving with recvmsg[_into]() when message is sent 2936 # using sendmsg(). 2937 self.serv_sock.settimeout(self.fail_timeout) 2938 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 2939 self.assertEqual(msg, MSG) 2940 self.checkRecvmsgAddress(addr, self.cli_addr) 2941 self.assertEqual(ancdata, []) 2942 self.checkFlags(flags, eor=True) 2943 2944 @testRecvmsgFromSendmsg.client_skip 2945 def _testRecvmsgFromSendmsg(self): 2946 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 2947 2948 2949class RecvmsgGenericStreamTests(RecvmsgGenericTests): 2950 # Tests which require a stream socket and can use either recvmsg() 2951 # or recvmsg_into(). 2952 2953 def testRecvmsgEOF(self): 2954 # Receive end-of-stream indicator (b"", peer socket closed). 2955 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 2956 self.assertEqual(msg, b"") 2957 self.checkRecvmsgAddress(addr, self.cli_addr) 2958 self.assertEqual(ancdata, []) 2959 self.checkFlags(flags, eor=None) # Might not have end-of-record marker 2960 2961 def _testRecvmsgEOF(self): 2962 self.cli_sock.close() 2963 2964 def testRecvmsgOverflow(self): 2965 # Receive a message in more than one chunk. 2966 seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2967 len(MSG) - 3) 2968 self.checkRecvmsgAddress(addr, self.cli_addr) 2969 self.assertEqual(ancdata, []) 2970 self.checkFlags(flags, eor=False) 2971 2972 seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 2973 self.checkRecvmsgAddress(addr, self.cli_addr) 2974 self.assertEqual(ancdata, []) 2975 self.checkFlags(flags, eor=True) 2976 2977 msg = seg1 + seg2 2978 self.assertEqual(msg, MSG) 2979 2980 def _testRecvmsgOverflow(self): 2981 self.sendToServer(MSG) 2982 2983 2984class RecvmsgTests(RecvmsgGenericTests): 2985 # Tests for recvmsg() which can use any socket type. 2986 2987 def testRecvmsgBadArgs(self): 2988 # Check that recvmsg() rejects invalid arguments. 2989 self.assertRaises(TypeError, self.serv_sock.recvmsg) 2990 self.assertRaises(ValueError, self.serv_sock.recvmsg, 2991 -1, 0, 0) 2992 self.assertRaises(ValueError, self.serv_sock.recvmsg, 2993 len(MSG), -1, 0) 2994 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2995 [bytearray(10)], 0, 0) 2996 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2997 object(), 0, 0) 2998 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2999 len(MSG), object(), 0) 3000 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3001 len(MSG), 0, object()) 3002 3003 msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0) 3004 self.assertEqual(msg, MSG) 3005 self.checkRecvmsgAddress(addr, self.cli_addr) 3006 self.assertEqual(ancdata, []) 3007 self.checkFlags(flags, eor=True) 3008 3009 def _testRecvmsgBadArgs(self): 3010 self.sendToServer(MSG) 3011 3012 3013class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests): 3014 # Tests for recvmsg_into() which can use any socket type. 3015 3016 def testRecvmsgIntoBadArgs(self): 3017 # Check that recvmsg_into() rejects invalid arguments. 3018 buf = bytearray(len(MSG)) 3019 self.assertRaises(TypeError, self.serv_sock.recvmsg_into) 3020 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3021 len(MSG), 0, 0) 3022 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3023 buf, 0, 0) 3024 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3025 [object()], 0, 0) 3026 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3027 [b"I'm not writable"], 0, 0) 3028 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3029 [buf, object()], 0, 0) 3030 self.assertRaises(ValueError, self.serv_sock.recvmsg_into, 3031 [buf], -1, 0) 3032 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3033 [buf], object(), 0) 3034 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3035 [buf], 0, object()) 3036 3037 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0) 3038 self.assertEqual(nbytes, len(MSG)) 3039 self.assertEqual(buf, bytearray(MSG)) 3040 self.checkRecvmsgAddress(addr, self.cli_addr) 3041 self.assertEqual(ancdata, []) 3042 self.checkFlags(flags, eor=True) 3043 3044 def _testRecvmsgIntoBadArgs(self): 3045 self.sendToServer(MSG) 3046 3047 def testRecvmsgIntoGenerator(self): 3048 # Receive into buffer obtained from a generator (not a sequence). 3049 buf = bytearray(len(MSG)) 3050 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 3051 (o for o in [buf])) 3052 self.assertEqual(nbytes, len(MSG)) 3053 self.assertEqual(buf, bytearray(MSG)) 3054 self.checkRecvmsgAddress(addr, self.cli_addr) 3055 self.assertEqual(ancdata, []) 3056 self.checkFlags(flags, eor=True) 3057 3058 def _testRecvmsgIntoGenerator(self): 3059 self.sendToServer(MSG) 3060 3061 def testRecvmsgIntoArray(self): 3062 # Receive into an array rather than the usual bytearray. 3063 buf = array.array("B", [0] * len(MSG)) 3064 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf]) 3065 self.assertEqual(nbytes, len(MSG)) 3066 self.assertEqual(buf.tobytes(), MSG) 3067 self.checkRecvmsgAddress(addr, self.cli_addr) 3068 self.assertEqual(ancdata, []) 3069 self.checkFlags(flags, eor=True) 3070 3071 def _testRecvmsgIntoArray(self): 3072 self.sendToServer(MSG) 3073 3074 def testRecvmsgIntoScatter(self): 3075 # Receive into multiple buffers (scatter write). 3076 b1 = bytearray(b"----") 3077 b2 = bytearray(b"0123456789") 3078 b3 = bytearray(b"--------------") 3079 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 3080 [b1, memoryview(b2)[2:9], b3]) 3081 self.assertEqual(nbytes, len(b"Mary had a little lamb")) 3082 self.assertEqual(b1, bytearray(b"Mary")) 3083 self.assertEqual(b2, bytearray(b"01 had a 9")) 3084 self.assertEqual(b3, bytearray(b"little lamb---")) 3085 self.checkRecvmsgAddress(addr, self.cli_addr) 3086 self.assertEqual(ancdata, []) 3087 self.checkFlags(flags, eor=True) 3088 3089 def _testRecvmsgIntoScatter(self): 3090 self.sendToServer(b"Mary had a little lamb") 3091 3092 3093class CmsgMacroTests(unittest.TestCase): 3094 # Test the functions CMSG_LEN() and CMSG_SPACE(). Tests 3095 # assumptions used by sendmsg() and recvmsg[_into](), which share 3096 # code with these functions. 3097 3098 # Match the definition in socketmodule.c 3099 try: 3100 import _testcapi 3101 except ImportError: 3102 socklen_t_limit = 0x7fffffff 3103 else: 3104 socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX) 3105 3106 @requireAttrs(socket, "CMSG_LEN") 3107 def testCMSG_LEN(self): 3108 # Test CMSG_LEN() with various valid and invalid values, 3109 # checking the assumptions used by recvmsg() and sendmsg(). 3110 toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1 3111 values = list(range(257)) + list(range(toobig - 257, toobig)) 3112 3113 # struct cmsghdr has at least three members, two of which are ints 3114 self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2) 3115 for n in values: 3116 ret = socket.CMSG_LEN(n) 3117 # This is how recvmsg() calculates the data size 3118 self.assertEqual(ret - socket.CMSG_LEN(0), n) 3119 self.assertLessEqual(ret, self.socklen_t_limit) 3120 3121 self.assertRaises(OverflowError, socket.CMSG_LEN, -1) 3122 # sendmsg() shares code with these functions, and requires 3123 # that it reject values over the limit. 3124 self.assertRaises(OverflowError, socket.CMSG_LEN, toobig) 3125 self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize) 3126 3127 @requireAttrs(socket, "CMSG_SPACE") 3128 def testCMSG_SPACE(self): 3129 # Test CMSG_SPACE() with various valid and invalid values, 3130 # checking the assumptions used by sendmsg(). 3131 toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1 3132 values = list(range(257)) + list(range(toobig - 257, toobig)) 3133 3134 last = socket.CMSG_SPACE(0) 3135 # struct cmsghdr has at least three members, two of which are ints 3136 self.assertGreater(last, array.array("i").itemsize * 2) 3137 for n in values: 3138 ret = socket.CMSG_SPACE(n) 3139 self.assertGreaterEqual(ret, last) 3140 self.assertGreaterEqual(ret, socket.CMSG_LEN(n)) 3141 self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0)) 3142 self.assertLessEqual(ret, self.socklen_t_limit) 3143 last = ret 3144 3145 self.assertRaises(OverflowError, socket.CMSG_SPACE, -1) 3146 # sendmsg() shares code with these functions, and requires 3147 # that it reject values over the limit. 3148 self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig) 3149 self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize) 3150 3151 3152class SCMRightsTest(SendrecvmsgServerTimeoutBase): 3153 # Tests for file descriptor passing on Unix-domain sockets. 3154 3155 # Invalid file descriptor value that's unlikely to evaluate to a 3156 # real FD even if one of its bytes is replaced with a different 3157 # value (which shouldn't actually happen). 3158 badfd = -0x5555 3159 3160 def newFDs(self, n): 3161 # Return a list of n file descriptors for newly-created files 3162 # containing their list indices as ASCII numbers. 3163 fds = [] 3164 for i in range(n): 3165 fd, path = tempfile.mkstemp() 3166 self.addCleanup(os.unlink, path) 3167 self.addCleanup(os.close, fd) 3168 os.write(fd, str(i).encode()) 3169 fds.append(fd) 3170 return fds 3171 3172 def checkFDs(self, fds): 3173 # Check that the file descriptors in the given list contain 3174 # their correct list indices as ASCII numbers. 3175 for n, fd in enumerate(fds): 3176 os.lseek(fd, 0, os.SEEK_SET) 3177 self.assertEqual(os.read(fd, 1024), str(n).encode()) 3178 3179 def registerRecvmsgResult(self, result): 3180 self.addCleanup(self.closeRecvmsgFDs, result) 3181 3182 def closeRecvmsgFDs(self, recvmsg_result): 3183 # Close all file descriptors specified in the ancillary data 3184 # of the given return value from recvmsg() or recvmsg_into(). 3185 for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]: 3186 if (cmsg_level == socket.SOL_SOCKET and 3187 cmsg_type == socket.SCM_RIGHTS): 3188 fds = array.array("i") 3189 fds.frombytes(cmsg_data[: 3190 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3191 for fd in fds: 3192 os.close(fd) 3193 3194 def createAndSendFDs(self, n): 3195 # Send n new file descriptors created by newFDs() to the 3196 # server, with the constant MSG as the non-ancillary data. 3197 self.assertEqual( 3198 self.sendmsgToServer([MSG], 3199 [(socket.SOL_SOCKET, 3200 socket.SCM_RIGHTS, 3201 array.array("i", self.newFDs(n)))]), 3202 len(MSG)) 3203 3204 def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0): 3205 # Check that constant MSG was received with numfds file 3206 # descriptors in a maximum of maxcmsgs control messages (which 3207 # must contain only complete integers). By default, check 3208 # that MSG_CTRUNC is unset, but ignore any flags in 3209 # ignoreflags. 3210 msg, ancdata, flags, addr = result 3211 self.assertEqual(msg, MSG) 3212 self.checkRecvmsgAddress(addr, self.cli_addr) 3213 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3214 ignore=ignoreflags) 3215 3216 self.assertIsInstance(ancdata, list) 3217 self.assertLessEqual(len(ancdata), maxcmsgs) 3218 fds = array.array("i") 3219 for item in ancdata: 3220 self.assertIsInstance(item, tuple) 3221 cmsg_level, cmsg_type, cmsg_data = item 3222 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3223 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3224 self.assertIsInstance(cmsg_data, bytes) 3225 self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0) 3226 fds.frombytes(cmsg_data) 3227 3228 self.assertEqual(len(fds), numfds) 3229 self.checkFDs(fds) 3230 3231 def testFDPassSimple(self): 3232 # Pass a single FD (array read from bytes object). 3233 self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock, 3234 len(MSG), 10240)) 3235 3236 def _testFDPassSimple(self): 3237 self.assertEqual( 3238 self.sendmsgToServer( 3239 [MSG], 3240 [(socket.SOL_SOCKET, 3241 socket.SCM_RIGHTS, 3242 array.array("i", self.newFDs(1)).tobytes())]), 3243 len(MSG)) 3244 3245 def testMultipleFDPass(self): 3246 # Pass multiple FDs in a single array. 3247 self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock, 3248 len(MSG), 10240)) 3249 3250 def _testMultipleFDPass(self): 3251 self.createAndSendFDs(4) 3252 3253 @requireAttrs(socket, "CMSG_SPACE") 3254 def testFDPassCMSG_SPACE(self): 3255 # Test using CMSG_SPACE() to calculate ancillary buffer size. 3256 self.checkRecvmsgFDs( 3257 4, self.doRecvmsg(self.serv_sock, len(MSG), 3258 socket.CMSG_SPACE(4 * SIZEOF_INT))) 3259 3260 @testFDPassCMSG_SPACE.client_skip 3261 def _testFDPassCMSG_SPACE(self): 3262 self.createAndSendFDs(4) 3263 3264 def testFDPassCMSG_LEN(self): 3265 # Test using CMSG_LEN() to calculate ancillary buffer size. 3266 self.checkRecvmsgFDs(1, 3267 self.doRecvmsg(self.serv_sock, len(MSG), 3268 socket.CMSG_LEN(4 * SIZEOF_INT)), 3269 # RFC 3542 says implementations may set 3270 # MSG_CTRUNC if there isn't enough space 3271 # for trailing padding. 3272 ignoreflags=socket.MSG_CTRUNC) 3273 3274 def _testFDPassCMSG_LEN(self): 3275 self.createAndSendFDs(1) 3276 3277 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3278 @unittest.skipIf(AIX, "skipping, see issue #22397") 3279 @requireAttrs(socket, "CMSG_SPACE") 3280 def testFDPassSeparate(self): 3281 # Pass two FDs in two separate arrays. Arrays may be combined 3282 # into a single control message by the OS. 3283 self.checkRecvmsgFDs(2, 3284 self.doRecvmsg(self.serv_sock, len(MSG), 10240), 3285 maxcmsgs=2) 3286 3287 @testFDPassSeparate.client_skip 3288 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3289 @unittest.skipIf(AIX, "skipping, see issue #22397") 3290 def _testFDPassSeparate(self): 3291 fd0, fd1 = self.newFDs(2) 3292 self.assertEqual( 3293 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 3294 socket.SCM_RIGHTS, 3295 array.array("i", [fd0])), 3296 (socket.SOL_SOCKET, 3297 socket.SCM_RIGHTS, 3298 array.array("i", [fd1]))]), 3299 len(MSG)) 3300 3301 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3302 @unittest.skipIf(AIX, "skipping, see issue #22397") 3303 @requireAttrs(socket, "CMSG_SPACE") 3304 def testFDPassSeparateMinSpace(self): 3305 # Pass two FDs in two separate arrays, receiving them into the 3306 # minimum space for two arrays. 3307 num_fds = 2 3308 self.checkRecvmsgFDs(num_fds, 3309 self.doRecvmsg(self.serv_sock, len(MSG), 3310 socket.CMSG_SPACE(SIZEOF_INT) + 3311 socket.CMSG_LEN(SIZEOF_INT * num_fds)), 3312 maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC) 3313 3314 @testFDPassSeparateMinSpace.client_skip 3315 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3316 @unittest.skipIf(AIX, "skipping, see issue #22397") 3317 def _testFDPassSeparateMinSpace(self): 3318 fd0, fd1 = self.newFDs(2) 3319 self.assertEqual( 3320 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 3321 socket.SCM_RIGHTS, 3322 array.array("i", [fd0])), 3323 (socket.SOL_SOCKET, 3324 socket.SCM_RIGHTS, 3325 array.array("i", [fd1]))]), 3326 len(MSG)) 3327 3328 def sendAncillaryIfPossible(self, msg, ancdata): 3329 # Try to send msg and ancdata to server, but if the system 3330 # call fails, just send msg with no ancillary data. 3331 try: 3332 nbytes = self.sendmsgToServer([msg], ancdata) 3333 except OSError as e: 3334 # Check that it was the system call that failed 3335 self.assertIsInstance(e.errno, int) 3336 nbytes = self.sendmsgToServer([msg]) 3337 self.assertEqual(nbytes, len(msg)) 3338 3339 @unittest.skipIf(sys.platform == "darwin", "see issue #24725") 3340 def testFDPassEmpty(self): 3341 # Try to pass an empty FD array. Can receive either no array 3342 # or an empty array. 3343 self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock, 3344 len(MSG), 10240), 3345 ignoreflags=socket.MSG_CTRUNC) 3346 3347 def _testFDPassEmpty(self): 3348 self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET, 3349 socket.SCM_RIGHTS, 3350 b"")]) 3351 3352 def testFDPassPartialInt(self): 3353 # Try to pass a truncated FD array. 3354 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3355 len(MSG), 10240) 3356 self.assertEqual(msg, MSG) 3357 self.checkRecvmsgAddress(addr, self.cli_addr) 3358 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 3359 self.assertLessEqual(len(ancdata), 1) 3360 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3361 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3362 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3363 self.assertLess(len(cmsg_data), SIZEOF_INT) 3364 3365 def _testFDPassPartialInt(self): 3366 self.sendAncillaryIfPossible( 3367 MSG, 3368 [(socket.SOL_SOCKET, 3369 socket.SCM_RIGHTS, 3370 array.array("i", [self.badfd]).tobytes()[:-1])]) 3371 3372 @requireAttrs(socket, "CMSG_SPACE") 3373 def testFDPassPartialIntInMiddle(self): 3374 # Try to pass two FD arrays, the first of which is truncated. 3375 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3376 len(MSG), 10240) 3377 self.assertEqual(msg, MSG) 3378 self.checkRecvmsgAddress(addr, self.cli_addr) 3379 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 3380 self.assertLessEqual(len(ancdata), 2) 3381 fds = array.array("i") 3382 # Arrays may have been combined in a single control message 3383 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3384 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3385 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3386 fds.frombytes(cmsg_data[: 3387 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3388 self.assertLessEqual(len(fds), 2) 3389 self.checkFDs(fds) 3390 3391 @testFDPassPartialIntInMiddle.client_skip 3392 def _testFDPassPartialIntInMiddle(self): 3393 fd0, fd1 = self.newFDs(2) 3394 self.sendAncillaryIfPossible( 3395 MSG, 3396 [(socket.SOL_SOCKET, 3397 socket.SCM_RIGHTS, 3398 array.array("i", [fd0, self.badfd]).tobytes()[:-1]), 3399 (socket.SOL_SOCKET, 3400 socket.SCM_RIGHTS, 3401 array.array("i", [fd1]))]) 3402 3403 def checkTruncatedHeader(self, result, ignoreflags=0): 3404 # Check that no ancillary data items are returned when data is 3405 # truncated inside the cmsghdr structure. 3406 msg, ancdata, flags, addr = result 3407 self.assertEqual(msg, MSG) 3408 self.checkRecvmsgAddress(addr, self.cli_addr) 3409 self.assertEqual(ancdata, []) 3410 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 3411 ignore=ignoreflags) 3412 3413 def testCmsgTruncNoBufSize(self): 3414 # Check that no ancillary data is received when no buffer size 3415 # is specified. 3416 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)), 3417 # BSD seems to set MSG_CTRUNC only 3418 # if an item has been partially 3419 # received. 3420 ignoreflags=socket.MSG_CTRUNC) 3421 3422 def _testCmsgTruncNoBufSize(self): 3423 self.createAndSendFDs(1) 3424 3425 def testCmsgTrunc0(self): 3426 # Check that no ancillary data is received when buffer size is 0. 3427 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0), 3428 ignoreflags=socket.MSG_CTRUNC) 3429 3430 def _testCmsgTrunc0(self): 3431 self.createAndSendFDs(1) 3432 3433 # Check that no ancillary data is returned for various non-zero 3434 # (but still too small) buffer sizes. 3435 3436 def testCmsgTrunc1(self): 3437 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1)) 3438 3439 def _testCmsgTrunc1(self): 3440 self.createAndSendFDs(1) 3441 3442 def testCmsgTrunc2Int(self): 3443 # The cmsghdr structure has at least three members, two of 3444 # which are ints, so we still shouldn't see any ancillary 3445 # data. 3446 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3447 SIZEOF_INT * 2)) 3448 3449 def _testCmsgTrunc2Int(self): 3450 self.createAndSendFDs(1) 3451 3452 def testCmsgTruncLen0Minus1(self): 3453 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3454 socket.CMSG_LEN(0) - 1)) 3455 3456 def _testCmsgTruncLen0Minus1(self): 3457 self.createAndSendFDs(1) 3458 3459 # The following tests try to truncate the control message in the 3460 # middle of the FD array. 3461 3462 def checkTruncatedArray(self, ancbuf, maxdata, mindata=0): 3463 # Check that file descriptor data is truncated to between 3464 # mindata and maxdata bytes when received with buffer size 3465 # ancbuf, and that any complete file descriptor numbers are 3466 # valid. 3467 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3468 len(MSG), ancbuf) 3469 self.assertEqual(msg, MSG) 3470 self.checkRecvmsgAddress(addr, self.cli_addr) 3471 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3472 3473 if mindata == 0 and ancdata == []: 3474 return 3475 self.assertEqual(len(ancdata), 1) 3476 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3477 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3478 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3479 self.assertGreaterEqual(len(cmsg_data), mindata) 3480 self.assertLessEqual(len(cmsg_data), maxdata) 3481 fds = array.array("i") 3482 fds.frombytes(cmsg_data[: 3483 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3484 self.checkFDs(fds) 3485 3486 def testCmsgTruncLen0(self): 3487 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0) 3488 3489 def _testCmsgTruncLen0(self): 3490 self.createAndSendFDs(1) 3491 3492 def testCmsgTruncLen0Plus1(self): 3493 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1) 3494 3495 def _testCmsgTruncLen0Plus1(self): 3496 self.createAndSendFDs(2) 3497 3498 def testCmsgTruncLen1(self): 3499 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT), 3500 maxdata=SIZEOF_INT) 3501 3502 def _testCmsgTruncLen1(self): 3503 self.createAndSendFDs(2) 3504 3505 def testCmsgTruncLen2Minus1(self): 3506 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1, 3507 maxdata=(2 * SIZEOF_INT) - 1) 3508 3509 def _testCmsgTruncLen2Minus1(self): 3510 self.createAndSendFDs(2) 3511 3512 3513class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase): 3514 # Test sendmsg() and recvmsg[_into]() using the ancillary data 3515 # features of the RFC 3542 Advanced Sockets API for IPv6. 3516 # Currently we can only handle certain data items (e.g. traffic 3517 # class, hop limit, MTU discovery and fragmentation settings) 3518 # without resorting to unportable means such as the struct module, 3519 # but the tests here are aimed at testing the ancillary data 3520 # handling in sendmsg() and recvmsg() rather than the IPv6 API 3521 # itself. 3522 3523 # Test value to use when setting hop limit of packet 3524 hop_limit = 2 3525 3526 # Test value to use when setting traffic class of packet. 3527 # -1 means "use kernel default". 3528 traffic_class = -1 3529 3530 def ancillaryMapping(self, ancdata): 3531 # Given ancillary data list ancdata, return a mapping from 3532 # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data. 3533 # Check that no (level, type) pair appears more than once. 3534 d = {} 3535 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3536 self.assertNotIn((cmsg_level, cmsg_type), d) 3537 d[(cmsg_level, cmsg_type)] = cmsg_data 3538 return d 3539 3540 def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0): 3541 # Receive hop limit into ancbufsize bytes of ancillary data 3542 # space. Check that data is MSG, ancillary data is not 3543 # truncated (but ignore any flags in ignoreflags), and hop 3544 # limit is between 0 and maxhop inclusive. 3545 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3546 socket.IPV6_RECVHOPLIMIT, 1) 3547 self.misc_event.set() 3548 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3549 len(MSG), ancbufsize) 3550 3551 self.assertEqual(msg, MSG) 3552 self.checkRecvmsgAddress(addr, self.cli_addr) 3553 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3554 ignore=ignoreflags) 3555 3556 self.assertEqual(len(ancdata), 1) 3557 self.assertIsInstance(ancdata[0], tuple) 3558 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3559 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3560 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 3561 self.assertIsInstance(cmsg_data, bytes) 3562 self.assertEqual(len(cmsg_data), SIZEOF_INT) 3563 a = array.array("i") 3564 a.frombytes(cmsg_data) 3565 self.assertGreaterEqual(a[0], 0) 3566 self.assertLessEqual(a[0], maxhop) 3567 3568 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3569 def testRecvHopLimit(self): 3570 # Test receiving the packet hop limit as ancillary data. 3571 self.checkHopLimit(ancbufsize=10240) 3572 3573 @testRecvHopLimit.client_skip 3574 def _testRecvHopLimit(self): 3575 # Need to wait until server has asked to receive ancillary 3576 # data, as implementations are not required to buffer it 3577 # otherwise. 3578 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3579 self.sendToServer(MSG) 3580 3581 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3582 def testRecvHopLimitCMSG_SPACE(self): 3583 # Test receiving hop limit, using CMSG_SPACE to calculate buffer size. 3584 self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT)) 3585 3586 @testRecvHopLimitCMSG_SPACE.client_skip 3587 def _testRecvHopLimitCMSG_SPACE(self): 3588 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3589 self.sendToServer(MSG) 3590 3591 # Could test receiving into buffer sized using CMSG_LEN, but RFC 3592 # 3542 says portable applications must provide space for trailing 3593 # padding. Implementations may set MSG_CTRUNC if there isn't 3594 # enough space for the padding. 3595 3596 @requireAttrs(socket.socket, "sendmsg") 3597 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3598 def testSetHopLimit(self): 3599 # Test setting hop limit on outgoing packet and receiving it 3600 # at the other end. 3601 self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit) 3602 3603 @testSetHopLimit.client_skip 3604 def _testSetHopLimit(self): 3605 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3606 self.assertEqual( 3607 self.sendmsgToServer([MSG], 3608 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3609 array.array("i", [self.hop_limit]))]), 3610 len(MSG)) 3611 3612 def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255, 3613 ignoreflags=0): 3614 # Receive traffic class and hop limit into ancbufsize bytes of 3615 # ancillary data space. Check that data is MSG, ancillary 3616 # data is not truncated (but ignore any flags in ignoreflags), 3617 # and traffic class and hop limit are in range (hop limit no 3618 # more than maxhop). 3619 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3620 socket.IPV6_RECVHOPLIMIT, 1) 3621 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3622 socket.IPV6_RECVTCLASS, 1) 3623 self.misc_event.set() 3624 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3625 len(MSG), ancbufsize) 3626 3627 self.assertEqual(msg, MSG) 3628 self.checkRecvmsgAddress(addr, self.cli_addr) 3629 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3630 ignore=ignoreflags) 3631 self.assertEqual(len(ancdata), 2) 3632 ancmap = self.ancillaryMapping(ancdata) 3633 3634 tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)] 3635 self.assertEqual(len(tcdata), SIZEOF_INT) 3636 a = array.array("i") 3637 a.frombytes(tcdata) 3638 self.assertGreaterEqual(a[0], 0) 3639 self.assertLessEqual(a[0], 255) 3640 3641 hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)] 3642 self.assertEqual(len(hldata), SIZEOF_INT) 3643 a = array.array("i") 3644 a.frombytes(hldata) 3645 self.assertGreaterEqual(a[0], 0) 3646 self.assertLessEqual(a[0], maxhop) 3647 3648 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3649 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3650 def testRecvTrafficClassAndHopLimit(self): 3651 # Test receiving traffic class and hop limit as ancillary data. 3652 self.checkTrafficClassAndHopLimit(ancbufsize=10240) 3653 3654 @testRecvTrafficClassAndHopLimit.client_skip 3655 def _testRecvTrafficClassAndHopLimit(self): 3656 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3657 self.sendToServer(MSG) 3658 3659 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3660 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3661 def testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3662 # Test receiving traffic class and hop limit, using 3663 # CMSG_SPACE() to calculate buffer size. 3664 self.checkTrafficClassAndHopLimit( 3665 ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2) 3666 3667 @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip 3668 def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3669 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3670 self.sendToServer(MSG) 3671 3672 @requireAttrs(socket.socket, "sendmsg") 3673 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3674 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3675 def testSetTrafficClassAndHopLimit(self): 3676 # Test setting traffic class and hop limit on outgoing packet, 3677 # and receiving them at the other end. 3678 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 3679 maxhop=self.hop_limit) 3680 3681 @testSetTrafficClassAndHopLimit.client_skip 3682 def _testSetTrafficClassAndHopLimit(self): 3683 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3684 self.assertEqual( 3685 self.sendmsgToServer([MSG], 3686 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3687 array.array("i", [self.traffic_class])), 3688 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3689 array.array("i", [self.hop_limit]))]), 3690 len(MSG)) 3691 3692 @requireAttrs(socket.socket, "sendmsg") 3693 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3694 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3695 def testOddCmsgSize(self): 3696 # Try to send ancillary data with first item one byte too 3697 # long. Fall back to sending with correct size if this fails, 3698 # and check that second item was handled correctly. 3699 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 3700 maxhop=self.hop_limit) 3701 3702 @testOddCmsgSize.client_skip 3703 def _testOddCmsgSize(self): 3704 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3705 try: 3706 nbytes = self.sendmsgToServer( 3707 [MSG], 3708 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3709 array.array("i", [self.traffic_class]).tobytes() + b"\x00"), 3710 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3711 array.array("i", [self.hop_limit]))]) 3712 except OSError as e: 3713 self.assertIsInstance(e.errno, int) 3714 nbytes = self.sendmsgToServer( 3715 [MSG], 3716 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3717 array.array("i", [self.traffic_class])), 3718 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3719 array.array("i", [self.hop_limit]))]) 3720 self.assertEqual(nbytes, len(MSG)) 3721 3722 # Tests for proper handling of truncated ancillary data 3723 3724 def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0): 3725 # Receive hop limit into ancbufsize bytes of ancillary data 3726 # space, which should be too small to contain the ancillary 3727 # data header (if ancbufsize is None, pass no second argument 3728 # to recvmsg()). Check that data is MSG, MSG_CTRUNC is set 3729 # (unless included in ignoreflags), and no ancillary data is 3730 # returned. 3731 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3732 socket.IPV6_RECVHOPLIMIT, 1) 3733 self.misc_event.set() 3734 args = () if ancbufsize is None else (ancbufsize,) 3735 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3736 len(MSG), *args) 3737 3738 self.assertEqual(msg, MSG) 3739 self.checkRecvmsgAddress(addr, self.cli_addr) 3740 self.assertEqual(ancdata, []) 3741 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 3742 ignore=ignoreflags) 3743 3744 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3745 def testCmsgTruncNoBufSize(self): 3746 # Check that no ancillary data is received when no ancillary 3747 # buffer size is provided. 3748 self.checkHopLimitTruncatedHeader(ancbufsize=None, 3749 # BSD seems to set 3750 # MSG_CTRUNC only if an item 3751 # has been partially 3752 # received. 3753 ignoreflags=socket.MSG_CTRUNC) 3754 3755 @testCmsgTruncNoBufSize.client_skip 3756 def _testCmsgTruncNoBufSize(self): 3757 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3758 self.sendToServer(MSG) 3759 3760 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3761 def testSingleCmsgTrunc0(self): 3762 # Check that no ancillary data is received when ancillary 3763 # buffer size is zero. 3764 self.checkHopLimitTruncatedHeader(ancbufsize=0, 3765 ignoreflags=socket.MSG_CTRUNC) 3766 3767 @testSingleCmsgTrunc0.client_skip 3768 def _testSingleCmsgTrunc0(self): 3769 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3770 self.sendToServer(MSG) 3771 3772 # Check that no ancillary data is returned for various non-zero 3773 # (but still too small) buffer sizes. 3774 3775 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3776 def testSingleCmsgTrunc1(self): 3777 self.checkHopLimitTruncatedHeader(ancbufsize=1) 3778 3779 @testSingleCmsgTrunc1.client_skip 3780 def _testSingleCmsgTrunc1(self): 3781 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3782 self.sendToServer(MSG) 3783 3784 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3785 def testSingleCmsgTrunc2Int(self): 3786 self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT) 3787 3788 @testSingleCmsgTrunc2Int.client_skip 3789 def _testSingleCmsgTrunc2Int(self): 3790 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3791 self.sendToServer(MSG) 3792 3793 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3794 def testSingleCmsgTruncLen0Minus1(self): 3795 self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1) 3796 3797 @testSingleCmsgTruncLen0Minus1.client_skip 3798 def _testSingleCmsgTruncLen0Minus1(self): 3799 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3800 self.sendToServer(MSG) 3801 3802 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3803 def testSingleCmsgTruncInData(self): 3804 # Test truncation of a control message inside its associated 3805 # data. The message may be returned with its data truncated, 3806 # or not returned at all. 3807 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3808 socket.IPV6_RECVHOPLIMIT, 1) 3809 self.misc_event.set() 3810 msg, ancdata, flags, addr = self.doRecvmsg( 3811 self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1) 3812 3813 self.assertEqual(msg, MSG) 3814 self.checkRecvmsgAddress(addr, self.cli_addr) 3815 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3816 3817 self.assertLessEqual(len(ancdata), 1) 3818 if ancdata: 3819 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3820 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3821 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 3822 self.assertLess(len(cmsg_data), SIZEOF_INT) 3823 3824 @testSingleCmsgTruncInData.client_skip 3825 def _testSingleCmsgTruncInData(self): 3826 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3827 self.sendToServer(MSG) 3828 3829 def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0): 3830 # Receive traffic class and hop limit into ancbufsize bytes of 3831 # ancillary data space, which should be large enough to 3832 # contain the first item, but too small to contain the header 3833 # of the second. Check that data is MSG, MSG_CTRUNC is set 3834 # (unless included in ignoreflags), and only one ancillary 3835 # data item is returned. 3836 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3837 socket.IPV6_RECVHOPLIMIT, 1) 3838 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3839 socket.IPV6_RECVTCLASS, 1) 3840 self.misc_event.set() 3841 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3842 len(MSG), ancbufsize) 3843 3844 self.assertEqual(msg, MSG) 3845 self.checkRecvmsgAddress(addr, self.cli_addr) 3846 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 3847 ignore=ignoreflags) 3848 3849 self.assertEqual(len(ancdata), 1) 3850 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3851 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3852 self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}) 3853 self.assertEqual(len(cmsg_data), SIZEOF_INT) 3854 a = array.array("i") 3855 a.frombytes(cmsg_data) 3856 self.assertGreaterEqual(a[0], 0) 3857 self.assertLessEqual(a[0], 255) 3858 3859 # Try the above test with various buffer sizes. 3860 3861 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3862 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3863 def testSecondCmsgTrunc0(self): 3864 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT), 3865 ignoreflags=socket.MSG_CTRUNC) 3866 3867 @testSecondCmsgTrunc0.client_skip 3868 def _testSecondCmsgTrunc0(self): 3869 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3870 self.sendToServer(MSG) 3871 3872 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3873 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3874 def testSecondCmsgTrunc1(self): 3875 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1) 3876 3877 @testSecondCmsgTrunc1.client_skip 3878 def _testSecondCmsgTrunc1(self): 3879 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3880 self.sendToServer(MSG) 3881 3882 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3883 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3884 def testSecondCmsgTrunc2Int(self): 3885 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 3886 2 * SIZEOF_INT) 3887 3888 @testSecondCmsgTrunc2Int.client_skip 3889 def _testSecondCmsgTrunc2Int(self): 3890 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3891 self.sendToServer(MSG) 3892 3893 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3894 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3895 def testSecondCmsgTruncLen0Minus1(self): 3896 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 3897 socket.CMSG_LEN(0) - 1) 3898 3899 @testSecondCmsgTruncLen0Minus1.client_skip 3900 def _testSecondCmsgTruncLen0Minus1(self): 3901 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3902 self.sendToServer(MSG) 3903 3904 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3905 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3906 def testSecomdCmsgTruncInData(self): 3907 # Test truncation of the second of two control messages inside 3908 # its associated data. 3909 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3910 socket.IPV6_RECVHOPLIMIT, 1) 3911 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3912 socket.IPV6_RECVTCLASS, 1) 3913 self.misc_event.set() 3914 msg, ancdata, flags, addr = self.doRecvmsg( 3915 self.serv_sock, len(MSG), 3916 socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1) 3917 3918 self.assertEqual(msg, MSG) 3919 self.checkRecvmsgAddress(addr, self.cli_addr) 3920 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3921 3922 cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT} 3923 3924 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 3925 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3926 cmsg_types.remove(cmsg_type) 3927 self.assertEqual(len(cmsg_data), SIZEOF_INT) 3928 a = array.array("i") 3929 a.frombytes(cmsg_data) 3930 self.assertGreaterEqual(a[0], 0) 3931 self.assertLessEqual(a[0], 255) 3932 3933 if ancdata: 3934 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 3935 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3936 cmsg_types.remove(cmsg_type) 3937 self.assertLess(len(cmsg_data), SIZEOF_INT) 3938 3939 self.assertEqual(ancdata, []) 3940 3941 @testSecomdCmsgTruncInData.client_skip 3942 def _testSecomdCmsgTruncInData(self): 3943 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3944 self.sendToServer(MSG) 3945 3946 3947# Derive concrete test classes for different socket types. 3948 3949class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase, 3950 SendrecvmsgConnectionlessBase, 3951 ThreadedSocketTestMixin, UDPTestBase): 3952 pass 3953 3954@requireAttrs(socket.socket, "sendmsg") 3955class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase): 3956 pass 3957 3958@requireAttrs(socket.socket, "recvmsg") 3959class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase): 3960 pass 3961 3962@requireAttrs(socket.socket, "recvmsg_into") 3963class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase): 3964 pass 3965 3966 3967class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase, 3968 SendrecvmsgConnectionlessBase, 3969 ThreadedSocketTestMixin, UDP6TestBase): 3970 3971 def checkRecvmsgAddress(self, addr1, addr2): 3972 # Called to compare the received address with the address of 3973 # the peer, ignoring scope ID 3974 self.assertEqual(addr1[:-1], addr2[:-1]) 3975 3976@requireAttrs(socket.socket, "sendmsg") 3977@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3978@requireSocket("AF_INET6", "SOCK_DGRAM") 3979class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase): 3980 pass 3981 3982@requireAttrs(socket.socket, "recvmsg") 3983@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3984@requireSocket("AF_INET6", "SOCK_DGRAM") 3985class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase): 3986 pass 3987 3988@requireAttrs(socket.socket, "recvmsg_into") 3989@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3990@requireSocket("AF_INET6", "SOCK_DGRAM") 3991class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase): 3992 pass 3993 3994@requireAttrs(socket.socket, "recvmsg") 3995@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3996@requireAttrs(socket, "IPPROTO_IPV6") 3997@requireSocket("AF_INET6", "SOCK_DGRAM") 3998class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, 3999 SendrecvmsgUDP6TestBase): 4000 pass 4001 4002@requireAttrs(socket.socket, "recvmsg_into") 4003@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 4004@requireAttrs(socket, "IPPROTO_IPV6") 4005@requireSocket("AF_INET6", "SOCK_DGRAM") 4006class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin, 4007 RFC3542AncillaryTest, 4008 SendrecvmsgUDP6TestBase): 4009 pass 4010 4011 4012class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase, 4013 ConnectedStreamTestMixin, TCPTestBase): 4014 pass 4015 4016@requireAttrs(socket.socket, "sendmsg") 4017class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase): 4018 pass 4019 4020@requireAttrs(socket.socket, "recvmsg") 4021class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests, 4022 SendrecvmsgTCPTestBase): 4023 pass 4024 4025@requireAttrs(socket.socket, "recvmsg_into") 4026class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4027 SendrecvmsgTCPTestBase): 4028 pass 4029 4030 4031class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase, 4032 SendrecvmsgConnectedBase, 4033 ConnectedStreamTestMixin, SCTPStreamBase): 4034 pass 4035 4036@requireAttrs(socket.socket, "sendmsg") 4037@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4038@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4039class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase): 4040 pass 4041 4042@requireAttrs(socket.socket, "recvmsg") 4043@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4044@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4045class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 4046 SendrecvmsgSCTPStreamTestBase): 4047 4048 def testRecvmsgEOF(self): 4049 try: 4050 super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF() 4051 except OSError as e: 4052 if e.errno != errno.ENOTCONN: 4053 raise 4054 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 4055 4056@requireAttrs(socket.socket, "recvmsg_into") 4057@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4058@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4059class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4060 SendrecvmsgSCTPStreamTestBase): 4061 4062 def testRecvmsgEOF(self): 4063 try: 4064 super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF() 4065 except OSError as e: 4066 if e.errno != errno.ENOTCONN: 4067 raise 4068 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 4069 4070 4071class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase, 4072 ConnectedStreamTestMixin, UnixStreamBase): 4073 pass 4074 4075@requireAttrs(socket.socket, "sendmsg") 4076@requireAttrs(socket, "AF_UNIX") 4077class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase): 4078 pass 4079 4080@requireAttrs(socket.socket, "recvmsg") 4081@requireAttrs(socket, "AF_UNIX") 4082class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 4083 SendrecvmsgUnixStreamTestBase): 4084 pass 4085 4086@requireAttrs(socket.socket, "recvmsg_into") 4087@requireAttrs(socket, "AF_UNIX") 4088class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4089 SendrecvmsgUnixStreamTestBase): 4090 pass 4091 4092@requireAttrs(socket.socket, "sendmsg", "recvmsg") 4093@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 4094class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase): 4095 pass 4096 4097@requireAttrs(socket.socket, "sendmsg", "recvmsg_into") 4098@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 4099class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest, 4100 SendrecvmsgUnixStreamTestBase): 4101 pass 4102 4103 4104# Test interrupting the interruptible send/receive methods with a 4105# signal when a timeout is set. These tests avoid having multiple 4106# threads alive during the test so that the OS cannot deliver the 4107# signal to the wrong one. 4108 4109class InterruptedTimeoutBase(unittest.TestCase): 4110 # Base class for interrupted send/receive tests. Installs an 4111 # empty handler for SIGALRM and removes it on teardown, along with 4112 # any scheduled alarms. 4113 4114 def setUp(self): 4115 super().setUp() 4116 orig_alrm_handler = signal.signal(signal.SIGALRM, 4117 lambda signum, frame: 1 / 0) 4118 self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler) 4119 4120 # Timeout for socket operations 4121 timeout = 4.0 4122 4123 # Provide setAlarm() method to schedule delivery of SIGALRM after 4124 # given number of seconds, or cancel it if zero, and an 4125 # appropriate time value to use. Use setitimer() if available. 4126 if hasattr(signal, "setitimer"): 4127 alarm_time = 0.05 4128 4129 def setAlarm(self, seconds): 4130 signal.setitimer(signal.ITIMER_REAL, seconds) 4131 else: 4132 # Old systems may deliver the alarm up to one second early 4133 alarm_time = 2 4134 4135 def setAlarm(self, seconds): 4136 signal.alarm(seconds) 4137 4138 4139# Require siginterrupt() in order to ensure that system calls are 4140# interrupted by default. 4141@requireAttrs(signal, "siginterrupt") 4142@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), 4143 "Don't have signal.alarm or signal.setitimer") 4144class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase): 4145 # Test interrupting the recv*() methods with signals when a 4146 # timeout is set. 4147 4148 def setUp(self): 4149 super().setUp() 4150 self.serv.settimeout(self.timeout) 4151 4152 def checkInterruptedRecv(self, func, *args, **kwargs): 4153 # Check that func(*args, **kwargs) raises 4154 # errno of EINTR when interrupted by a signal. 4155 try: 4156 self.setAlarm(self.alarm_time) 4157 with self.assertRaises(ZeroDivisionError) as cm: 4158 func(*args, **kwargs) 4159 finally: 4160 self.setAlarm(0) 4161 4162 def testInterruptedRecvTimeout(self): 4163 self.checkInterruptedRecv(self.serv.recv, 1024) 4164 4165 def testInterruptedRecvIntoTimeout(self): 4166 self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024)) 4167 4168 def testInterruptedRecvfromTimeout(self): 4169 self.checkInterruptedRecv(self.serv.recvfrom, 1024) 4170 4171 def testInterruptedRecvfromIntoTimeout(self): 4172 self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024)) 4173 4174 @requireAttrs(socket.socket, "recvmsg") 4175 def testInterruptedRecvmsgTimeout(self): 4176 self.checkInterruptedRecv(self.serv.recvmsg, 1024) 4177 4178 @requireAttrs(socket.socket, "recvmsg_into") 4179 def testInterruptedRecvmsgIntoTimeout(self): 4180 self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)]) 4181 4182 4183# Require siginterrupt() in order to ensure that system calls are 4184# interrupted by default. 4185@requireAttrs(signal, "siginterrupt") 4186@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), 4187 "Don't have signal.alarm or signal.setitimer") 4188class InterruptedSendTimeoutTest(InterruptedTimeoutBase, 4189 ThreadSafeCleanupTestCase, 4190 SocketListeningTestMixin, TCPTestBase): 4191 # Test interrupting the interruptible send*() methods with signals 4192 # when a timeout is set. 4193 4194 def setUp(self): 4195 super().setUp() 4196 self.serv_conn = self.newSocket() 4197 self.addCleanup(self.serv_conn.close) 4198 # Use a thread to complete the connection, but wait for it to 4199 # terminate before running the test, so that there is only one 4200 # thread to accept the signal. 4201 cli_thread = threading.Thread(target=self.doConnect) 4202 cli_thread.start() 4203 self.cli_conn, addr = self.serv.accept() 4204 self.addCleanup(self.cli_conn.close) 4205 cli_thread.join() 4206 self.serv_conn.settimeout(self.timeout) 4207 4208 def doConnect(self): 4209 self.serv_conn.connect(self.serv_addr) 4210 4211 def checkInterruptedSend(self, func, *args, **kwargs): 4212 # Check that func(*args, **kwargs), run in a loop, raises 4213 # OSError with an errno of EINTR when interrupted by a 4214 # signal. 4215 try: 4216 with self.assertRaises(ZeroDivisionError) as cm: 4217 while True: 4218 self.setAlarm(self.alarm_time) 4219 func(*args, **kwargs) 4220 finally: 4221 self.setAlarm(0) 4222 4223 # Issue #12958: The following tests have problems on OS X prior to 10.7 4224 @support.requires_mac_ver(10, 7) 4225 def testInterruptedSendTimeout(self): 4226 self.checkInterruptedSend(self.serv_conn.send, b"a"*512) 4227 4228 @support.requires_mac_ver(10, 7) 4229 def testInterruptedSendtoTimeout(self): 4230 # Passing an actual address here as Python's wrapper for 4231 # sendto() doesn't allow passing a zero-length one; POSIX 4232 # requires that the address is ignored since the socket is 4233 # connection-mode, however. 4234 self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512, 4235 self.serv_addr) 4236 4237 @support.requires_mac_ver(10, 7) 4238 @requireAttrs(socket.socket, "sendmsg") 4239 def testInterruptedSendmsgTimeout(self): 4240 self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512]) 4241 4242 4243class TCPCloserTest(ThreadedTCPSocketTest): 4244 4245 def testClose(self): 4246 conn, addr = self.serv.accept() 4247 conn.close() 4248 4249 sd = self.cli 4250 read, write, err = select.select([sd], [], [], 1.0) 4251 self.assertEqual(read, [sd]) 4252 self.assertEqual(sd.recv(1), b'') 4253 4254 # Calling close() many times should be safe. 4255 conn.close() 4256 conn.close() 4257 4258 def _testClose(self): 4259 self.cli.connect((HOST, self.port)) 4260 time.sleep(1.0) 4261 4262 4263class BasicSocketPairTest(SocketPairTest): 4264 4265 def __init__(self, methodName='runTest'): 4266 SocketPairTest.__init__(self, methodName=methodName) 4267 4268 def _check_defaults(self, sock): 4269 self.assertIsInstance(sock, socket.socket) 4270 if hasattr(socket, 'AF_UNIX'): 4271 self.assertEqual(sock.family, socket.AF_UNIX) 4272 else: 4273 self.assertEqual(sock.family, socket.AF_INET) 4274 self.assertEqual(sock.type, socket.SOCK_STREAM) 4275 self.assertEqual(sock.proto, 0) 4276 4277 def _testDefaults(self): 4278 self._check_defaults(self.cli) 4279 4280 def testDefaults(self): 4281 self._check_defaults(self.serv) 4282 4283 def testRecv(self): 4284 msg = self.serv.recv(1024) 4285 self.assertEqual(msg, MSG) 4286 4287 def _testRecv(self): 4288 self.cli.send(MSG) 4289 4290 def testSend(self): 4291 self.serv.send(MSG) 4292 4293 def _testSend(self): 4294 msg = self.cli.recv(1024) 4295 self.assertEqual(msg, MSG) 4296 4297 4298class NonBlockingTCPTests(ThreadedTCPSocketTest): 4299 4300 def __init__(self, methodName='runTest'): 4301 self.event = threading.Event() 4302 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 4303 4304 def assert_sock_timeout(self, sock, timeout): 4305 self.assertEqual(self.serv.gettimeout(), timeout) 4306 4307 blocking = (timeout != 0.0) 4308 self.assertEqual(sock.getblocking(), blocking) 4309 4310 if fcntl is not None: 4311 # When a Python socket has a non-zero timeout, it's switched 4312 # internally to a non-blocking mode. Later, sock.sendall(), 4313 # sock.recv(), and other socket operations use a select() call and 4314 # handle EWOULDBLOCK/EGAIN on all socket operations. That's how 4315 # timeouts are enforced. 4316 fd_blocking = (timeout is None) 4317 4318 flag = fcntl.fcntl(sock, fcntl.F_GETFL, os.O_NONBLOCK) 4319 self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking) 4320 4321 def testSetBlocking(self): 4322 # Test setblocking() and settimeout() methods 4323 self.serv.setblocking(True) 4324 self.assert_sock_timeout(self.serv, None) 4325 4326 self.serv.setblocking(False) 4327 self.assert_sock_timeout(self.serv, 0.0) 4328 4329 self.serv.settimeout(None) 4330 self.assert_sock_timeout(self.serv, None) 4331 4332 self.serv.settimeout(0) 4333 self.assert_sock_timeout(self.serv, 0) 4334 4335 self.serv.settimeout(10) 4336 self.assert_sock_timeout(self.serv, 10) 4337 4338 self.serv.settimeout(0) 4339 self.assert_sock_timeout(self.serv, 0) 4340 4341 def _testSetBlocking(self): 4342 pass 4343 4344 @support.cpython_only 4345 def testSetBlocking_overflow(self): 4346 # Issue 15989 4347 import _testcapi 4348 if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX: 4349 self.skipTest('needs UINT_MAX < ULONG_MAX') 4350 4351 self.serv.setblocking(False) 4352 self.assertEqual(self.serv.gettimeout(), 0.0) 4353 4354 self.serv.setblocking(_testcapi.UINT_MAX + 1) 4355 self.assertIsNone(self.serv.gettimeout()) 4356 4357 _testSetBlocking_overflow = support.cpython_only(_testSetBlocking) 4358 4359 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 4360 'test needs socket.SOCK_NONBLOCK') 4361 @support.requires_linux_version(2, 6, 28) 4362 def testInitNonBlocking(self): 4363 # create a socket with SOCK_NONBLOCK 4364 self.serv.close() 4365 self.serv = socket.socket(socket.AF_INET, 4366 socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 4367 self.assert_sock_timeout(self.serv, 0) 4368 4369 def _testInitNonBlocking(self): 4370 pass 4371 4372 def testInheritFlagsBlocking(self): 4373 # bpo-7995: accept() on a listening socket with a timeout and the 4374 # default timeout is None, the resulting socket must be blocking. 4375 with socket_setdefaulttimeout(None): 4376 self.serv.settimeout(10) 4377 conn, addr = self.serv.accept() 4378 self.addCleanup(conn.close) 4379 self.assertIsNone(conn.gettimeout()) 4380 4381 def _testInheritFlagsBlocking(self): 4382 self.cli.connect((HOST, self.port)) 4383 4384 def testInheritFlagsTimeout(self): 4385 # bpo-7995: accept() on a listening socket with a timeout and the 4386 # default timeout is None, the resulting socket must inherit 4387 # the default timeout. 4388 default_timeout = 20.0 4389 with socket_setdefaulttimeout(default_timeout): 4390 self.serv.settimeout(10) 4391 conn, addr = self.serv.accept() 4392 self.addCleanup(conn.close) 4393 self.assertEqual(conn.gettimeout(), default_timeout) 4394 4395 def _testInheritFlagsTimeout(self): 4396 self.cli.connect((HOST, self.port)) 4397 4398 def testAccept(self): 4399 # Testing non-blocking accept 4400 self.serv.setblocking(0) 4401 4402 # connect() didn't start: non-blocking accept() fails 4403 start_time = time.monotonic() 4404 with self.assertRaises(BlockingIOError): 4405 conn, addr = self.serv.accept() 4406 dt = time.monotonic() - start_time 4407 self.assertLess(dt, 1.0) 4408 4409 self.event.set() 4410 4411 read, write, err = select.select([self.serv], [], [], MAIN_TIMEOUT) 4412 if self.serv not in read: 4413 self.fail("Error trying to do accept after select.") 4414 4415 # connect() completed: non-blocking accept() doesn't block 4416 conn, addr = self.serv.accept() 4417 self.addCleanup(conn.close) 4418 self.assertIsNone(conn.gettimeout()) 4419 4420 def _testAccept(self): 4421 # don't connect before event is set to check 4422 # that non-blocking accept() raises BlockingIOError 4423 self.event.wait() 4424 4425 self.cli.connect((HOST, self.port)) 4426 4427 def testRecv(self): 4428 # Testing non-blocking recv 4429 conn, addr = self.serv.accept() 4430 self.addCleanup(conn.close) 4431 conn.setblocking(0) 4432 4433 # the server didn't send data yet: non-blocking recv() fails 4434 with self.assertRaises(BlockingIOError): 4435 msg = conn.recv(len(MSG)) 4436 4437 self.event.set() 4438 4439 read, write, err = select.select([conn], [], [], MAIN_TIMEOUT) 4440 if conn not in read: 4441 self.fail("Error during select call to non-blocking socket.") 4442 4443 # the server sent data yet: non-blocking recv() doesn't block 4444 msg = conn.recv(len(MSG)) 4445 self.assertEqual(msg, MSG) 4446 4447 def _testRecv(self): 4448 self.cli.connect((HOST, self.port)) 4449 4450 # don't send anything before event is set to check 4451 # that non-blocking recv() raises BlockingIOError 4452 self.event.wait() 4453 4454 # send data: recv() will no longer block 4455 self.cli.sendall(MSG) 4456 4457 4458class FileObjectClassTestCase(SocketConnectedTest): 4459 """Unit tests for the object returned by socket.makefile() 4460 4461 self.read_file is the io object returned by makefile() on 4462 the client connection. You can read from this file to 4463 get output from the server. 4464 4465 self.write_file is the io object returned by makefile() on the 4466 server connection. You can write to this file to send output 4467 to the client. 4468 """ 4469 4470 bufsize = -1 # Use default buffer size 4471 encoding = 'utf-8' 4472 errors = 'strict' 4473 newline = None 4474 4475 read_mode = 'rb' 4476 read_msg = MSG 4477 write_mode = 'wb' 4478 write_msg = MSG 4479 4480 def __init__(self, methodName='runTest'): 4481 SocketConnectedTest.__init__(self, methodName=methodName) 4482 4483 def setUp(self): 4484 self.evt1, self.evt2, self.serv_finished, self.cli_finished = [ 4485 threading.Event() for i in range(4)] 4486 SocketConnectedTest.setUp(self) 4487 self.read_file = self.cli_conn.makefile( 4488 self.read_mode, self.bufsize, 4489 encoding = self.encoding, 4490 errors = self.errors, 4491 newline = self.newline) 4492 4493 def tearDown(self): 4494 self.serv_finished.set() 4495 self.read_file.close() 4496 self.assertTrue(self.read_file.closed) 4497 self.read_file = None 4498 SocketConnectedTest.tearDown(self) 4499 4500 def clientSetUp(self): 4501 SocketConnectedTest.clientSetUp(self) 4502 self.write_file = self.serv_conn.makefile( 4503 self.write_mode, self.bufsize, 4504 encoding = self.encoding, 4505 errors = self.errors, 4506 newline = self.newline) 4507 4508 def clientTearDown(self): 4509 self.cli_finished.set() 4510 self.write_file.close() 4511 self.assertTrue(self.write_file.closed) 4512 self.write_file = None 4513 SocketConnectedTest.clientTearDown(self) 4514 4515 def testReadAfterTimeout(self): 4516 # Issue #7322: A file object must disallow further reads 4517 # after a timeout has occurred. 4518 self.cli_conn.settimeout(1) 4519 self.read_file.read(3) 4520 # First read raises a timeout 4521 self.assertRaises(socket.timeout, self.read_file.read, 1) 4522 # Second read is disallowed 4523 with self.assertRaises(OSError) as ctx: 4524 self.read_file.read(1) 4525 self.assertIn("cannot read from timed out object", str(ctx.exception)) 4526 4527 def _testReadAfterTimeout(self): 4528 self.write_file.write(self.write_msg[0:3]) 4529 self.write_file.flush() 4530 self.serv_finished.wait() 4531 4532 def testSmallRead(self): 4533 # Performing small file read test 4534 first_seg = self.read_file.read(len(self.read_msg)-3) 4535 second_seg = self.read_file.read(3) 4536 msg = first_seg + second_seg 4537 self.assertEqual(msg, self.read_msg) 4538 4539 def _testSmallRead(self): 4540 self.write_file.write(self.write_msg) 4541 self.write_file.flush() 4542 4543 def testFullRead(self): 4544 # read until EOF 4545 msg = self.read_file.read() 4546 self.assertEqual(msg, self.read_msg) 4547 4548 def _testFullRead(self): 4549 self.write_file.write(self.write_msg) 4550 self.write_file.close() 4551 4552 def testUnbufferedRead(self): 4553 # Performing unbuffered file read test 4554 buf = type(self.read_msg)() 4555 while 1: 4556 char = self.read_file.read(1) 4557 if not char: 4558 break 4559 buf += char 4560 self.assertEqual(buf, self.read_msg) 4561 4562 def _testUnbufferedRead(self): 4563 self.write_file.write(self.write_msg) 4564 self.write_file.flush() 4565 4566 def testReadline(self): 4567 # Performing file readline test 4568 line = self.read_file.readline() 4569 self.assertEqual(line, self.read_msg) 4570 4571 def _testReadline(self): 4572 self.write_file.write(self.write_msg) 4573 self.write_file.flush() 4574 4575 def testCloseAfterMakefile(self): 4576 # The file returned by makefile should keep the socket open. 4577 self.cli_conn.close() 4578 # read until EOF 4579 msg = self.read_file.read() 4580 self.assertEqual(msg, self.read_msg) 4581 4582 def _testCloseAfterMakefile(self): 4583 self.write_file.write(self.write_msg) 4584 self.write_file.flush() 4585 4586 def testMakefileAfterMakefileClose(self): 4587 self.read_file.close() 4588 msg = self.cli_conn.recv(len(MSG)) 4589 if isinstance(self.read_msg, str): 4590 msg = msg.decode() 4591 self.assertEqual(msg, self.read_msg) 4592 4593 def _testMakefileAfterMakefileClose(self): 4594 self.write_file.write(self.write_msg) 4595 self.write_file.flush() 4596 4597 def testClosedAttr(self): 4598 self.assertTrue(not self.read_file.closed) 4599 4600 def _testClosedAttr(self): 4601 self.assertTrue(not self.write_file.closed) 4602 4603 def testAttributes(self): 4604 self.assertEqual(self.read_file.mode, self.read_mode) 4605 self.assertEqual(self.read_file.name, self.cli_conn.fileno()) 4606 4607 def _testAttributes(self): 4608 self.assertEqual(self.write_file.mode, self.write_mode) 4609 self.assertEqual(self.write_file.name, self.serv_conn.fileno()) 4610 4611 def testRealClose(self): 4612 self.read_file.close() 4613 self.assertRaises(ValueError, self.read_file.fileno) 4614 self.cli_conn.close() 4615 self.assertRaises(OSError, self.cli_conn.getsockname) 4616 4617 def _testRealClose(self): 4618 pass 4619 4620 4621class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): 4622 4623 """Repeat the tests from FileObjectClassTestCase with bufsize==0. 4624 4625 In this case (and in this case only), it should be possible to 4626 create a file object, read a line from it, create another file 4627 object, read another line from it, without loss of data in the 4628 first file object's buffer. Note that http.client relies on this 4629 when reading multiple requests from the same socket.""" 4630 4631 bufsize = 0 # Use unbuffered mode 4632 4633 def testUnbufferedReadline(self): 4634 # Read a line, create a new file object, read another line with it 4635 line = self.read_file.readline() # first line 4636 self.assertEqual(line, b"A. " + self.write_msg) # first line 4637 self.read_file = self.cli_conn.makefile('rb', 0) 4638 line = self.read_file.readline() # second line 4639 self.assertEqual(line, b"B. " + self.write_msg) # second line 4640 4641 def _testUnbufferedReadline(self): 4642 self.write_file.write(b"A. " + self.write_msg) 4643 self.write_file.write(b"B. " + self.write_msg) 4644 self.write_file.flush() 4645 4646 def testMakefileClose(self): 4647 # The file returned by makefile should keep the socket open... 4648 self.cli_conn.close() 4649 msg = self.cli_conn.recv(1024) 4650 self.assertEqual(msg, self.read_msg) 4651 # ...until the file is itself closed 4652 self.read_file.close() 4653 self.assertRaises(OSError, self.cli_conn.recv, 1024) 4654 4655 def _testMakefileClose(self): 4656 self.write_file.write(self.write_msg) 4657 self.write_file.flush() 4658 4659 def testMakefileCloseSocketDestroy(self): 4660 refcount_before = sys.getrefcount(self.cli_conn) 4661 self.read_file.close() 4662 refcount_after = sys.getrefcount(self.cli_conn) 4663 self.assertEqual(refcount_before - 1, refcount_after) 4664 4665 def _testMakefileCloseSocketDestroy(self): 4666 pass 4667 4668 # Non-blocking ops 4669 # NOTE: to set `read_file` as non-blocking, we must call 4670 # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp). 4671 4672 def testSmallReadNonBlocking(self): 4673 self.cli_conn.setblocking(False) 4674 self.assertEqual(self.read_file.readinto(bytearray(10)), None) 4675 self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None) 4676 self.evt1.set() 4677 self.evt2.wait(1.0) 4678 first_seg = self.read_file.read(len(self.read_msg) - 3) 4679 if first_seg is None: 4680 # Data not arrived (can happen under Windows), wait a bit 4681 time.sleep(0.5) 4682 first_seg = self.read_file.read(len(self.read_msg) - 3) 4683 buf = bytearray(10) 4684 n = self.read_file.readinto(buf) 4685 self.assertEqual(n, 3) 4686 msg = first_seg + buf[:n] 4687 self.assertEqual(msg, self.read_msg) 4688 self.assertEqual(self.read_file.readinto(bytearray(16)), None) 4689 self.assertEqual(self.read_file.read(1), None) 4690 4691 def _testSmallReadNonBlocking(self): 4692 self.evt1.wait(1.0) 4693 self.write_file.write(self.write_msg) 4694 self.write_file.flush() 4695 self.evt2.set() 4696 # Avoid closing the socket before the server test has finished, 4697 # otherwise system recv() will return 0 instead of EWOULDBLOCK. 4698 self.serv_finished.wait(5.0) 4699 4700 def testWriteNonBlocking(self): 4701 self.cli_finished.wait(5.0) 4702 # The client thread can't skip directly - the SkipTest exception 4703 # would appear as a failure. 4704 if self.serv_skipped: 4705 self.skipTest(self.serv_skipped) 4706 4707 def _testWriteNonBlocking(self): 4708 self.serv_skipped = None 4709 self.serv_conn.setblocking(False) 4710 # Try to saturate the socket buffer pipe with repeated large writes. 4711 BIG = b"x" * support.SOCK_MAX_SIZE 4712 LIMIT = 10 4713 # The first write() succeeds since a chunk of data can be buffered 4714 n = self.write_file.write(BIG) 4715 self.assertGreater(n, 0) 4716 for i in range(LIMIT): 4717 n = self.write_file.write(BIG) 4718 if n is None: 4719 # Succeeded 4720 break 4721 self.assertGreater(n, 0) 4722 else: 4723 # Let us know that this test didn't manage to establish 4724 # the expected conditions. This is not a failure in itself but, 4725 # if it happens repeatedly, the test should be fixed. 4726 self.serv_skipped = "failed to saturate the socket buffer" 4727 4728 4729class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): 4730 4731 bufsize = 1 # Default-buffered for reading; line-buffered for writing 4732 4733 4734class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): 4735 4736 bufsize = 2 # Exercise the buffering code 4737 4738 4739class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase): 4740 """Tests for socket.makefile() in text mode (rather than binary)""" 4741 4742 read_mode = 'r' 4743 read_msg = MSG.decode('utf-8') 4744 write_mode = 'wb' 4745 write_msg = MSG 4746 newline = '' 4747 4748 4749class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase): 4750 """Tests for socket.makefile() in text mode (rather than binary)""" 4751 4752 read_mode = 'rb' 4753 read_msg = MSG 4754 write_mode = 'w' 4755 write_msg = MSG.decode('utf-8') 4756 newline = '' 4757 4758 4759class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase): 4760 """Tests for socket.makefile() in text mode (rather than binary)""" 4761 4762 read_mode = 'r' 4763 read_msg = MSG.decode('utf-8') 4764 write_mode = 'w' 4765 write_msg = MSG.decode('utf-8') 4766 newline = '' 4767 4768 4769class NetworkConnectionTest(object): 4770 """Prove network connection.""" 4771 4772 def clientSetUp(self): 4773 # We're inherited below by BasicTCPTest2, which also inherits 4774 # BasicTCPTest, which defines self.port referenced below. 4775 self.cli = socket.create_connection((HOST, self.port)) 4776 self.serv_conn = self.cli 4777 4778class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): 4779 """Tests that NetworkConnection does not break existing TCP functionality. 4780 """ 4781 4782class NetworkConnectionNoServer(unittest.TestCase): 4783 4784 class MockSocket(socket.socket): 4785 def connect(self, *args): 4786 raise socket.timeout('timed out') 4787 4788 @contextlib.contextmanager 4789 def mocked_socket_module(self): 4790 """Return a socket which times out on connect""" 4791 old_socket = socket.socket 4792 socket.socket = self.MockSocket 4793 try: 4794 yield 4795 finally: 4796 socket.socket = old_socket 4797 4798 def test_connect(self): 4799 port = support.find_unused_port() 4800 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4801 self.addCleanup(cli.close) 4802 with self.assertRaises(OSError) as cm: 4803 cli.connect((HOST, port)) 4804 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) 4805 4806 def test_create_connection(self): 4807 # Issue #9792: errors raised by create_connection() should have 4808 # a proper errno attribute. 4809 port = support.find_unused_port() 4810 with self.assertRaises(OSError) as cm: 4811 socket.create_connection((HOST, port)) 4812 4813 # Issue #16257: create_connection() calls getaddrinfo() against 4814 # 'localhost'. This may result in an IPV6 addr being returned 4815 # as well as an IPV4 one: 4816 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) 4817 # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), 4818 # (26, 2, 0, '', ('::1', 41230, 0, 0))] 4819 # 4820 # create_connection() enumerates through all the addresses returned 4821 # and if it doesn't successfully bind to any of them, it propagates 4822 # the last exception it encountered. 4823 # 4824 # On Solaris, ENETUNREACH is returned in this circumstance instead 4825 # of ECONNREFUSED. So, if that errno exists, add it to our list of 4826 # expected errnos. 4827 expected_errnos = support.get_socket_conn_refused_errs() 4828 self.assertIn(cm.exception.errno, expected_errnos) 4829 4830 def test_create_connection_timeout(self): 4831 # Issue #9792: create_connection() should not recast timeout errors 4832 # as generic socket errors. 4833 with self.mocked_socket_module(): 4834 try: 4835 socket.create_connection((HOST, 1234)) 4836 except socket.timeout: 4837 pass 4838 except OSError as exc: 4839 if support.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT: 4840 raise 4841 else: 4842 self.fail('socket.timeout not raised') 4843 4844 4845class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): 4846 4847 def __init__(self, methodName='runTest'): 4848 SocketTCPTest.__init__(self, methodName=methodName) 4849 ThreadableTest.__init__(self) 4850 4851 def clientSetUp(self): 4852 self.source_port = support.find_unused_port() 4853 4854 def clientTearDown(self): 4855 self.cli.close() 4856 self.cli = None 4857 ThreadableTest.clientTearDown(self) 4858 4859 def _justAccept(self): 4860 conn, addr = self.serv.accept() 4861 conn.close() 4862 4863 testFamily = _justAccept 4864 def _testFamily(self): 4865 self.cli = socket.create_connection((HOST, self.port), timeout=30) 4866 self.addCleanup(self.cli.close) 4867 self.assertEqual(self.cli.family, 2) 4868 4869 testSourceAddress = _justAccept 4870 def _testSourceAddress(self): 4871 self.cli = socket.create_connection((HOST, self.port), timeout=30, 4872 source_address=('', self.source_port)) 4873 self.addCleanup(self.cli.close) 4874 self.assertEqual(self.cli.getsockname()[1], self.source_port) 4875 # The port number being used is sufficient to show that the bind() 4876 # call happened. 4877 4878 testTimeoutDefault = _justAccept 4879 def _testTimeoutDefault(self): 4880 # passing no explicit timeout uses socket's global default 4881 self.assertTrue(socket.getdefaulttimeout() is None) 4882 socket.setdefaulttimeout(42) 4883 try: 4884 self.cli = socket.create_connection((HOST, self.port)) 4885 self.addCleanup(self.cli.close) 4886 finally: 4887 socket.setdefaulttimeout(None) 4888 self.assertEqual(self.cli.gettimeout(), 42) 4889 4890 testTimeoutNone = _justAccept 4891 def _testTimeoutNone(self): 4892 # None timeout means the same as sock.settimeout(None) 4893 self.assertTrue(socket.getdefaulttimeout() is None) 4894 socket.setdefaulttimeout(30) 4895 try: 4896 self.cli = socket.create_connection((HOST, self.port), timeout=None) 4897 self.addCleanup(self.cli.close) 4898 finally: 4899 socket.setdefaulttimeout(None) 4900 self.assertEqual(self.cli.gettimeout(), None) 4901 4902 testTimeoutValueNamed = _justAccept 4903 def _testTimeoutValueNamed(self): 4904 self.cli = socket.create_connection((HOST, self.port), timeout=30) 4905 self.assertEqual(self.cli.gettimeout(), 30) 4906 4907 testTimeoutValueNonamed = _justAccept 4908 def _testTimeoutValueNonamed(self): 4909 self.cli = socket.create_connection((HOST, self.port), 30) 4910 self.addCleanup(self.cli.close) 4911 self.assertEqual(self.cli.gettimeout(), 30) 4912 4913 4914class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): 4915 4916 def __init__(self, methodName='runTest'): 4917 SocketTCPTest.__init__(self, methodName=methodName) 4918 ThreadableTest.__init__(self) 4919 4920 def clientSetUp(self): 4921 pass 4922 4923 def clientTearDown(self): 4924 self.cli.close() 4925 self.cli = None 4926 ThreadableTest.clientTearDown(self) 4927 4928 def testInsideTimeout(self): 4929 conn, addr = self.serv.accept() 4930 self.addCleanup(conn.close) 4931 time.sleep(3) 4932 conn.send(b"done!") 4933 testOutsideTimeout = testInsideTimeout 4934 4935 def _testInsideTimeout(self): 4936 self.cli = sock = socket.create_connection((HOST, self.port)) 4937 data = sock.recv(5) 4938 self.assertEqual(data, b"done!") 4939 4940 def _testOutsideTimeout(self): 4941 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) 4942 self.assertRaises(socket.timeout, lambda: sock.recv(5)) 4943 4944 4945class TCPTimeoutTest(SocketTCPTest): 4946 4947 def testTCPTimeout(self): 4948 def raise_timeout(*args, **kwargs): 4949 self.serv.settimeout(1.0) 4950 self.serv.accept() 4951 self.assertRaises(socket.timeout, raise_timeout, 4952 "Error generating a timeout exception (TCP)") 4953 4954 def testTimeoutZero(self): 4955 ok = False 4956 try: 4957 self.serv.settimeout(0.0) 4958 foo = self.serv.accept() 4959 except socket.timeout: 4960 self.fail("caught timeout instead of error (TCP)") 4961 except OSError: 4962 ok = True 4963 except: 4964 self.fail("caught unexpected exception (TCP)") 4965 if not ok: 4966 self.fail("accept() returned success when we did not expect it") 4967 4968 @unittest.skipUnless(hasattr(signal, 'alarm'), 4969 'test needs signal.alarm()') 4970 def testInterruptedTimeout(self): 4971 # XXX I don't know how to do this test on MSWindows or any other 4972 # platform that doesn't support signal.alarm() or os.kill(), though 4973 # the bug should have existed on all platforms. 4974 self.serv.settimeout(5.0) # must be longer than alarm 4975 class Alarm(Exception): 4976 pass 4977 def alarm_handler(signal, frame): 4978 raise Alarm 4979 old_alarm = signal.signal(signal.SIGALRM, alarm_handler) 4980 try: 4981 try: 4982 signal.alarm(2) # POSIX allows alarm to be up to 1 second early 4983 foo = self.serv.accept() 4984 except socket.timeout: 4985 self.fail("caught timeout instead of Alarm") 4986 except Alarm: 4987 pass 4988 except: 4989 self.fail("caught other exception instead of Alarm:" 4990 " %s(%s):\n%s" % 4991 (sys.exc_info()[:2] + (traceback.format_exc(),))) 4992 else: 4993 self.fail("nothing caught") 4994 finally: 4995 signal.alarm(0) # shut off alarm 4996 except Alarm: 4997 self.fail("got Alarm in wrong place") 4998 finally: 4999 # no alarm can be pending. Safe to restore old handler. 5000 signal.signal(signal.SIGALRM, old_alarm) 5001 5002class UDPTimeoutTest(SocketUDPTest): 5003 5004 def testUDPTimeout(self): 5005 def raise_timeout(*args, **kwargs): 5006 self.serv.settimeout(1.0) 5007 self.serv.recv(1024) 5008 self.assertRaises(socket.timeout, raise_timeout, 5009 "Error generating a timeout exception (UDP)") 5010 5011 def testTimeoutZero(self): 5012 ok = False 5013 try: 5014 self.serv.settimeout(0.0) 5015 foo = self.serv.recv(1024) 5016 except socket.timeout: 5017 self.fail("caught timeout instead of error (UDP)") 5018 except OSError: 5019 ok = True 5020 except: 5021 self.fail("caught unexpected exception (UDP)") 5022 if not ok: 5023 self.fail("recv() returned success when we did not expect it") 5024 5025class TestExceptions(unittest.TestCase): 5026 5027 def testExceptionTree(self): 5028 self.assertTrue(issubclass(OSError, Exception)) 5029 self.assertTrue(issubclass(socket.herror, OSError)) 5030 self.assertTrue(issubclass(socket.gaierror, OSError)) 5031 self.assertTrue(issubclass(socket.timeout, OSError)) 5032 5033 def test_setblocking_invalidfd(self): 5034 # Regression test for issue #28471 5035 5036 sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 5037 sock = socket.socket( 5038 socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno()) 5039 sock0.close() 5040 self.addCleanup(sock.detach) 5041 5042 with self.assertRaises(OSError): 5043 sock.setblocking(False) 5044 5045 5046@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') 5047class TestLinuxAbstractNamespace(unittest.TestCase): 5048 5049 UNIX_PATH_MAX = 108 5050 5051 def testLinuxAbstractNamespace(self): 5052 address = b"\x00python-test-hello\x00\xff" 5053 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: 5054 s1.bind(address) 5055 s1.listen() 5056 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2: 5057 s2.connect(s1.getsockname()) 5058 with s1.accept()[0] as s3: 5059 self.assertEqual(s1.getsockname(), address) 5060 self.assertEqual(s2.getpeername(), address) 5061 5062 def testMaxName(self): 5063 address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) 5064 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5065 s.bind(address) 5066 self.assertEqual(s.getsockname(), address) 5067 5068 def testNameOverflow(self): 5069 address = "\x00" + "h" * self.UNIX_PATH_MAX 5070 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5071 self.assertRaises(OSError, s.bind, address) 5072 5073 def testStrName(self): 5074 # Check that an abstract name can be passed as a string. 5075 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5076 try: 5077 s.bind("\x00python\x00test\x00") 5078 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5079 finally: 5080 s.close() 5081 5082 def testBytearrayName(self): 5083 # Check that an abstract name can be passed as a bytearray. 5084 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5085 s.bind(bytearray(b"\x00python\x00test\x00")) 5086 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5087 5088@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX') 5089class TestUnixDomain(unittest.TestCase): 5090 5091 def setUp(self): 5092 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5093 5094 def tearDown(self): 5095 self.sock.close() 5096 5097 def encoded(self, path): 5098 # Return the given path encoded in the file system encoding, 5099 # or skip the test if this is not possible. 5100 try: 5101 return os.fsencode(path) 5102 except UnicodeEncodeError: 5103 self.skipTest( 5104 "Pathname {0!a} cannot be represented in file " 5105 "system encoding {1!r}".format( 5106 path, sys.getfilesystemencoding())) 5107 5108 def bind(self, sock, path): 5109 # Bind the socket 5110 try: 5111 support.bind_unix_socket(sock, path) 5112 except OSError as e: 5113 if str(e) == "AF_UNIX path too long": 5114 self.skipTest( 5115 "Pathname {0!a} is too long to serve as an AF_UNIX path" 5116 .format(path)) 5117 else: 5118 raise 5119 5120 def testUnbound(self): 5121 # Issue #30205 (note getsockname() can return None on OS X) 5122 self.assertIn(self.sock.getsockname(), ('', None)) 5123 5124 def testStrAddr(self): 5125 # Test binding to and retrieving a normal string pathname. 5126 path = os.path.abspath(support.TESTFN) 5127 self.bind(self.sock, path) 5128 self.addCleanup(support.unlink, path) 5129 self.assertEqual(self.sock.getsockname(), path) 5130 5131 def testBytesAddr(self): 5132 # Test binding to a bytes pathname. 5133 path = os.path.abspath(support.TESTFN) 5134 self.bind(self.sock, self.encoded(path)) 5135 self.addCleanup(support.unlink, path) 5136 self.assertEqual(self.sock.getsockname(), path) 5137 5138 def testSurrogateescapeBind(self): 5139 # Test binding to a valid non-ASCII pathname, with the 5140 # non-ASCII bytes supplied using surrogateescape encoding. 5141 path = os.path.abspath(support.TESTFN_UNICODE) 5142 b = self.encoded(path) 5143 self.bind(self.sock, b.decode("ascii", "surrogateescape")) 5144 self.addCleanup(support.unlink, path) 5145 self.assertEqual(self.sock.getsockname(), path) 5146 5147 def testUnencodableAddr(self): 5148 # Test binding to a pathname that cannot be encoded in the 5149 # file system encoding. 5150 if support.TESTFN_UNENCODABLE is None: 5151 self.skipTest("No unencodable filename available") 5152 path = os.path.abspath(support.TESTFN_UNENCODABLE) 5153 self.bind(self.sock, path) 5154 self.addCleanup(support.unlink, path) 5155 self.assertEqual(self.sock.getsockname(), path) 5156 5157 5158class BufferIOTest(SocketConnectedTest): 5159 """ 5160 Test the buffer versions of socket.recv() and socket.send(). 5161 """ 5162 def __init__(self, methodName='runTest'): 5163 SocketConnectedTest.__init__(self, methodName=methodName) 5164 5165 def testRecvIntoArray(self): 5166 buf = array.array("B", [0] * len(MSG)) 5167 nbytes = self.cli_conn.recv_into(buf) 5168 self.assertEqual(nbytes, len(MSG)) 5169 buf = buf.tobytes() 5170 msg = buf[:len(MSG)] 5171 self.assertEqual(msg, MSG) 5172 5173 def _testRecvIntoArray(self): 5174 buf = bytes(MSG) 5175 self.serv_conn.send(buf) 5176 5177 def testRecvIntoBytearray(self): 5178 buf = bytearray(1024) 5179 nbytes = self.cli_conn.recv_into(buf) 5180 self.assertEqual(nbytes, len(MSG)) 5181 msg = buf[:len(MSG)] 5182 self.assertEqual(msg, MSG) 5183 5184 _testRecvIntoBytearray = _testRecvIntoArray 5185 5186 def testRecvIntoMemoryview(self): 5187 buf = bytearray(1024) 5188 nbytes = self.cli_conn.recv_into(memoryview(buf)) 5189 self.assertEqual(nbytes, len(MSG)) 5190 msg = buf[:len(MSG)] 5191 self.assertEqual(msg, MSG) 5192 5193 _testRecvIntoMemoryview = _testRecvIntoArray 5194 5195 def testRecvFromIntoArray(self): 5196 buf = array.array("B", [0] * len(MSG)) 5197 nbytes, addr = self.cli_conn.recvfrom_into(buf) 5198 self.assertEqual(nbytes, len(MSG)) 5199 buf = buf.tobytes() 5200 msg = buf[:len(MSG)] 5201 self.assertEqual(msg, MSG) 5202 5203 def _testRecvFromIntoArray(self): 5204 buf = bytes(MSG) 5205 self.serv_conn.send(buf) 5206 5207 def testRecvFromIntoBytearray(self): 5208 buf = bytearray(1024) 5209 nbytes, addr = self.cli_conn.recvfrom_into(buf) 5210 self.assertEqual(nbytes, len(MSG)) 5211 msg = buf[:len(MSG)] 5212 self.assertEqual(msg, MSG) 5213 5214 _testRecvFromIntoBytearray = _testRecvFromIntoArray 5215 5216 def testRecvFromIntoMemoryview(self): 5217 buf = bytearray(1024) 5218 nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf)) 5219 self.assertEqual(nbytes, len(MSG)) 5220 msg = buf[:len(MSG)] 5221 self.assertEqual(msg, MSG) 5222 5223 _testRecvFromIntoMemoryview = _testRecvFromIntoArray 5224 5225 def testRecvFromIntoSmallBuffer(self): 5226 # See issue #20246. 5227 buf = bytearray(8) 5228 self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024) 5229 5230 def _testRecvFromIntoSmallBuffer(self): 5231 self.serv_conn.send(MSG) 5232 5233 def testRecvFromIntoEmptyBuffer(self): 5234 buf = bytearray() 5235 self.cli_conn.recvfrom_into(buf) 5236 self.cli_conn.recvfrom_into(buf, 0) 5237 5238 _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray 5239 5240 5241TIPC_STYPE = 2000 5242TIPC_LOWER = 200 5243TIPC_UPPER = 210 5244 5245def isTipcAvailable(): 5246 """Check if the TIPC module is loaded 5247 5248 The TIPC module is not loaded automatically on Ubuntu and probably 5249 other Linux distros. 5250 """ 5251 if not hasattr(socket, "AF_TIPC"): 5252 return False 5253 try: 5254 f = open("/proc/modules") 5255 except (FileNotFoundError, IsADirectoryError, PermissionError): 5256 # It's ok if the file does not exist, is a directory or if we 5257 # have not the permission to read it. 5258 return False 5259 with f: 5260 for line in f: 5261 if line.startswith("tipc "): 5262 return True 5263 return False 5264 5265@unittest.skipUnless(isTipcAvailable(), 5266 "TIPC module is not loaded, please 'sudo modprobe tipc'") 5267class TIPCTest(unittest.TestCase): 5268 def testRDM(self): 5269 srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 5270 cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 5271 self.addCleanup(srv.close) 5272 self.addCleanup(cli.close) 5273 5274 srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 5275 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 5276 TIPC_LOWER, TIPC_UPPER) 5277 srv.bind(srvaddr) 5278 5279 sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 5280 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) 5281 cli.sendto(MSG, sendaddr) 5282 5283 msg, recvaddr = srv.recvfrom(1024) 5284 5285 self.assertEqual(cli.getsockname(), recvaddr) 5286 self.assertEqual(msg, MSG) 5287 5288 5289@unittest.skipUnless(isTipcAvailable(), 5290 "TIPC module is not loaded, please 'sudo modprobe tipc'") 5291class TIPCThreadableTest(unittest.TestCase, ThreadableTest): 5292 def __init__(self, methodName = 'runTest'): 5293 unittest.TestCase.__init__(self, methodName = methodName) 5294 ThreadableTest.__init__(self) 5295 5296 def setUp(self): 5297 self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 5298 self.addCleanup(self.srv.close) 5299 self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 5300 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 5301 TIPC_LOWER, TIPC_UPPER) 5302 self.srv.bind(srvaddr) 5303 self.srv.listen() 5304 self.serverExplicitReady() 5305 self.conn, self.connaddr = self.srv.accept() 5306 self.addCleanup(self.conn.close) 5307 5308 def clientSetUp(self): 5309 # There is a hittable race between serverExplicitReady() and the 5310 # accept() call; sleep a little while to avoid it, otherwise 5311 # we could get an exception 5312 time.sleep(0.1) 5313 self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 5314 self.addCleanup(self.cli.close) 5315 addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 5316 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) 5317 self.cli.connect(addr) 5318 self.cliaddr = self.cli.getsockname() 5319 5320 def testStream(self): 5321 msg = self.conn.recv(1024) 5322 self.assertEqual(msg, MSG) 5323 self.assertEqual(self.cliaddr, self.connaddr) 5324 5325 def _testStream(self): 5326 self.cli.send(MSG) 5327 self.cli.close() 5328 5329 5330class ContextManagersTest(ThreadedTCPSocketTest): 5331 5332 def _testSocketClass(self): 5333 # base test 5334 with socket.socket() as sock: 5335 self.assertFalse(sock._closed) 5336 self.assertTrue(sock._closed) 5337 # close inside with block 5338 with socket.socket() as sock: 5339 sock.close() 5340 self.assertTrue(sock._closed) 5341 # exception inside with block 5342 with socket.socket() as sock: 5343 self.assertRaises(OSError, sock.sendall, b'foo') 5344 self.assertTrue(sock._closed) 5345 5346 def testCreateConnectionBase(self): 5347 conn, addr = self.serv.accept() 5348 self.addCleanup(conn.close) 5349 data = conn.recv(1024) 5350 conn.sendall(data) 5351 5352 def _testCreateConnectionBase(self): 5353 address = self.serv.getsockname() 5354 with socket.create_connection(address) as sock: 5355 self.assertFalse(sock._closed) 5356 sock.sendall(b'foo') 5357 self.assertEqual(sock.recv(1024), b'foo') 5358 self.assertTrue(sock._closed) 5359 5360 def testCreateConnectionClose(self): 5361 conn, addr = self.serv.accept() 5362 self.addCleanup(conn.close) 5363 data = conn.recv(1024) 5364 conn.sendall(data) 5365 5366 def _testCreateConnectionClose(self): 5367 address = self.serv.getsockname() 5368 with socket.create_connection(address) as sock: 5369 sock.close() 5370 self.assertTrue(sock._closed) 5371 self.assertRaises(OSError, sock.sendall, b'foo') 5372 5373 5374class InheritanceTest(unittest.TestCase): 5375 @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"), 5376 "SOCK_CLOEXEC not defined") 5377 @support.requires_linux_version(2, 6, 28) 5378 def test_SOCK_CLOEXEC(self): 5379 with socket.socket(socket.AF_INET, 5380 socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s: 5381 self.assertEqual(s.type, socket.SOCK_STREAM) 5382 self.assertFalse(s.get_inheritable()) 5383 5384 def test_default_inheritable(self): 5385 sock = socket.socket() 5386 with sock: 5387 self.assertEqual(sock.get_inheritable(), False) 5388 5389 def test_dup(self): 5390 sock = socket.socket() 5391 with sock: 5392 newsock = sock.dup() 5393 sock.close() 5394 with newsock: 5395 self.assertEqual(newsock.get_inheritable(), False) 5396 5397 def test_set_inheritable(self): 5398 sock = socket.socket() 5399 with sock: 5400 sock.set_inheritable(True) 5401 self.assertEqual(sock.get_inheritable(), True) 5402 5403 sock.set_inheritable(False) 5404 self.assertEqual(sock.get_inheritable(), False) 5405 5406 @unittest.skipIf(fcntl is None, "need fcntl") 5407 def test_get_inheritable_cloexec(self): 5408 sock = socket.socket() 5409 with sock: 5410 fd = sock.fileno() 5411 self.assertEqual(sock.get_inheritable(), False) 5412 5413 # clear FD_CLOEXEC flag 5414 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 5415 flags &= ~fcntl.FD_CLOEXEC 5416 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 5417 5418 self.assertEqual(sock.get_inheritable(), True) 5419 5420 @unittest.skipIf(fcntl is None, "need fcntl") 5421 def test_set_inheritable_cloexec(self): 5422 sock = socket.socket() 5423 with sock: 5424 fd = sock.fileno() 5425 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 5426 fcntl.FD_CLOEXEC) 5427 5428 sock.set_inheritable(True) 5429 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 5430 0) 5431 5432 5433 def test_socketpair(self): 5434 s1, s2 = socket.socketpair() 5435 self.addCleanup(s1.close) 5436 self.addCleanup(s2.close) 5437 self.assertEqual(s1.get_inheritable(), False) 5438 self.assertEqual(s2.get_inheritable(), False) 5439 5440 5441@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"), 5442 "SOCK_NONBLOCK not defined") 5443class NonblockConstantTest(unittest.TestCase): 5444 def checkNonblock(self, s, nonblock=True, timeout=0.0): 5445 if nonblock: 5446 self.assertEqual(s.type, socket.SOCK_STREAM) 5447 self.assertEqual(s.gettimeout(), timeout) 5448 self.assertTrue( 5449 fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) 5450 if timeout == 0: 5451 # timeout == 0: means that getblocking() must be False. 5452 self.assertFalse(s.getblocking()) 5453 else: 5454 # If timeout > 0, the socket will be in a "blocking" mode 5455 # from the standpoint of the Python API. For Python socket 5456 # object, "blocking" means that operations like 'sock.recv()' 5457 # will block. Internally, file descriptors for 5458 # "blocking" Python sockets *with timeouts* are in a 5459 # *non-blocking* mode, and 'sock.recv()' uses 'select()' 5460 # and handles EWOULDBLOCK/EAGAIN to enforce the timeout. 5461 self.assertTrue(s.getblocking()) 5462 else: 5463 self.assertEqual(s.type, socket.SOCK_STREAM) 5464 self.assertEqual(s.gettimeout(), None) 5465 self.assertFalse( 5466 fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) 5467 self.assertTrue(s.getblocking()) 5468 5469 @support.requires_linux_version(2, 6, 28) 5470 def test_SOCK_NONBLOCK(self): 5471 # a lot of it seems silly and redundant, but I wanted to test that 5472 # changing back and forth worked ok 5473 with socket.socket(socket.AF_INET, 5474 socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s: 5475 self.checkNonblock(s) 5476 s.setblocking(1) 5477 self.checkNonblock(s, nonblock=False) 5478 s.setblocking(0) 5479 self.checkNonblock(s) 5480 s.settimeout(None) 5481 self.checkNonblock(s, nonblock=False) 5482 s.settimeout(2.0) 5483 self.checkNonblock(s, timeout=2.0) 5484 s.setblocking(1) 5485 self.checkNonblock(s, nonblock=False) 5486 # defaulttimeout 5487 t = socket.getdefaulttimeout() 5488 socket.setdefaulttimeout(0.0) 5489 with socket.socket() as s: 5490 self.checkNonblock(s) 5491 socket.setdefaulttimeout(None) 5492 with socket.socket() as s: 5493 self.checkNonblock(s, False) 5494 socket.setdefaulttimeout(2.0) 5495 with socket.socket() as s: 5496 self.checkNonblock(s, timeout=2.0) 5497 socket.setdefaulttimeout(None) 5498 with socket.socket() as s: 5499 self.checkNonblock(s, False) 5500 socket.setdefaulttimeout(t) 5501 5502 5503@unittest.skipUnless(os.name == "nt", "Windows specific") 5504@unittest.skipUnless(multiprocessing, "need multiprocessing") 5505class TestSocketSharing(SocketTCPTest): 5506 # This must be classmethod and not staticmethod or multiprocessing 5507 # won't be able to bootstrap it. 5508 @classmethod 5509 def remoteProcessServer(cls, q): 5510 # Recreate socket from shared data 5511 sdata = q.get() 5512 message = q.get() 5513 5514 s = socket.fromshare(sdata) 5515 s2, c = s.accept() 5516 5517 # Send the message 5518 s2.sendall(message) 5519 s2.close() 5520 s.close() 5521 5522 def testShare(self): 5523 # Transfer the listening server socket to another process 5524 # and service it from there. 5525 5526 # Create process: 5527 q = multiprocessing.Queue() 5528 p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,)) 5529 p.start() 5530 5531 # Get the shared socket data 5532 data = self.serv.share(p.pid) 5533 5534 # Pass the shared socket to the other process 5535 addr = self.serv.getsockname() 5536 self.serv.close() 5537 q.put(data) 5538 5539 # The data that the server will send us 5540 message = b"slapmahfro" 5541 q.put(message) 5542 5543 # Connect 5544 s = socket.create_connection(addr) 5545 # listen for the data 5546 m = [] 5547 while True: 5548 data = s.recv(100) 5549 if not data: 5550 break 5551 m.append(data) 5552 s.close() 5553 received = b"".join(m) 5554 self.assertEqual(received, message) 5555 p.join() 5556 5557 def testShareLength(self): 5558 data = self.serv.share(os.getpid()) 5559 self.assertRaises(ValueError, socket.fromshare, data[:-1]) 5560 self.assertRaises(ValueError, socket.fromshare, data+b"foo") 5561 5562 def compareSockets(self, org, other): 5563 # socket sharing is expected to work only for blocking socket 5564 # since the internal python timeout value isn't transferred. 5565 self.assertEqual(org.gettimeout(), None) 5566 self.assertEqual(org.gettimeout(), other.gettimeout()) 5567 5568 self.assertEqual(org.family, other.family) 5569 self.assertEqual(org.type, other.type) 5570 # If the user specified "0" for proto, then 5571 # internally windows will have picked the correct value. 5572 # Python introspection on the socket however will still return 5573 # 0. For the shared socket, the python value is recreated 5574 # from the actual value, so it may not compare correctly. 5575 if org.proto != 0: 5576 self.assertEqual(org.proto, other.proto) 5577 5578 def testShareLocal(self): 5579 data = self.serv.share(os.getpid()) 5580 s = socket.fromshare(data) 5581 try: 5582 self.compareSockets(self.serv, s) 5583 finally: 5584 s.close() 5585 5586 def testTypes(self): 5587 families = [socket.AF_INET, socket.AF_INET6] 5588 types = [socket.SOCK_STREAM, socket.SOCK_DGRAM] 5589 for f in families: 5590 for t in types: 5591 try: 5592 source = socket.socket(f, t) 5593 except OSError: 5594 continue # This combination is not supported 5595 try: 5596 data = source.share(os.getpid()) 5597 shared = socket.fromshare(data) 5598 try: 5599 self.compareSockets(source, shared) 5600 finally: 5601 shared.close() 5602 finally: 5603 source.close() 5604 5605 5606class SendfileUsingSendTest(ThreadedTCPSocketTest): 5607 """ 5608 Test the send() implementation of socket.sendfile(). 5609 """ 5610 5611 FILESIZE = (10 * 1024 * 1024) # 10 MiB 5612 BUFSIZE = 8192 5613 FILEDATA = b"" 5614 TIMEOUT = 2 5615 5616 @classmethod 5617 def setUpClass(cls): 5618 def chunks(total, step): 5619 assert total >= step 5620 while total > step: 5621 yield step 5622 total -= step 5623 if total: 5624 yield total 5625 5626 chunk = b"".join([random.choice(string.ascii_letters).encode() 5627 for i in range(cls.BUFSIZE)]) 5628 with open(support.TESTFN, 'wb') as f: 5629 for csize in chunks(cls.FILESIZE, cls.BUFSIZE): 5630 f.write(chunk) 5631 with open(support.TESTFN, 'rb') as f: 5632 cls.FILEDATA = f.read() 5633 assert len(cls.FILEDATA) == cls.FILESIZE 5634 5635 @classmethod 5636 def tearDownClass(cls): 5637 support.unlink(support.TESTFN) 5638 5639 def accept_conn(self): 5640 self.serv.settimeout(MAIN_TIMEOUT) 5641 conn, addr = self.serv.accept() 5642 conn.settimeout(self.TIMEOUT) 5643 self.addCleanup(conn.close) 5644 return conn 5645 5646 def recv_data(self, conn): 5647 received = [] 5648 while True: 5649 chunk = conn.recv(self.BUFSIZE) 5650 if not chunk: 5651 break 5652 received.append(chunk) 5653 return b''.join(received) 5654 5655 def meth_from_sock(self, sock): 5656 # Depending on the mixin class being run return either send() 5657 # or sendfile() method implementation. 5658 return getattr(sock, "_sendfile_use_send") 5659 5660 # regular file 5661 5662 def _testRegularFile(self): 5663 address = self.serv.getsockname() 5664 file = open(support.TESTFN, 'rb') 5665 with socket.create_connection(address) as sock, file as file: 5666 meth = self.meth_from_sock(sock) 5667 sent = meth(file) 5668 self.assertEqual(sent, self.FILESIZE) 5669 self.assertEqual(file.tell(), self.FILESIZE) 5670 5671 def testRegularFile(self): 5672 conn = self.accept_conn() 5673 data = self.recv_data(conn) 5674 self.assertEqual(len(data), self.FILESIZE) 5675 self.assertEqual(data, self.FILEDATA) 5676 5677 # non regular file 5678 5679 def _testNonRegularFile(self): 5680 address = self.serv.getsockname() 5681 file = io.BytesIO(self.FILEDATA) 5682 with socket.create_connection(address) as sock, file as file: 5683 sent = sock.sendfile(file) 5684 self.assertEqual(sent, self.FILESIZE) 5685 self.assertEqual(file.tell(), self.FILESIZE) 5686 self.assertRaises(socket._GiveupOnSendfile, 5687 sock._sendfile_use_sendfile, file) 5688 5689 def testNonRegularFile(self): 5690 conn = self.accept_conn() 5691 data = self.recv_data(conn) 5692 self.assertEqual(len(data), self.FILESIZE) 5693 self.assertEqual(data, self.FILEDATA) 5694 5695 # empty file 5696 5697 def _testEmptyFileSend(self): 5698 address = self.serv.getsockname() 5699 filename = support.TESTFN + "2" 5700 with open(filename, 'wb'): 5701 self.addCleanup(support.unlink, filename) 5702 file = open(filename, 'rb') 5703 with socket.create_connection(address) as sock, file as file: 5704 meth = self.meth_from_sock(sock) 5705 sent = meth(file) 5706 self.assertEqual(sent, 0) 5707 self.assertEqual(file.tell(), 0) 5708 5709 def testEmptyFileSend(self): 5710 conn = self.accept_conn() 5711 data = self.recv_data(conn) 5712 self.assertEqual(data, b"") 5713 5714 # offset 5715 5716 def _testOffset(self): 5717 address = self.serv.getsockname() 5718 file = open(support.TESTFN, 'rb') 5719 with socket.create_connection(address) as sock, file as file: 5720 meth = self.meth_from_sock(sock) 5721 sent = meth(file, offset=5000) 5722 self.assertEqual(sent, self.FILESIZE - 5000) 5723 self.assertEqual(file.tell(), self.FILESIZE) 5724 5725 def testOffset(self): 5726 conn = self.accept_conn() 5727 data = self.recv_data(conn) 5728 self.assertEqual(len(data), self.FILESIZE - 5000) 5729 self.assertEqual(data, self.FILEDATA[5000:]) 5730 5731 # count 5732 5733 def _testCount(self): 5734 address = self.serv.getsockname() 5735 file = open(support.TESTFN, 'rb') 5736 with socket.create_connection(address, timeout=2) as sock, file as file: 5737 count = 5000007 5738 meth = self.meth_from_sock(sock) 5739 sent = meth(file, count=count) 5740 self.assertEqual(sent, count) 5741 self.assertEqual(file.tell(), count) 5742 5743 def testCount(self): 5744 count = 5000007 5745 conn = self.accept_conn() 5746 data = self.recv_data(conn) 5747 self.assertEqual(len(data), count) 5748 self.assertEqual(data, self.FILEDATA[:count]) 5749 5750 # count small 5751 5752 def _testCountSmall(self): 5753 address = self.serv.getsockname() 5754 file = open(support.TESTFN, 'rb') 5755 with socket.create_connection(address, timeout=2) as sock, file as file: 5756 count = 1 5757 meth = self.meth_from_sock(sock) 5758 sent = meth(file, count=count) 5759 self.assertEqual(sent, count) 5760 self.assertEqual(file.tell(), count) 5761 5762 def testCountSmall(self): 5763 count = 1 5764 conn = self.accept_conn() 5765 data = self.recv_data(conn) 5766 self.assertEqual(len(data), count) 5767 self.assertEqual(data, self.FILEDATA[:count]) 5768 5769 # count + offset 5770 5771 def _testCountWithOffset(self): 5772 address = self.serv.getsockname() 5773 file = open(support.TESTFN, 'rb') 5774 with socket.create_connection(address, timeout=2) as sock, file as file: 5775 count = 100007 5776 meth = self.meth_from_sock(sock) 5777 sent = meth(file, offset=2007, count=count) 5778 self.assertEqual(sent, count) 5779 self.assertEqual(file.tell(), count + 2007) 5780 5781 def testCountWithOffset(self): 5782 count = 100007 5783 conn = self.accept_conn() 5784 data = self.recv_data(conn) 5785 self.assertEqual(len(data), count) 5786 self.assertEqual(data, self.FILEDATA[2007:count+2007]) 5787 5788 # non blocking sockets are not supposed to work 5789 5790 def _testNonBlocking(self): 5791 address = self.serv.getsockname() 5792 file = open(support.TESTFN, 'rb') 5793 with socket.create_connection(address) as sock, file as file: 5794 sock.setblocking(False) 5795 meth = self.meth_from_sock(sock) 5796 self.assertRaises(ValueError, meth, file) 5797 self.assertRaises(ValueError, sock.sendfile, file) 5798 5799 def testNonBlocking(self): 5800 conn = self.accept_conn() 5801 if conn.recv(8192): 5802 self.fail('was not supposed to receive any data') 5803 5804 # timeout (non-triggered) 5805 5806 def _testWithTimeout(self): 5807 address = self.serv.getsockname() 5808 file = open(support.TESTFN, 'rb') 5809 with socket.create_connection(address, timeout=2) as sock, file as file: 5810 meth = self.meth_from_sock(sock) 5811 sent = meth(file) 5812 self.assertEqual(sent, self.FILESIZE) 5813 5814 def testWithTimeout(self): 5815 conn = self.accept_conn() 5816 data = self.recv_data(conn) 5817 self.assertEqual(len(data), self.FILESIZE) 5818 self.assertEqual(data, self.FILEDATA) 5819 5820 # timeout (triggered) 5821 5822 def _testWithTimeoutTriggeredSend(self): 5823 address = self.serv.getsockname() 5824 with open(support.TESTFN, 'rb') as file: 5825 with socket.create_connection(address) as sock: 5826 sock.settimeout(0.01) 5827 meth = self.meth_from_sock(sock) 5828 self.assertRaises(socket.timeout, meth, file) 5829 5830 def testWithTimeoutTriggeredSend(self): 5831 conn = self.accept_conn() 5832 conn.recv(88192) 5833 5834 # errors 5835 5836 def _test_errors(self): 5837 pass 5838 5839 def test_errors(self): 5840 with open(support.TESTFN, 'rb') as file: 5841 with socket.socket(type=socket.SOCK_DGRAM) as s: 5842 meth = self.meth_from_sock(s) 5843 self.assertRaisesRegex( 5844 ValueError, "SOCK_STREAM", meth, file) 5845 with open(support.TESTFN, 'rt') as file: 5846 with socket.socket() as s: 5847 meth = self.meth_from_sock(s) 5848 self.assertRaisesRegex( 5849 ValueError, "binary mode", meth, file) 5850 with open(support.TESTFN, 'rb') as file: 5851 with socket.socket() as s: 5852 meth = self.meth_from_sock(s) 5853 self.assertRaisesRegex(TypeError, "positive integer", 5854 meth, file, count='2') 5855 self.assertRaisesRegex(TypeError, "positive integer", 5856 meth, file, count=0.1) 5857 self.assertRaisesRegex(ValueError, "positive integer", 5858 meth, file, count=0) 5859 self.assertRaisesRegex(ValueError, "positive integer", 5860 meth, file, count=-1) 5861 5862 5863@unittest.skipUnless(hasattr(os, "sendfile"), 5864 'os.sendfile() required for this test.') 5865class SendfileUsingSendfileTest(SendfileUsingSendTest): 5866 """ 5867 Test the sendfile() implementation of socket.sendfile(). 5868 """ 5869 def meth_from_sock(self, sock): 5870 return getattr(sock, "_sendfile_use_sendfile") 5871 5872 5873@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required') 5874class LinuxKernelCryptoAPI(unittest.TestCase): 5875 # tests for AF_ALG 5876 def create_alg(self, typ, name): 5877 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 5878 try: 5879 sock.bind((typ, name)) 5880 except FileNotFoundError as e: 5881 # type / algorithm is not available 5882 sock.close() 5883 raise unittest.SkipTest(str(e), typ, name) 5884 else: 5885 return sock 5886 5887 # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY, 5888 # at least on ppc64le architecture 5889 @support.requires_linux_version(4, 5) 5890 def test_sha256(self): 5891 expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396" 5892 "177a9cb410ff61f20015ad") 5893 with self.create_alg('hash', 'sha256') as algo: 5894 op, _ = algo.accept() 5895 with op: 5896 op.sendall(b"abc") 5897 self.assertEqual(op.recv(512), expected) 5898 5899 op, _ = algo.accept() 5900 with op: 5901 op.send(b'a', socket.MSG_MORE) 5902 op.send(b'b', socket.MSG_MORE) 5903 op.send(b'c', socket.MSG_MORE) 5904 op.send(b'') 5905 self.assertEqual(op.recv(512), expected) 5906 5907 def test_hmac_sha1(self): 5908 expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79") 5909 with self.create_alg('hash', 'hmac(sha1)') as algo: 5910 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe") 5911 op, _ = algo.accept() 5912 with op: 5913 op.sendall(b"what do ya want for nothing?") 5914 self.assertEqual(op.recv(512), expected) 5915 5916 # Although it should work with 3.19 and newer the test blocks on 5917 # Ubuntu 15.10 with Kernel 4.2.0-19. 5918 @support.requires_linux_version(4, 3) 5919 def test_aes_cbc(self): 5920 key = bytes.fromhex('06a9214036b8a15b512e03d534120006') 5921 iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41') 5922 msg = b"Single block msg" 5923 ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a') 5924 msglen = len(msg) 5925 with self.create_alg('skcipher', 'cbc(aes)') as algo: 5926 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 5927 op, _ = algo.accept() 5928 with op: 5929 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 5930 flags=socket.MSG_MORE) 5931 op.sendall(msg) 5932 self.assertEqual(op.recv(msglen), ciphertext) 5933 5934 op, _ = algo.accept() 5935 with op: 5936 op.sendmsg_afalg([ciphertext], 5937 op=socket.ALG_OP_DECRYPT, iv=iv) 5938 self.assertEqual(op.recv(msglen), msg) 5939 5940 # long message 5941 multiplier = 1024 5942 longmsg = [msg] * multiplier 5943 op, _ = algo.accept() 5944 with op: 5945 op.sendmsg_afalg(longmsg, 5946 op=socket.ALG_OP_ENCRYPT, iv=iv) 5947 enc = op.recv(msglen * multiplier) 5948 self.assertEqual(len(enc), msglen * multiplier) 5949 self.assertEqual(enc[:msglen], ciphertext) 5950 5951 op, _ = algo.accept() 5952 with op: 5953 op.sendmsg_afalg([enc], 5954 op=socket.ALG_OP_DECRYPT, iv=iv) 5955 dec = op.recv(msglen * multiplier) 5956 self.assertEqual(len(dec), msglen * multiplier) 5957 self.assertEqual(dec, msg * multiplier) 5958 5959 @support.requires_linux_version(4, 9) # see issue29324 5960 def test_aead_aes_gcm(self): 5961 key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c') 5962 iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2') 5963 plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069') 5964 assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f') 5965 expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354') 5966 expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd') 5967 5968 taglen = len(expected_tag) 5969 assoclen = len(assoc) 5970 5971 with self.create_alg('aead', 'gcm(aes)') as algo: 5972 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 5973 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE, 5974 None, taglen) 5975 5976 # send assoc, plain and tag buffer in separate steps 5977 op, _ = algo.accept() 5978 with op: 5979 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 5980 assoclen=assoclen, flags=socket.MSG_MORE) 5981 op.sendall(assoc, socket.MSG_MORE) 5982 op.sendall(plain) 5983 res = op.recv(assoclen + len(plain) + taglen) 5984 self.assertEqual(expected_ct, res[assoclen:-taglen]) 5985 self.assertEqual(expected_tag, res[-taglen:]) 5986 5987 # now with msg 5988 op, _ = algo.accept() 5989 with op: 5990 msg = assoc + plain 5991 op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv, 5992 assoclen=assoclen) 5993 res = op.recv(assoclen + len(plain) + taglen) 5994 self.assertEqual(expected_ct, res[assoclen:-taglen]) 5995 self.assertEqual(expected_tag, res[-taglen:]) 5996 5997 # create anc data manually 5998 pack_uint32 = struct.Struct('I').pack 5999 op, _ = algo.accept() 6000 with op: 6001 msg = assoc + plain 6002 op.sendmsg( 6003 [msg], 6004 ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)], 6005 [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv], 6006 [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)], 6007 ) 6008 ) 6009 res = op.recv(len(msg) + taglen) 6010 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6011 self.assertEqual(expected_tag, res[-taglen:]) 6012 6013 # decrypt and verify 6014 op, _ = algo.accept() 6015 with op: 6016 msg = assoc + expected_ct + expected_tag 6017 op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv, 6018 assoclen=assoclen) 6019 res = op.recv(len(msg) - taglen) 6020 self.assertEqual(plain, res[assoclen:]) 6021 6022 @support.requires_linux_version(4, 3) # see test_aes_cbc 6023 def test_drbg_pr_sha256(self): 6024 # deterministic random bit generator, prediction resistance, sha256 6025 with self.create_alg('rng', 'drbg_pr_sha256') as algo: 6026 extra_seed = os.urandom(32) 6027 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed) 6028 op, _ = algo.accept() 6029 with op: 6030 rn = op.recv(32) 6031 self.assertEqual(len(rn), 32) 6032 6033 def test_sendmsg_afalg_args(self): 6034 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6035 with sock: 6036 with self.assertRaises(TypeError): 6037 sock.sendmsg_afalg() 6038 6039 with self.assertRaises(TypeError): 6040 sock.sendmsg_afalg(op=None) 6041 6042 with self.assertRaises(TypeError): 6043 sock.sendmsg_afalg(1) 6044 6045 with self.assertRaises(TypeError): 6046 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None) 6047 6048 with self.assertRaises(TypeError): 6049 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1) 6050 6051 def test_length_restriction(self): 6052 # bpo-35050, off-by-one error in length check 6053 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6054 self.addCleanup(sock.close) 6055 6056 # salg_type[14] 6057 with self.assertRaises(FileNotFoundError): 6058 sock.bind(("t" * 13, "name")) 6059 with self.assertRaisesRegex(ValueError, "type too long"): 6060 sock.bind(("t" * 14, "name")) 6061 6062 # salg_name[64] 6063 with self.assertRaises(FileNotFoundError): 6064 sock.bind(("type", "n" * 63)) 6065 with self.assertRaisesRegex(ValueError, "name too long"): 6066 sock.bind(("type", "n" * 64)) 6067 6068 6069@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") 6070class TestMSWindowsTCPFlags(unittest.TestCase): 6071 knownTCPFlags = { 6072 # available since long time ago 6073 'TCP_MAXSEG', 6074 'TCP_NODELAY', 6075 # available starting with Windows 10 1607 6076 'TCP_FASTOPEN', 6077 # available starting with Windows 10 1703 6078 'TCP_KEEPCNT', 6079 # available starting with Windows 10 1709 6080 'TCP_KEEPIDLE', 6081 'TCP_KEEPINTVL' 6082 } 6083 6084 def test_new_tcp_flags(self): 6085 provided = [s for s in dir(socket) if s.startswith('TCP')] 6086 unknown = [s for s in provided if s not in self.knownTCPFlags] 6087 6088 self.assertEqual([], unknown, 6089 "New TCP flags were discovered. See bpo-32394 for more information") 6090 6091 6092class CreateServerTest(unittest.TestCase): 6093 6094 def test_address(self): 6095 port = support.find_unused_port() 6096 with socket.create_server(("127.0.0.1", port)) as sock: 6097 self.assertEqual(sock.getsockname()[0], "127.0.0.1") 6098 self.assertEqual(sock.getsockname()[1], port) 6099 if support.IPV6_ENABLED: 6100 with socket.create_server(("::1", port), 6101 family=socket.AF_INET6) as sock: 6102 self.assertEqual(sock.getsockname()[0], "::1") 6103 self.assertEqual(sock.getsockname()[1], port) 6104 6105 def test_family_and_type(self): 6106 with socket.create_server(("127.0.0.1", 0)) as sock: 6107 self.assertEqual(sock.family, socket.AF_INET) 6108 self.assertEqual(sock.type, socket.SOCK_STREAM) 6109 if support.IPV6_ENABLED: 6110 with socket.create_server(("::1", 0), family=socket.AF_INET6) as s: 6111 self.assertEqual(s.family, socket.AF_INET6) 6112 self.assertEqual(sock.type, socket.SOCK_STREAM) 6113 6114 def test_reuse_port(self): 6115 if not hasattr(socket, "SO_REUSEPORT"): 6116 with self.assertRaises(ValueError): 6117 socket.create_server(("localhost", 0), reuse_port=True) 6118 else: 6119 with socket.create_server(("localhost", 0)) as sock: 6120 opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) 6121 self.assertEqual(opt, 0) 6122 with socket.create_server(("localhost", 0), reuse_port=True) as sock: 6123 opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) 6124 self.assertNotEqual(opt, 0) 6125 6126 @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or 6127 not hasattr(_socket, 'IPV6_V6ONLY'), 6128 "IPV6_V6ONLY option not supported") 6129 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') 6130 def test_ipv6_only_default(self): 6131 with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock: 6132 assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY) 6133 6134 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6135 "dualstack_ipv6 not supported") 6136 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') 6137 def test_dualstack_ipv6_family(self): 6138 with socket.create_server(("::1", 0), family=socket.AF_INET6, 6139 dualstack_ipv6=True) as sock: 6140 self.assertEqual(sock.family, socket.AF_INET6) 6141 6142 6143class CreateServerFunctionalTest(unittest.TestCase): 6144 timeout = 3 6145 6146 def setUp(self): 6147 self.thread = None 6148 6149 def tearDown(self): 6150 if self.thread is not None: 6151 self.thread.join(self.timeout) 6152 6153 def echo_server(self, sock): 6154 def run(sock): 6155 with sock: 6156 conn, _ = sock.accept() 6157 with conn: 6158 event.wait(self.timeout) 6159 msg = conn.recv(1024) 6160 if not msg: 6161 return 6162 conn.sendall(msg) 6163 6164 event = threading.Event() 6165 sock.settimeout(self.timeout) 6166 self.thread = threading.Thread(target=run, args=(sock, )) 6167 self.thread.start() 6168 event.set() 6169 6170 def echo_client(self, addr, family): 6171 with socket.socket(family=family) as sock: 6172 sock.settimeout(self.timeout) 6173 sock.connect(addr) 6174 sock.sendall(b'foo') 6175 self.assertEqual(sock.recv(1024), b'foo') 6176 6177 def test_tcp4(self): 6178 port = support.find_unused_port() 6179 with socket.create_server(("", port)) as sock: 6180 self.echo_server(sock) 6181 self.echo_client(("127.0.0.1", port), socket.AF_INET) 6182 6183 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') 6184 def test_tcp6(self): 6185 port = support.find_unused_port() 6186 with socket.create_server(("", port), 6187 family=socket.AF_INET6) as sock: 6188 self.echo_server(sock) 6189 self.echo_client(("::1", port), socket.AF_INET6) 6190 6191 # --- dual stack tests 6192 6193 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6194 "dualstack_ipv6 not supported") 6195 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') 6196 def test_dual_stack_client_v4(self): 6197 port = support.find_unused_port() 6198 with socket.create_server(("", port), family=socket.AF_INET6, 6199 dualstack_ipv6=True) as sock: 6200 self.echo_server(sock) 6201 self.echo_client(("127.0.0.1", port), socket.AF_INET) 6202 6203 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6204 "dualstack_ipv6 not supported") 6205 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') 6206 def test_dual_stack_client_v6(self): 6207 port = support.find_unused_port() 6208 with socket.create_server(("", port), family=socket.AF_INET6, 6209 dualstack_ipv6=True) as sock: 6210 self.echo_server(sock) 6211 self.echo_client(("::1", port), socket.AF_INET6) 6212 6213 6214def test_main(): 6215 tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, 6216 TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, 6217 UDPTimeoutTest, CreateServerTest, CreateServerFunctionalTest] 6218 6219 tests.extend([ 6220 NonBlockingTCPTests, 6221 FileObjectClassTestCase, 6222 UnbufferedFileObjectClassTestCase, 6223 LineBufferedFileObjectClassTestCase, 6224 SmallBufferedFileObjectClassTestCase, 6225 UnicodeReadFileObjectClassTestCase, 6226 UnicodeWriteFileObjectClassTestCase, 6227 UnicodeReadWriteFileObjectClassTestCase, 6228 NetworkConnectionNoServer, 6229 NetworkConnectionAttributesTest, 6230 NetworkConnectionBehaviourTest, 6231 ContextManagersTest, 6232 InheritanceTest, 6233 NonblockConstantTest 6234 ]) 6235 tests.append(BasicSocketPairTest) 6236 tests.append(TestUnixDomain) 6237 tests.append(TestLinuxAbstractNamespace) 6238 tests.extend([TIPCTest, TIPCThreadableTest]) 6239 tests.extend([BasicCANTest, CANTest]) 6240 tests.extend([BasicRDSTest, RDSTest]) 6241 tests.append(LinuxKernelCryptoAPI) 6242 tests.append(BasicQIPCRTRTest) 6243 tests.extend([ 6244 BasicVSOCKTest, 6245 ThreadedVSOCKSocketStreamTest, 6246 ]) 6247 tests.extend([ 6248 CmsgMacroTests, 6249 SendmsgUDPTest, 6250 RecvmsgUDPTest, 6251 RecvmsgIntoUDPTest, 6252 SendmsgUDP6Test, 6253 RecvmsgUDP6Test, 6254 RecvmsgRFC3542AncillaryUDP6Test, 6255 RecvmsgIntoRFC3542AncillaryUDP6Test, 6256 RecvmsgIntoUDP6Test, 6257 SendmsgTCPTest, 6258 RecvmsgTCPTest, 6259 RecvmsgIntoTCPTest, 6260 SendmsgSCTPStreamTest, 6261 RecvmsgSCTPStreamTest, 6262 RecvmsgIntoSCTPStreamTest, 6263 SendmsgUnixStreamTest, 6264 RecvmsgUnixStreamTest, 6265 RecvmsgIntoUnixStreamTest, 6266 RecvmsgSCMRightsStreamTest, 6267 RecvmsgIntoSCMRightsStreamTest, 6268 # These are slow when setitimer() is not available 6269 InterruptedRecvTimeoutTest, 6270 InterruptedSendTimeoutTest, 6271 TestSocketSharing, 6272 SendfileUsingSendTest, 6273 SendfileUsingSendfileTest, 6274 ]) 6275 tests.append(TestMSWindowsTCPFlags) 6276 6277 thread_info = support.threading_setup() 6278 support.run_unittest(*tests) 6279 support.threading_cleanup(*thread_info) 6280 6281if __name__ == "__main__": 6282 test_main() 6283