import unittest
from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper

import errno
import io
import itertools
import socket
import select
import tempfile
import time
import traceback
import queue
import sys
import os
import platform
import array
import contextlib
from weakref import proxy
import signal
import math
import pickle
import struct
import random
import shutil
import string
import _thread as thread
import threading
try:
    import multiprocessing
except ImportError:
    multiprocessing = False
try:
    import fcntl
except ImportError:
    fcntl = None

support.requires_working_socket(module=True)

HOST = socket_helper.HOST
# test unicode string and carriage return
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')

VSOCKPORT = 1234
AIX = platform.system() == "AIX"

try:
    import _socket
except ImportError:
    _socket = None

def get_cid():
    """Return the local VSOCK context ID, or None if it cannot be determined.

    None is returned when fcntl is unavailable, when the socket module lacks
    IOCTL_VM_SOCKETS_GET_LOCAL_CID, or when /dev/vsock cannot be opened or
    queried.
    """
    if fcntl is None:
        return None
    if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'):
        return None
    # The ioctl fills in a native unsigned int, so the buffer handed to it
    # must be exactly as large as the "I" format unpacked below.
    buf = bytes(struct.calcsize("I"))
    try:
        with open("/dev/vsock", "rb") as f:
            r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, buf)
    except OSError:
        return None
    else:
        return struct.unpack("I", r)[0]

def _can_create_socket(*args):
    """Return True if socket.socket(*args) succeeds on this host.

    String arguments are looked up as attributes of the socket module inside
    the guarded region, so a constant that is missing on this platform makes
    the probe return False instead of raising AttributeError.
    """
    try:
        s = socket.socket(*[getattr(socket, arg) if isinstance(arg, str) else arg
                            for arg in args])
    except (AttributeError, OSError):
        return False
    else:
        s.close()
    return True

def _have_socket_can():
    """Check whether CAN sockets are supported on this host."""
    return _can_create_socket('PF_CAN', 'SOCK_RAW', 'CAN_RAW')

def _have_socket_can_isotp():
    """Check whether CAN ISOTP sockets are supported on this host."""
    return _can_create_socket('PF_CAN', 'SOCK_DGRAM', 'CAN_ISOTP')

def _have_socket_can_j1939():
    """Check whether CAN J1939 sockets are supported on this host."""
    return _can_create_socket('PF_CAN', 'SOCK_DGRAM', 'CAN_J1939')

def _have_socket_rds():
    """Check whether RDS sockets are supported on this host."""
    return _can_create_socket('PF_RDS', 'SOCK_SEQPACKET', 0)

def _have_socket_alg():
    """Check whether AF_ALG sockets are supported on this host."""
    return _can_create_socket('AF_ALG', 'SOCK_SEQPACKET', 0)

def _have_socket_qipcrtr():
    """Check whether AF_QIPCRTR sockets are supported on this host."""
    return _can_create_socket('AF_QIPCRTR', 'SOCK_DGRAM', 0)

def _have_socket_vsock():
    """Check whether AF_VSOCK sockets are supported on this host."""
    ret = get_cid() is not None
    return ret


def _have_socket_bluetooth():
    """Check whether AF_BLUETOOTH sockets are supported on this host."""
    # RFCOMM is supported by all platforms with bluetooth support. Windows
    # does not support omitting the protocol.
    return _can_create_socket('AF_BLUETOOTH', 'SOCK_STREAM', 'BTPROTO_RFCOMM')


@contextlib.contextmanager
def socket_setdefaulttimeout(timeout):
    """Context manager: set the default socket timeout, restore it on exit."""
    old_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(timeout)
        yield
    finally:
        socket.setdefaulttimeout(old_timeout)


HAVE_SOCKET_CAN = _have_socket_can()

HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()

HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939()

HAVE_SOCKET_RDS = _have_socket_rds()

HAVE_SOCKET_ALG = _have_socket_alg()

HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr()

HAVE_SOCKET_VSOCK = _have_socket_vsock()

HAVE_SOCKET_UDPLITE = hasattr(socket, "IPPROTO_UDPLITE")

HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()

# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize

class SocketTCPTest(unittest.TestCase):
    """Fixture providing a listening IPv4 TCP socket (self.serv, self.port)."""

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = socket_helper.bind_port(self.serv)
        self.serv.listen()

    def tearDown(self):
        self.serv.close()
        self.serv = None

class SocketUDPTest(unittest.TestCase):
    """Fixture providing a bound IPv4 UDP socket (self.serv, self.port)."""

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.port = socket_helper.bind_port(self.serv)

    def tearDown(self):
        self.serv.close()
        self.serv = None

class SocketUDPLITETest(SocketUDPTest):
    """Fixture providing a bound IPv4 UDPLITE socket (self.serv, self.port)."""

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
        self.port = socket_helper.bind_port(self.serv)

class ThreadSafeCleanupTestCase:
    """Subclass of unittest.TestCase with thread-safe cleanup methods.

    This subclass protects the addCleanup() and doCleanups() methods
    with a recursive lock.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cleanup_lock = threading.RLock()

    def addCleanup(self, *args, **kwargs):
        with self._cleanup_lock:
            return super().addCleanup(*args, **kwargs)

    def doCleanups(self, *args, **kwargs):
        with self._cleanup_lock:
            return super().doCleanups(*args, **kwargs)

class SocketCANTest(unittest.TestCase):

    """To be able to run this test, a `vcan0` CAN interface can be created with
    the following commands:
    # modprobe vcan
    # ip link add dev vcan0 type vcan
    # ip link set up vcan0
    """
    interface = 'vcan0'
    bufsize = 128

    # The CAN frame structure is defined in <linux/can.h>:
    #
    # struct can_frame {
    #     canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
    #     __u8    can_dlc; /* data length code: 0 .. 8 */
    #     __u8    data[8] __attribute__((aligned(8)));
    # };
    can_frame_fmt = "=IB3x8s"
    can_frame_size = struct.calcsize(can_frame_fmt)

    # The Broadcast Management Command frame structure is defined
    # in <linux/can/bcm.h>:
    #
    # struct bcm_msg_head {
    #     __u32 opcode;
    #     __u32 flags;
    #     __u32 count;
    #     struct timeval ival1, ival2;
    #     canid_t can_id;
    #     __u32 nframes;
    #     struct can_frame frames[0];
    # }
    #
    # `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
    # `struct can_frame` definition). Must use native not standard types for packing.
    bcm_cmd_msg_fmt = "@3I4l2I"
    bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)

    def setUp(self):
        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        self.addCleanup(self.s.close)
        try:
            self.s.bind((self.interface,))
        except OSError:
            self.skipTest('network interface `%s` does not exist' %
                           self.interface)


class SocketRDSTest(unittest.TestCase):

    """To be able to run this test, the `rds` kernel module must be loaded:
    # modprobe rds
    """
    bufsize = 8192

    def setUp(self):
        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.addCleanup(self.serv.close)
        try:
            self.port = socket_helper.bind_port(self.serv)
        except OSError:
            self.skipTest('unable to bind RDS socket')


class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function
        self.__setUp = self.setUp
        self.setUp = self._setUp

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, then run the real (server) setUp()."""
        self.enterContext(threading_helper.wait_threads_exit())

        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = queue.Queue(1)
        self.server_crashed = False

        def raise_queued_exception():
            # Re-raise in the main thread whatever the client thread queued.
            if self.queue.qsize():
                raise self.queue.get()
        self.addCleanup(raise_queued_exception)

        # Do some munging to start the client test: the client half of
        # test "testFoo" is the method named "_testFoo".
        methodname = self.id().rpartition('.')[2]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        try:
            self.__setUp()
        except BaseException:
            # Tell the client thread not to run its half of the test.
            self.server_crashed = True
            raise
        finally:
            self.server_ready.set()
        self.client_ready.wait()
        self.addCleanup(self.done.wait)

    def clientRun(self, test_func):
        """Body of the client thread: set up, run test_func, tear down."""
        self.server_ready.wait()
        try:
            self.clientSetUp()
        except BaseException as e:
            self.queue.put(e)
            self.clientTearDown()
            return
        finally:
            self.client_ready.set()
        if self.server_crashed:
            self.clientTearDown()
            return
        if not callable(test_func):
            raise TypeError("test_func must be a callable function")
        try:
            test_func()
        except BaseException as e:
            self.queue.put(e)
        finally:
            self.clientTearDown()

    def clientSetUp(self):
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        self.done.set()
        thread.exit()

class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """SocketTCPTest with a client thread owning a TCP socket self.cli."""

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """SocketUDPTest with a client thread owning a UDP socket self.cli."""

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest):
    """SocketUDPLITETest with a client thread owning a UDPLITE socket self.cli."""

    def __init__(self, methodName='runTest'):
        SocketUDPLITETest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
    """SocketCANTest with a client thread owning a raw CAN socket self.cli."""

    def __init__(self, methodName='runTest'):
        SocketCANTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        try:
            self.cli.bind((self.interface,))
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
    """SocketRDSTest with a client thread owning a bound RDS socket self.cli."""

    def __init__(self, methodName='runTest'):
        SocketRDSTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        try:
            # RDS sockets must be bound explicitly to send or receive data
            self.cli.bind((HOST, 0))
            self.cli_addr = self.cli.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

@unittest.skipIf(fcntl is None, "need fcntl")
@unittest.skipUnless(HAVE_SOCKET_VSOCK,
          'VSOCK sockets required for this test.')
@unittest.skipUnless(get_cid() != 2,
          "This test can only be run on a virtual guest.")
class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest):
    """Client/server stream test over AF_VSOCK."""

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
        self.addCleanup(self.serv.close)
        self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT))
        self.serv.listen()
        self.serverExplicitReady()
        self.conn, self.connaddr = self.serv.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
        self.addCleanup(self.cli.close)
        cid = get_cid()
        self.cli.connect((cid, VSOCKPORT))

    def testStream(self):
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testStream(self):
        self.cli.send(MSG)
        self.cli.close()

class SocketConnectedTest(ThreadedTCPSocketTest):
    """Socket tests for client-server connection.

    self.cli_conn is a client socket connected to the server.  The
    setUp() method guarantees that it is connected to the server.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        conn, addr = self.serv.accept()
        self.cli_conn = conn

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        self.cli.connect((HOST, self.port))
        self.serv_conn = self.cli

    def clientTearDown(self):
        self.serv_conn.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)

class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Threaded fixture whose two ends come from socketpair()."""

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)
        self.cli = None
        self.serv = None

    def socketpair(self):
        # To be overridden by some child classes.
        return socket.socketpair()

    def setUp(self):
        self.serv, self.cli = self.socketpair()

    def tearDown(self):
        if self.serv:
            self.serv.close()
        self.serv = None

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        if self.cli:
            self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)


# The following classes are used by the sendmsg()/recvmsg() tests.
# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
# gives a drop-in replacement for SocketConnectedTest, but different
# address families can be used, and the attributes serv_addr and
# cli_addr will be set to the addresses of the endpoints.

class SocketTestBase(unittest.TestCase):
    """A base class for socket tests.

    Subclasses must provide methods newSocket() to return a new socket
    and bindSock(sock) to bind it to an unused address.

    Creates a socket self.serv and sets self.serv_addr to its address.
    """

    def setUp(self):
        self.serv = self.newSocket()
        self.bindServer()

    def bindServer(self):
        """Bind server socket and set self.serv_addr to its address."""
        self.bindSock(self.serv)
        self.serv_addr = self.serv.getsockname()

    def tearDown(self):
        self.serv.close()
        self.serv = None


class SocketListeningTestMixin(SocketTestBase):
    """Mixin to listen on the server socket."""

    def setUp(self):
        super().setUp()
        self.serv.listen()


class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
                              ThreadableTest):
    """Mixin to add client socket and allow client/server tests.

    Client socket is self.cli and its address is self.cli_addr.  See
    ThreadableTest for usage information.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = self.newClientSocket()
        self.bindClient()

    def newClientSocket(self):
        """Return a new socket for use as client."""
        return self.newSocket()

    def bindClient(self):
        """Bind client socket and set self.cli_addr to its address."""
        self.bindSock(self.cli)
        self.cli_addr = self.cli.getsockname()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)


class ConnectedStreamTestMixin(SocketListeningTestMixin,
                               ThreadedSocketTestMixin):
    """Mixin to allow client/server stream tests with connected client.

    Server's socket representing connection to client is self.cli_conn
    and client's connection to server is self.serv_conn.  (Based on
    SocketConnectedTest.)
    """

    def setUp(self):
        super().setUp()
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        conn, addr = self.serv.accept()
        self.cli_conn = conn

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        super().tearDown()

    def clientSetUp(self):
        super().clientSetUp()
        self.cli.connect(self.serv_addr)
        self.serv_conn = self.cli

    def clientTearDown(self):
        try:
            self.serv_conn.close()
            self.serv_conn = None
        except AttributeError:
            pass
        super().clientTearDown()


class UnixSocketTestBase(SocketTestBase):
    """Base class for Unix-domain socket tests."""

    # This class is used for file descriptor passing tests, so we
    # create the sockets in a private directory so that other users
    # can't send anything that might be problematic for a privileged
    # user running the tests.

    def setUp(self):
        self.dir_path = tempfile.mkdtemp()
        self.addCleanup(os.rmdir, self.dir_path)
        super().setUp()

    def bindSock(self, sock):
        path = tempfile.mktemp(dir=self.dir_path)
        socket_helper.bind_unix_socket(sock, path)
        self.addCleanup(os_helper.unlink, path)

class UnixStreamBase(UnixSocketTestBase):
    """Base class for Unix-domain SOCK_STREAM tests."""

    def newSocket(self):
        return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)


class InetTestBase(SocketTestBase):
    """Base class for IPv4 socket tests."""

    host = HOST

    def setUp(self):
        super().setUp()
        self.port = self.serv_addr[1]

    def bindSock(self, sock):
        socket_helper.bind_port(sock, host=self.host)

class TCPTestBase(InetTestBase):
    """Base class for TCP-over-IPv4 tests."""

    def newSocket(self):
        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)

class UDPTestBase(InetTestBase):
    """Base class for UDP-over-IPv4 tests."""

    def newSocket(self):
        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

class UDPLITETestBase(InetTestBase):
    """Base class for UDPLITE-over-IPv4 tests."""

    def newSocket(self):
        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)

class SCTPStreamBase(InetTestBase):
    """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""

    def newSocket(self):
        return socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                             socket.IPPROTO_SCTP)


class Inet6TestBase(InetTestBase):
    """Base class for IPv6 socket tests."""

    host = socket_helper.HOSTv6

class UDP6TestBase(Inet6TestBase):
    """Base class for UDP-over-IPv6 tests."""

    def newSocket(self):
        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)

class UDPLITE6TestBase(Inet6TestBase):
    """Base class for UDPLITE-over-IPv6 tests."""

    def newSocket(self):
        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)


# Test-skipping decorators for use with ThreadableTest.

def skipWithClientIf(condition, reason):
    """Skip decorated test if condition is true, add client_skip decorator.

    If the decorated object is not a class, sets its attribute
    "client_skip" to a decorator which will return an empty function
    if the test is to be skipped, or the original function if it is
    not.  This can be used to avoid running the client part of a
    skipped test when using ThreadableTest.
    """
    def client_pass(*args, **kwargs):
        pass
    def skipdec(obj):
        retval = unittest.skip(reason)(obj)
        if not isinstance(obj, type):
            retval.client_skip = lambda f: client_pass
        return retval
    def noskipdec(obj):
        if not (isinstance(obj, type) or hasattr(obj, "client_skip")):
            obj.client_skip = lambda f: f
        return obj
    return skipdec if condition else noskipdec


def requireAttrs(obj, *attributes):
    """Skip decorated test if obj is missing any of the given attributes.

    Sets client_skip attribute as skipWithClientIf() does.
    """
    missing = [name for name in attributes if not hasattr(obj, name)]
    return skipWithClientIf(
        missing, "don't have " + ", ".join(missing))


def requireSocket(*args):
    """Skip decorated test if a socket cannot be created with given arguments.

    When an argument is given as a string, will use the value of that
    attribute of the socket module, or skip the test if it doesn't
    exist.  Sets client_skip attribute as skipWithClientIf() does.
    """
    err = None
    missing = [obj for obj in args if
               isinstance(obj, str) and not hasattr(socket, obj)]
    if missing:
        err = "don't have " + ", ".join(missing)
    else:
        callargs = [getattr(socket, obj) if isinstance(obj, str) else obj
                    for obj in args]
        try:
            s = socket.socket(*callargs)
        except OSError as e:
            # XXX: check errno?
            err = str(e)
        else:
            s.close()
    return skipWithClientIf(
        err is not None,
        "can't create socket({0}): {1}".format(
            ", ".join(str(o) for o in args), err))


#######################################################################
## Begin Tests

class GeneralModuleTests(unittest.TestCase):

    def test_SocketType_is_socketobject(self):
        import _socket
        self.assertIs(socket.SocketType, _socket.socket)
        # The with-block closes the socket even if the assertion fails.
        with socket.socket() as s:
            self.assertIsInstance(s, socket.SocketType)

    def test_repr(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        with s:
            self.assertIn('fd=%i' % s.fileno(), repr(s))
            self.assertIn('family=%s' % socket.AF_INET, repr(s))
            self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
            self.assertIn('proto=0', repr(s))
            self.assertNotIn('raddr', repr(s))
            s.bind(('127.0.0.1', 0))
            self.assertIn('laddr', repr(s))
            self.assertIn(str(s.getsockname()), repr(s))
        self.assertIn('[closed]', repr(s))
        self.assertNotIn('laddr', repr(s))

    @unittest.skipUnless(_socket is not None, 'need _socket module')
    def test_csocket_repr(self):
        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
        try:
            expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
                        % (s.fileno(), s.family, s.type, s.proto))
            self.assertEqual(repr(s), expected)
        finally:
            s.close()
        expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
                    % (s.family, s.type, s.proto))
        self.assertEqual(repr(s), expected)

    def test_weakref(self):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            p = proxy(s)
            self.assertEqual(p.fileno(), s.fileno())
        s = None
        support.gc_collect()  # For PyPy or other GCs.
        # Once the only strong reference is gone the proxy must be dead.
        with self.assertRaises(ReferenceError,
                               msg='Socket proxy still exists'):
            p.fileno()

    def testSocketError(self):
        # Testing socket module exceptions
        msg = "Error raising socket exception (%s)."
        with self.assertRaises(OSError, msg=msg % 'OSError'):
            raise OSError
        with self.assertRaises(OSError, msg=msg % 'socket.herror'):
            raise socket.herror
        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
            raise socket.gaierror

    def testSendtoErrors(self):
        # Testing that sendto doesn't mask failures. See #10169.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        s.bind(('', 0))
        sockname = s.getsockname()
        # 2 args
        with self.assertRaises(TypeError) as cm:
            s.sendto('\u2620', sockname)
        self.assertEqual(str(cm.exception),
                         "a bytes-like object is required, not 'str'")
        with self.assertRaises(TypeError) as cm:
            s.sendto(5j, sockname)
        self.assertEqual(str(cm.exception),
                         "a bytes-like object is required, not 'complex'")
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', None)
        self.assertIn('not NoneType', str(cm.exception))
        # 3 args
        with self.assertRaises(TypeError) as cm:
            s.sendto('\u2620', 0, sockname)
        self.assertEqual(str(cm.exception),
                         "a bytes-like object is required, not 'str'")
        with self.assertRaises(TypeError) as cm:
            s.sendto(5j, 0, sockname)
        self.assertEqual(str(cm.exception),
                         "a bytes-like object is required, not 'complex'")
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', 0, None)
        self.assertIn('not NoneType', str(cm.exception))
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', 'bar', sockname)
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', None, None)
        # wrong number of args
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo')
        self.assertIn('(1 given)', str(cm.exception))
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', 0, sockname, 4)
        self.assertIn('(4 given)', str(cm.exception))

    def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        if socket.has_ipv6:
            socket.AF_INET6
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR

    def testCrucialIpProtoConstants(self):
        socket.IPPROTO_TCP
        socket.IPPROTO_UDP
        if socket.has_ipv6:
            socket.IPPROTO_IPV6

    @unittest.skipUnless(os.name == "nt", "Windows specific")
    def testWindowsSpecificConstants(self):
        socket.IPPROTO_ICLFXBM
        socket.IPPROTO_ST
        socket.IPPROTO_CBT
        socket.IPPROTO_IGP
        socket.IPPROTO_RDP
        socket.IPPROTO_PGM
        socket.IPPROTO_L2TP
        socket.IPPROTO_SCTP

    @unittest.skipIf(support.is_wasi, "WASI is missing these methods")
    def test_socket_methods(self):
        # socket methods that depend on a configure HAVE_ check. They should
        # be present on all platforms except WASI.
        names = [
            "_accept", "bind", "connect", "connect_ex", "getpeername",
            "getsockname", "listen", "recvfrom", "recvfrom_into", "sendto",
            "setsockopt", "shutdown"
        ]
        for name in names:
            if not hasattr(socket.socket, name):
                self.fail(f"socket method {name} is missing")

    @unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
    def test3542SocketOptions(self):
        # Ref. issue #35569 and https://tools.ietf.org/html/rfc3542
        opts = {
            'IPV6_CHECKSUM',
            'IPV6_DONTFRAG',
            'IPV6_DSTOPTS',
            'IPV6_HOPLIMIT',
            'IPV6_HOPOPTS',
            'IPV6_NEXTHOP',
            'IPV6_PATHMTU',
            'IPV6_PKTINFO',
            'IPV6_RECVDSTOPTS',
            'IPV6_RECVHOPLIMIT',
            'IPV6_RECVHOPOPTS',
            'IPV6_RECVPATHMTU',
            'IPV6_RECVPKTINFO',
            'IPV6_RECVRTHDR',
            'IPV6_RECVTCLASS',
            'IPV6_RTHDR',
            'IPV6_RTHDRDSTOPTS',
            'IPV6_RTHDR_TYPE_0',
            'IPV6_TCLASS',
            'IPV6_USE_MIN_MTU',
        }
        for opt in opts:
            self.assertTrue(
                hasattr(socket, opt), f"Missing RFC3542 socket option '{opt}'"
            )

    def testHostnameRes(self):
        # Testing hostname resolution mechanisms
        hostname = socket.gethostname()
        try:
            ip = socket.gethostbyname(hostname)
        except OSError:
            # Probably name lookup wasn't set up right; skip this test
            self.skipTest('name lookup failure')
        self.assertIn('.', ip, "Error resolving host to ip.")
        try:
            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
        except OSError:
            # Probably a similar problem as above; skip this test
            self.skipTest('name lookup failure')
        all_host_names = [hostname, hname] + aliases
        fqhn = socket.getfqdn(ip)
        if fqhn not in all_host_names:
            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))

    def test_host_resolution(self):
        for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']:
            self.assertEqual(socket.gethostbyname(addr), addr)

        # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have
        # a matching name entry (e.g. 'ip6-localhost')
        for host in [socket_helper.HOSTv4]:
            self.assertIn(host, socket.gethostbyaddr(host)[2])

    def test_host_resolution_bad_address(self):
        # These are all malformed IP addresses and expected not to resolve to
        # any result.  But some ISPs, e.g. AWS and AT&T, may successfully
        # resolve these IPs. In particular, AT&T's DNS Error Assist service
        # will break this test.  See https://bugs.python.org/issue42092 for a
        # workaround.
        explanation = (
            "resolving an invalid IP address did not raise OSError; "
            "can be caused by a broken DNS server"
        )
        for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
                     '1:1:1:1:1:1:1:1:1']:
            with self.assertRaises(OSError, msg=addr):
                socket.gethostbyname(addr)
            with self.assertRaises(OSError, msg=explanation):
                socket.gethostbyaddr(addr)

    @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
    @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
    def test_sethostname(self):
        oldhn = socket.gethostname()
        try:
            socket.sethostname('new')
        except OSError as e:
            if e.errno == errno.EPERM:
                self.skipTest("test should be run as root")
            else:
                raise
        try:
            # running test as root!
            self.assertEqual(socket.gethostname(), 'new')
            # Should work with bytes objects too
            socket.sethostname(b'bar')
            self.assertEqual(socket.gethostname(), 'bar')
        finally:
            socket.sethostname(oldhn)

    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
                         'socket.if_nameindex() not available.')
    def testInterfaceNameIndex(self):
        interfaces = socket.if_nameindex()
        for index, name in interfaces:
            self.assertIsInstance(index, int)
            self.assertIsInstance(name, str)
            # interface indices are non-zero integers
            self.assertGreater(index, 0)
            _index = socket.if_nametoindex(name)
            self.assertIsInstance(_index, int)
            self.assertEqual(index, _index)
            _name = socket.if_indextoname(index)
            self.assertIsInstance(_name, str)
            self.assertEqual(name, _name)

    @unittest.skipUnless(hasattr(socket, 'if_indextoname'),
                         'socket.if_indextoname() not available.')
    def testInvalidInterfaceIndexToName(self):
        self.assertRaises(OSError, socket.if_indextoname, 0)
        self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')

    @unittest.skipUnless(hasattr(socket, 'if_nametoindex'),
                         'socket.if_nametoindex() not available.')
    def testInvalidInterfaceNameToIndex(self):
        self.assertRaises(TypeError, socket.if_nametoindex, 0)
        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')

    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
                         'test needs sys.getrefcount()')
    def testRefCountGetNameInfo(self):
        # Testing reference count for getnameinfo
        try:
            # On some versions, this loses a reference
            orig = sys.getrefcount(__name__)
            socket.getnameinfo(__name__,0)
        except TypeError:
            if sys.getrefcount(__name__) != orig:
                self.fail("socket.getnameinfo loses a reference")

    def testInterpreterCrash(self):
        # Making sure getnameinfo doesn't crash the interpreter
        try:
            # On some versions, this crashes the interpreter.
            socket.getnameinfo(('x', 0, 0, 0), 0)
        except OSError:
            pass

    def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34)

    @support.cpython_only
    def testNtoHErrors(self):
        import _testcapi
        s_good_values = [0, 1, 2, 0xffff]
        l_good_values = s_good_values + [0xffffffff]
        l_bad_values = [-1, -2, 1<<32, 1<<1000]
        s_bad_values = (
            l_bad_values +
            [_testcapi.INT_MIN-1, _testcapi.INT_MAX+1] +
            [1 << 16, _testcapi.INT_MAX]
        )
        for k in s_good_values:
            socket.ntohs(k)
            socket.htons(k)
        for k in l_good_values:
            socket.ntohl(k)
            socket.htonl(k)
        for k in s_bad_values:
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htons, k)
        for k in l_bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.htonl, k)

    def testGetServBy(self):
        eq = self.assertEqual
        # Find one service that exists, then check all the related interfaces.
        # I've ordered this by protocols that have both a tcp and udp
        # protocol, at least for modern Linuxes.
1167 if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd')) 1168 or sys.platform in ('linux', 'darwin')): 1169 # avoid the 'echo' service on this platform, as there is an 1170 # assumption breaking non-standard port/protocol entry 1171 services = ('daytime', 'qotd', 'domain') 1172 else: 1173 services = ('echo', 'daytime', 'domain') 1174 for service in services: 1175 try: 1176 port = socket.getservbyname(service, 'tcp') 1177 break 1178 except OSError: 1179 pass 1180 else: 1181 raise OSError 1182 # Try same call with optional protocol omitted 1183 # Issue #26936: Android getservbyname() was broken before API 23. 1184 if (not hasattr(sys, 'getandroidapilevel') or 1185 sys.getandroidapilevel() >= 23): 1186 port2 = socket.getservbyname(service) 1187 eq(port, port2) 1188 # Try udp, but don't barf if it doesn't exist 1189 try: 1190 udpport = socket.getservbyname(service, 'udp') 1191 except OSError: 1192 udpport = None 1193 else: 1194 eq(udpport, port) 1195 # Now make sure the lookup by port returns the same service name 1196 # Issue #26936: Android getservbyport() is broken. 1197 if not support.is_android: 1198 eq(socket.getservbyport(port2), service) 1199 eq(socket.getservbyport(port, 'tcp'), service) 1200 if udpport is not None: 1201 eq(socket.getservbyport(udpport, 'udp'), service) 1202 # Make sure getservbyport does not accept out of range ports. 
1203 self.assertRaises(OverflowError, socket.getservbyport, -1) 1204 self.assertRaises(OverflowError, socket.getservbyport, 65536) 1205 1206 def testDefaultTimeout(self): 1207 # Testing default timeout 1208 # The default timeout should initially be None 1209 self.assertEqual(socket.getdefaulttimeout(), None) 1210 with socket.socket() as s: 1211 self.assertEqual(s.gettimeout(), None) 1212 1213 # Set the default timeout to 10, and see if it propagates 1214 with socket_setdefaulttimeout(10): 1215 self.assertEqual(socket.getdefaulttimeout(), 10) 1216 with socket.socket() as sock: 1217 self.assertEqual(sock.gettimeout(), 10) 1218 1219 # Reset the default timeout to None, and see if it propagates 1220 socket.setdefaulttimeout(None) 1221 self.assertEqual(socket.getdefaulttimeout(), None) 1222 with socket.socket() as sock: 1223 self.assertEqual(sock.gettimeout(), None) 1224 1225 # Check that setting it to an invalid value raises ValueError 1226 self.assertRaises(ValueError, socket.setdefaulttimeout, -1) 1227 1228 # Check that setting it to an invalid type raises TypeError 1229 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") 1230 1231 @unittest.skipUnless(hasattr(socket, 'inet_aton'), 1232 'test needs socket.inet_aton()') 1233 def testIPv4_inet_aton_fourbytes(self): 1234 # Test that issue1008086 and issue767150 are fixed. 1235 # It must return 4 bytes. 
1236 self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) 1237 self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) 1238 1239 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1240 'test needs socket.inet_pton()') 1241 def testIPv4toString(self): 1242 from socket import inet_aton as f, inet_pton, AF_INET 1243 g = lambda a: inet_pton(AF_INET, a) 1244 1245 assertInvalid = lambda func,a: self.assertRaises( 1246 (OSError, ValueError), func, a 1247 ) 1248 1249 self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) 1250 self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0')) 1251 self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170')) 1252 self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4')) 1253 self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255')) 1254 # bpo-29972: inet_pton() doesn't fail on AIX 1255 if not AIX: 1256 assertInvalid(f, '0.0.0.') 1257 assertInvalid(f, '300.0.0.0') 1258 assertInvalid(f, 'a.0.0.0') 1259 assertInvalid(f, '1.2.3.4.5') 1260 assertInvalid(f, '::1') 1261 1262 self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0')) 1263 self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0')) 1264 self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170')) 1265 self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255')) 1266 assertInvalid(g, '0.0.0.') 1267 assertInvalid(g, '300.0.0.0') 1268 assertInvalid(g, 'a.0.0.0') 1269 assertInvalid(g, '1.2.3.4.5') 1270 assertInvalid(g, '::1') 1271 1272 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1273 'test needs socket.inet_pton()') 1274 def testIPv6toString(self): 1275 try: 1276 from socket import inet_pton, AF_INET6, has_ipv6 1277 if not has_ipv6: 1278 self.skipTest('IPv6 not available') 1279 except ImportError: 1280 self.skipTest('could not import needed symbols from socket') 1281 1282 if sys.platform == "win32": 1283 try: 1284 inet_pton(AF_INET6, '::') 1285 except OSError as e: 1286 if e.winerror == 10022: 1287 self.skipTest('IPv6 might not be supported') 1288 1289 f = lambda a: 
inet_pton(AF_INET6, a) 1290 assertInvalid = lambda a: self.assertRaises( 1291 (OSError, ValueError), f, a 1292 ) 1293 1294 self.assertEqual(b'\x00' * 16, f('::')) 1295 self.assertEqual(b'\x00' * 16, f('0::0')) 1296 self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::')) 1297 self.assertEqual( 1298 b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', 1299 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') 1300 ) 1301 self.assertEqual( 1302 b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02', 1303 f('ad42:abc::127:0:254:2') 1304 ) 1305 self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::')) 1306 assertInvalid('0x20::') 1307 assertInvalid(':::') 1308 assertInvalid('::0::') 1309 assertInvalid('1::abc::') 1310 assertInvalid('1::abc::def') 1311 assertInvalid('1:2:3:4:5:6') 1312 assertInvalid('1:2:3:4:5:6:') 1313 assertInvalid('1:2:3:4:5:6:7:8:0') 1314 # bpo-29972: inet_pton() doesn't fail on AIX 1315 if not AIX: 1316 assertInvalid('1:2:3:4:5:6:7:8:') 1317 1318 self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40', 1319 f('::254.42.23.64') 1320 ) 1321 self.assertEqual( 1322 b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40', 1323 f('42::a29b:254.42.23.64') 1324 ) 1325 self.assertEqual( 1326 b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40', 1327 f('42:a8b9:0:2:ffff:a29b:254.42.23.64') 1328 ) 1329 assertInvalid('255.254.253.252') 1330 assertInvalid('1::260.2.3.0') 1331 assertInvalid('1::0.be.e.0') 1332 assertInvalid('1:2:3:4:5:6:7:1.2.3.4') 1333 assertInvalid('::1.2.3.4:0') 1334 assertInvalid('0.100.200.0:3:4:5:6:7:8') 1335 1336 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1337 'test needs socket.inet_ntop()') 1338 def testStringToIPv4(self): 1339 from socket import inet_ntoa as f, inet_ntop, AF_INET 1340 g = lambda a: inet_ntop(AF_INET, a) 1341 assertInvalid = lambda func,a: self.assertRaises( 1342 (OSError, ValueError), func, a 1343 ) 1344 1345 self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) 1346 
self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55')) 1347 self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff')) 1348 self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04')) 1349 assertInvalid(f, b'\x00' * 3) 1350 assertInvalid(f, b'\x00' * 5) 1351 assertInvalid(f, b'\x00' * 16) 1352 self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55'))) 1353 1354 self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00')) 1355 self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55')) 1356 self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff')) 1357 assertInvalid(g, b'\x00' * 3) 1358 assertInvalid(g, b'\x00' * 5) 1359 assertInvalid(g, b'\x00' * 16) 1360 self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55'))) 1361 1362 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1363 'test needs socket.inet_ntop()') 1364 def testStringToIPv6(self): 1365 try: 1366 from socket import inet_ntop, AF_INET6, has_ipv6 1367 if not has_ipv6: 1368 self.skipTest('IPv6 not available') 1369 except ImportError: 1370 self.skipTest('could not import needed symbols from socket') 1371 1372 if sys.platform == "win32": 1373 try: 1374 inet_ntop(AF_INET6, b'\x00' * 16) 1375 except OSError as e: 1376 if e.winerror == 10022: 1377 self.skipTest('IPv6 might not be supported') 1378 1379 f = lambda a: inet_ntop(AF_INET6, a) 1380 assertInvalid = lambda a: self.assertRaises( 1381 (OSError, ValueError), f, a 1382 ) 1383 1384 self.assertEqual('::', f(b'\x00' * 16)) 1385 self.assertEqual('::1', f(b'\x00' * 15 + b'\x01')) 1386 self.assertEqual( 1387 'aef:b01:506:1001:ffff:9997:55:170', 1388 f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') 1389 ) 1390 self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01'))) 1391 1392 assertInvalid(b'\x12' * 15) 1393 assertInvalid(b'\x12' * 17) 1394 assertInvalid(b'\x12' * 4) 1395 1396 # XXX The following don't test module-level functionality... 
def testSockName(self):
    # Testing getsockname(): a socket bound to the wildcard address must
    # report the port it was bound to.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(sock.close)

    # Since find_unused_port() is inherently subject to race conditions, we
    # call it a couple times if necessary.
    for i in itertools.count():
        port = socket_helper.find_unused_port()
        try:
            sock.bind(("0.0.0.0", port))
        except OSError as e:
            # Give up on anything other than "address in use", or after
            # the sixth attempt (i counts from 0).
            if e.errno != errno.EADDRINUSE or i == 5:
                raise
        else:
            break

    name = sock.getsockname()
    # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
    # it reasonable to get the host's addr in addition to 0.0.0.0.
    # At least for eCos.  This is required for the S/390 to pass.
    try:
        my_ip_addr = socket.gethostbyname(socket.gethostname())
    except OSError:
        # Probably name lookup wasn't set up right; skip this test
        self.skipTest('name lookup failure')
    self.assertIn(name[0], ("0.0.0.0", my_ip_addr), f'{name[0]} invalid')
    self.assertEqual(name[1], port)

def testGetSockOpt(self):
    # Testing getsockopt()
    # A freshly created socket should start with SO_REUSEADDR == 0.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(sock.close)
    reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
    # assertEqual (rather than assertFalse on a comparison) reports the
    # actual option value when the check fails.
    self.assertEqual(reuse, 0, "initial mode is reuse")

def testSetSockOpt(self):
    # Testing setsockopt(): after enabling SO_REUSEADDR, getsockopt()
    # must read back a non-zero value.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(sock.close)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
    self.assertNotEqual(reuse, 0, "failed to set reuse mode")

def testSendAfterClose(self):
    # testing send() after close() with timeout
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
    # Leaving the `with` block closed the socket, so send() must fail.
    self.assertRaises(OSError, sock.send, b"spam")
def testCloseException(self):
    # Closing the descriptor through a second socket object must make
    # close() on the original object raise OSError.
    sock = socket.socket()
    sock.bind((socket._LOCALHOST, 0))
    socket.socket(fileno=sock.fileno()).close()
    try:
        sock.close()
    except OSError as err:
        # Winsock apparently raises ENOTSOCK
        self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
    else:
        self.fail("close() should raise EBADF/ENOTSOCK")

def testNewAttributes(self):
    # testing .family, .type and .protocol

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        self.assertEqual(sock.family, socket.AF_INET)
        if hasattr(socket, 'SOCK_CLOEXEC'):
            # Accept the type with or without the SOCK_CLOEXEC bit set.
            self.assertIn(sock.type,
                          (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
                           socket.SOCK_STREAM))
        else:
            self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)

def test_getsockaddrarg(self):
    # Ports outside the 16-bit range must be rejected with OverflowError;
    # a valid port must bind.
    sock = socket.socket()
    self.addCleanup(sock.close)
    port = socket_helper.find_unused_port()
    big_port = port + 65536
    neg_port = port - 65536
    self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
    self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
    # Since find_unused_port() is inherently subject to race conditions, we
    # call it a couple times if necessary.
    for i in itertools.count():
        port = socket_helper.find_unused_port()
        try:
            sock.bind((HOST, port))
        except OSError as e:
            if e.errno != errno.EADDRINUSE or i == 5:
                raise
        else:
            break

@unittest.skipUnless(os.name == "nt", "Windows specific")
def test_sock_ioctl(self):
    # ioctl() and its control-code constants exist on Windows; an invalid
    # control code raises ValueError.
    self.assertTrue(hasattr(socket.socket, 'ioctl'))
    self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
    self.assertTrue(hasattr(socket, 'RCVALL_ON'))
    self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
    self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
    s = socket.socket()
    self.addCleanup(s.close)
    self.assertRaises(ValueError, s.ioctl, -1, None)
    s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))

@unittest.skipUnless(os.name == "nt", "Windows specific")
@unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
                     'Loopback fast path support required for this test')
def test_sio_loopback_fast_path(self):
    # SIO_LOOPBACK_FAST_PATH accepts a boolean and rejects None.
    s = socket.socket()
    self.addCleanup(s.close)
    try:
        s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
    except OSError as exc:
        WSAEOPNOTSUPP = 10045
        if exc.winerror == WSAEOPNOTSUPP:
            self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
                          "doesn't implemented in this Windows version")
        raise
    self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)

def testGetaddrinfo(self):
    # Exercise getaddrinfo() with the supported host/port forms, the
    # family/type/proto/flags filters, and keyword arguments.
    try:
        socket.getaddrinfo('localhost', 80)
    except socket.gaierror as err:
        if err.errno == socket.EAI_SERVICE:
            # see http://bugs.python.org/issue1282647
            self.skipTest("buggy libc version")
        raise
    # len of every sequence is supposed to be == 5
    for info in socket.getaddrinfo(HOST, None):
        self.assertEqual(len(info), 5)
    # host can be a domain name, a string representation of an
    # IPv4/v6 address or None
    socket.getaddrinfo('localhost', 80)
    socket.getaddrinfo('127.0.0.1', 80)
    socket.getaddrinfo(None, 80)
    if socket_helper.IPV6_ENABLED:
        socket.getaddrinfo('::1', 80)
    # port can be a string service name such as "http", a numeric
    # port number or None
    # Issue #26936: Android getaddrinfo() was broken before API level 23.
    if (not hasattr(sys, 'getandroidapilevel') or
            sys.getandroidapilevel() >= 23):
        socket.getaddrinfo(HOST, "http")
    socket.getaddrinfo(HOST, 80)
    socket.getaddrinfo(HOST, None)
    # test family and socktype filters
    infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
    # `socktype` rather than `type`, so the builtin is not shadowed.
    for family, socktype, _, _, _ in infos:
        self.assertEqual(family, socket.AF_INET)
        self.assertEqual(repr(family), '<AddressFamily.AF_INET: %r>' % family.value)
        self.assertEqual(str(family), str(family.value))
        self.assertEqual(socktype, socket.SOCK_STREAM)
        self.assertEqual(repr(socktype), '<SocketKind.SOCK_STREAM: %r>' % socktype.value)
        self.assertEqual(str(socktype), str(socktype.value))
    infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
    for _, socktype, _, _, _ in infos:
        self.assertEqual(socktype, socket.SOCK_STREAM)
    # test proto and flags arguments
    socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
    socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
    # a server willing to support both IPv4 and IPv6 will
    # usually do this
    socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE)
    # test keyword arguments
    a = socket.getaddrinfo(HOST, None)
    b = socket.getaddrinfo(host=HOST, port=None)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, socket.AF_INET)
    b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
    b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
    b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
    b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                           socket.AI_PASSIVE)
    b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
                           type=socket.SOCK_STREAM, proto=0,
                           flags=socket.AI_PASSIVE)
    self.assertEqual(a, b)
    # Issue #6697.
    self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')

    # Issue 17269: test workaround for OS X platform bug segfault
    if hasattr(socket, 'AI_NUMERICSERV'):
        try:
            # The arguments here are undefined and the call may succeed
            # or fail.  All we care here is that it doesn't segfault.
            socket.getaddrinfo("localhost", None, 0, 0, 0,
                               socket.AI_NUMERICSERV)
        except socket.gaierror:
            pass

def test_getnameinfo(self):
    # only IP addresses are allowed
    self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)

@unittest.skipUnless(support.is_resource_enabled('network'),
                     'network is not enabled')
def test_idna(self):
    # Check for internet access before running test
    # (issue #12804, issue #25138).
    with socket_helper.transient_internet('python.org'):
        socket.gethostbyname('python.org')

    # these should all be successful
    domain = 'испытание.pythontest.net'
    socket.gethostbyname(domain)
    socket.gethostbyname_ex(domain)
    socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
    # this may not work if the forward lookup chooses the IPv6 address, as that doesn't
    # have a reverse entry yet
    # socket.gethostbyaddr('испытание.python.org')

def check_sendall_interrupted(self, with_timeout):
    # Shared body for the two sendall-interruption tests below:
    # a SIGALRM handler that raises must propagate out of sendall(), and,
    # when with_timeout is true, a non-raising handler must leave
    # sendall() to fail with TimeoutError.
    # socketpair() is not strictly required, but it makes things easier.
    if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
        self.skipTest("signal.alarm and socket.socketpair required for this test")
    # Our signal handlers clobber the C errno by calling a math function
    # with an invalid domain value.
    def ok_handler(*args):
        self.assertRaises(ValueError, math.acosh, 0)
    def raising_handler(*args):
        self.assertRaises(ValueError, math.acosh, 0)
        # Deliberate ZeroDivisionError: the exception the test expects
        # to see escape from sendall().
        1 // 0
    c, s = socket.socketpair()
    old_alarm = signal.signal(signal.SIGALRM, raising_handler)
    try:
        if with_timeout:
            # Just above the one second minimum for signal.alarm
            c.settimeout(1.5)
        with self.assertRaises(ZeroDivisionError):
            signal.alarm(1)
            c.sendall(b"x" * support.SOCK_MAX_SIZE)
        if with_timeout:
            signal.signal(signal.SIGALRM, ok_handler)
            signal.alarm(1)
            self.assertRaises(TimeoutError, c.sendall,
                              b"x" * support.SOCK_MAX_SIZE)
    finally:
        # Cancel any pending alarm and restore the previous handler
        # before releasing the sockets.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_alarm)
        c.close()
        s.close()

def test_sendall_interrupted(self):
    self.check_sendall_interrupted(False)

def test_sendall_interrupted_with_timeout(self):
    self.check_sendall_interrupted(True)

def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed socket must emit a
    # ResourceWarning whose message contains the socket's repr.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    r = repr(sock)
    with self.assertWarns(ResourceWarning) as cm:
        sock = None
        support.gc_collect()
    self.assertIn(r, str(cm.warning.args[0]))
    # An open socket file object gets dereferenced after the socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    f = sock.makefile('rb')
    r = repr(sock)
    sock = None
    support.gc_collect()
    with self.assertWarns(ResourceWarning):
        f = None
        support.gc_collect()

def test_name_closed_socketio(self):
    # The repr of a closed buffered socket file reports name=-1.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        fp = sock.makefile("rb")
        fp.close()
        self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")

def test_unusable_closed_socketio(self):
    # readable()/writable()/seekable() answer normally on an open raw
    # socket file and raise ValueError once it is closed.
    with socket.socket() as sock:
        fp = sock.makefile("rb", buffering=0)
        self.assertTrue(fp.readable())
        self.assertFalse(fp.writable())
        self.assertFalse(fp.seekable())
        fp.close()
        self.assertRaises(ValueError, fp.readable)
        self.assertRaises(ValueError, fp.writable)
        self.assertRaises(ValueError, fp.seekable)

def test_socket_close(self):
    # socket.close(fd) closes the descriptor underneath a socket object,
    # and rejects None and -1.
    sock = socket.socket()
    try:
        sock.bind((HOST, 0))
        socket.close(sock.fileno())
        with self.assertRaises(OSError):
            sock.listen(1)
    finally:
        with self.assertRaises(OSError):
            # sock.close() fails with EBADF
            sock.close()
    with self.assertRaises(TypeError):
        socket.close(None)
    with self.assertRaises(OSError):
        socket.close(-1)

def test_makefile_mode(self):
    # makefile() must expose the requested mode string on the file object.
    for mode in 'r', 'rb', 'rw', 'w', 'wb':
        with self.subTest(mode=mode):
            with socket.socket() as sock:
                # Only text modes take an encoding.
                encoding = None if "b" in mode else "utf-8"
                with sock.makefile(mode, encoding=encoding) as fp:
                    self.assertEqual(fp.mode, mode)

def test_makefile_invalid_mode(self):
    # Unsupported mode strings must raise ValueError("invalid mode...").
    for mode in 'rt', 'x', '+', 'a':
        with self.subTest(mode=mode):
            with socket.socket() as sock:
                with self.assertRaisesRegex(ValueError, 'invalid mode'):
                    sock.makefile(mode)

def test_pickle(self):
    # Socket objects cannot be pickled, but the AF_*/SOCK_* enum members
    # must round-trip under every pickle protocol.
    sock = socket.socket()
    with sock:
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            self.assertRaises(TypeError, pickle.dumps, sock, protocol)
    for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
        family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
        self.assertEqual(family, socket.AF_INET)
        # `socktype` rather than `type`, so the builtin is not shadowed.
        socktype = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
        self.assertEqual(socktype, socket.SOCK_STREAM)

def test_listen_backlog(self):
    # listen() must accept a zero, a negative, and an omitted backlog.
    for backlog in 0, -1:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
            srv.bind((HOST, 0))
            srv.listen(backlog)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind((HOST, 0))
        srv.listen()

@support.cpython_only
def test_listen_backlog_overflow(self):
    # Issue 15989
    import _testcapi
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind((HOST, 0))
        self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_flowinfo(self):
    # Out-of-range flowinfo values must raise OverflowError.
    self.assertRaises(OverflowError, socket.getnameinfo,
                      (socket_helper.HOSTv6, 0, 0xffffffff), 0)
    with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
        self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_getaddrinfo_ipv6_basic(self):
    # The returned address is expected in lower case.
    ((*_, sockaddr),) = socket.getaddrinfo(
        'ff02::1de:c0:face:8D',  # Note capital letter `D`.
        1234, socket.AF_INET6,
        socket.SOCK_DGRAM,
        socket.IPPROTO_UDP
    )
    self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))

def test_getfqdn_filter_localhost(self):
    # getfqdn() of the wildcard addresses must equal getfqdn() of the
    # local host.
    self.assertEqual(socket.getfqdn(), socket.getfqdn("0.0.0.0"))
    self.assertEqual(socket.getfqdn(), socket.getfqdn("::"))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work')
@unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
def test_getaddrinfo_ipv6_scopeid_symbolic(self):
    # Just pick up any network interface (Linux, Mac OS X)
    (ifindex, test_interface) = socket.if_nameindex()[0]
    ((*_, sockaddr),) = socket.getaddrinfo(
        'ff02::1de:c0:face:8D%' + test_interface,
        1234, socket.AF_INET6,
        socket.SOCK_DGRAM,
        socket.IPPROTO_UDP
    )
    # Note missing interface name part in IPv6 address
    self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(
    sys.platform == 'win32',
    'Numeric scope id does not work or undocumented')
def test_getaddrinfo_ipv6_scopeid_numeric(self):
    # Also works on Linux and Mac OS X, but is not documented (?)
    # Windows, Linux and Mac OS X allow nonexistent interface numbers here.
    ifindex = 42
    ((*_, sockaddr),) = socket.getaddrinfo(
        'ff02::1de:c0:face:8D%' + str(ifindex),
        1234, socket.AF_INET6,
        socket.SOCK_DGRAM,
        socket.IPPROTO_UDP
    )
    # Note missing interface name part in IPv6 address
    self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work')
@unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
def test_getnameinfo_ipv6_scopeid_symbolic(self):
    # Just pick up any network interface.
    (ifindex, test_interface) = socket.if_nameindex()[0]
    sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
    nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
    self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(
    sys.platform == 'win32',
    'Numeric scope id does not work or undocumented')
def test_getnameinfo_ipv6_scopeid_numeric(self):
    # Also works on Linux (undocumented), but does not work on Mac OS X
    # Windows and Linux allow nonexistent interface numbers here.
    ifindex = 42
    sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
    nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
    self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234'))

def test_str_for_enums(self):
    # Make sure that the AF_* and SOCK_* constants have enum-like string
    # reprs.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        self.assertEqual(repr(s.family), '<AddressFamily.AF_INET: %r>' % s.family.value)
        self.assertEqual(repr(s.type), '<SocketKind.SOCK_STREAM: %r>' % s.type.value)
        self.assertEqual(str(s.family), str(s.family.value))
        self.assertEqual(str(s.type), str(s.type.value))

def test_socket_consistent_sock_type(self):
    # s.type must stay SOCK_STREAM whatever SOCK_NONBLOCK/SOCK_CLOEXEC
    # flags were passed in and however the blocking mode is changed.
    # The flags default to 0 on platforms that do not define them.
    SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)
    SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0)
    sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC

    with socket.socket(socket.AF_INET, sock_type) as s:
        self.assertEqual(s.type, socket.SOCK_STREAM)
        s.settimeout(1)
        self.assertEqual(s.type, socket.SOCK_STREAM)
        s.settimeout(0)
        self.assertEqual(s.type, socket.SOCK_STREAM)
        s.setblocking(True)
        self.assertEqual(s.type, socket.SOCK_STREAM)
        s.setblocking(False)
        self.assertEqual(s.type, socket.SOCK_STREAM)

def test_unknown_socket_family_repr(self):
    # Test that when created with a family that's not one of the known
    # AF_*/SOCK_* constants, socket.family just returns the number.
    #
    # To do this we fool socket.socket into believing it already has an
    # open fd because on this path it doesn't actually verify the family and
    # type and populates the socket object.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    fd = sock.detach()
    unknown_family = max(socket.AddressFamily.__members__.values()) + 1

    unknown_type = max(
        kind
        for name, kind in socket.SocketKind.__members__.items()
        if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'}
    ) + 1

    with socket.socket(
            family=unknown_family, type=unknown_type, proto=23,
            fileno=fd) as s:
        self.assertEqual(s.family, unknown_family)
        self.assertEqual(s.type, unknown_type)
        # some OS like macOS ignore proto
        self.assertIn(s.proto, {0, 23})

@unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
def test__sendfile_use_sendfile(self):
    # _sendfile_use_sendfile() must fail cleanly when the file object's
    # fileno() is a closed, oversized, or non-integer descriptor.
    class File:
        # Minimal file-like object exposing only fileno().
        def __init__(self, fd):
            self.fd = fd

        def fileno(self):
            return self.fd
    with socket.socket() as sock:
        # Open and immediately close to obtain a descriptor number that
        # is no longer valid.
        fd = os.open(os.curdir, os.O_RDONLY)
        os.close(fd)
        with self.assertRaises(socket._GiveupOnSendfile):
            sock._sendfile_use_sendfile(File(fd))
        with self.assertRaises(OverflowError):
            sock._sendfile_use_sendfile(File(2**1000))
        with self.assertRaises(TypeError):
            sock._sendfile_use_sendfile(File(None))

def _test_socket_fileno(self, s, family, stype):
    # Helper: a socket rebuilt from s's descriptor with only fileno=
    # given must report the same family, type and descriptor as s.
    self.assertEqual(s.family, family)
    self.assertEqual(s.type, stype)

    fd = s.fileno()
    s2 = socket.socket(fileno=fd)
    self.addCleanup(s2.close)
    # detach old fd to avoid double close
    s.detach()
    self.assertEqual(s2.family, family)
    self.assertEqual(s2.type, stype)
    self.assertEqual(s2.fileno(), fd)

def test_socket_fileno(self):
    # Run the fileno round-trip check for each address family / socket
    # type available on this platform.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(s.close)
    s.bind((socket_helper.HOST, 0))
    self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)

    if hasattr(socket, "SOCK_DGRAM"):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        s.bind((socket_helper.HOST, 0))
        self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)

    if socket_helper.IPV6_ENABLED:
        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        self.addCleanup(s.close)
        s.bind((socket_helper.HOSTv6, 0, 0, 0))
        self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)

    if hasattr(socket, "AF_UNIX"):
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir)
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.addCleanup(s.close)
        try:
            s.bind(os.path.join(tmpdir, 'socket'))
        except PermissionError:
            # Binding in the temp dir is not permitted here; skip the
            # AF_UNIX variant of the check.
            pass
        else:
            self._test_socket_fileno(s, socket.AF_UNIX,
                                     socket.SOCK_STREAM)

def test_socket_fileno_rejects_float(self):
    with self.assertRaises(TypeError):
        socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5)

def test_socket_fileno_rejects_other_types(self):
    with self.assertRaises(TypeError):
        socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo")

def test_socket_fileno_rejects_invalid_socket(self):
    with self.assertRaisesRegex(ValueError, "negative file descriptor"):
        socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1)

@unittest.skipIf(os.name == "nt", "Windows disallows -1 only")
def test_socket_fileno_rejects_negative(self):
    with self.assertRaisesRegex(ValueError, "negative file descriptor"):
        socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42)

def test_socket_fileno_requires_valid_fd(self):
    # A descriptor that is not open must be rejected with OSError, with
    # or without an explicit family and type.
    WSAENOTSOCK = 10038
    with self.assertRaises(OSError) as cm:
        socket.socket(fileno=os_helper.make_bad_fd())
    self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))

    with self.assertRaises(OSError) as cm:
        socket.socket(
            socket.AF_INET,
            socket.SOCK_STREAM,
            fileno=os_helper.make_bad_fd())
    self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1972 def test_socket_fileno_requires_socket_fd(self): 1973 with tempfile.NamedTemporaryFile() as afile: 1974 with self.assertRaises(OSError): 1975 socket.socket(fileno=afile.fileno()) 1976 1977 with self.assertRaises(OSError) as cm: 1978 socket.socket( 1979 socket.AF_INET, 1980 socket.SOCK_STREAM, 1981 fileno=afile.fileno()) 1982 self.assertEqual(cm.exception.errno, errno.ENOTSOCK) 1983 1984 def test_addressfamily_enum(self): 1985 import _socket, enum 1986 CheckedAddressFamily = enum._old_convert_( 1987 enum.IntEnum, 'AddressFamily', 'socket', 1988 lambda C: C.isupper() and C.startswith('AF_'), 1989 source=_socket, 1990 ) 1991 enum._test_simple_enum(CheckedAddressFamily, socket.AddressFamily) 1992 1993 def test_socketkind_enum(self): 1994 import _socket, enum 1995 CheckedSocketKind = enum._old_convert_( 1996 enum.IntEnum, 'SocketKind', 'socket', 1997 lambda C: C.isupper() and C.startswith('SOCK_'), 1998 source=_socket, 1999 ) 2000 enum._test_simple_enum(CheckedSocketKind, socket.SocketKind) 2001 2002 def test_msgflag_enum(self): 2003 import _socket, enum 2004 CheckedMsgFlag = enum._old_convert_( 2005 enum.IntFlag, 'MsgFlag', 'socket', 2006 lambda C: C.isupper() and C.startswith('MSG_'), 2007 source=_socket, 2008 ) 2009 enum._test_simple_enum(CheckedMsgFlag, socket.MsgFlag) 2010 2011 def test_addressinfo_enum(self): 2012 import _socket, enum 2013 CheckedAddressInfo = enum._old_convert_( 2014 enum.IntFlag, 'AddressInfo', 'socket', 2015 lambda C: C.isupper() and C.startswith('AI_'), 2016 source=_socket) 2017 enum._test_simple_enum(CheckedAddressInfo, socket.AddressInfo) 2018 2019 2020@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 2021class BasicCANTest(unittest.TestCase): 2022 2023 def testCrucialConstants(self): 2024 socket.AF_CAN 2025 socket.PF_CAN 2026 socket.CAN_RAW 2027 2028 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2029 'socket.CAN_BCM required for this test.') 2030 def testBCMConstants(self): 2031 socket.CAN_BCM 2032 2033 # 
opcodes 2034 socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task 2035 socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task 2036 socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task 2037 socket.CAN_BCM_TX_SEND # send one CAN frame 2038 socket.CAN_BCM_RX_SETUP # create RX content filter subscription 2039 socket.CAN_BCM_RX_DELETE # remove RX content filter subscription 2040 socket.CAN_BCM_RX_READ # read properties of RX content filter subscription 2041 socket.CAN_BCM_TX_STATUS # reply to TX_READ request 2042 socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) 2043 socket.CAN_BCM_RX_STATUS # reply to RX_READ request 2044 socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent 2045 socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) 2046 2047 # flags 2048 socket.CAN_BCM_SETTIMER 2049 socket.CAN_BCM_STARTTIMER 2050 socket.CAN_BCM_TX_COUNTEVT 2051 socket.CAN_BCM_TX_ANNOUNCE 2052 socket.CAN_BCM_TX_CP_CAN_ID 2053 socket.CAN_BCM_RX_FILTER_ID 2054 socket.CAN_BCM_RX_CHECK_DLC 2055 socket.CAN_BCM_RX_NO_AUTOTIMER 2056 socket.CAN_BCM_RX_ANNOUNCE_RESUME 2057 socket.CAN_BCM_TX_RESET_MULTI_IDX 2058 socket.CAN_BCM_RX_RTR_FRAME 2059 2060 def testCreateSocket(self): 2061 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2062 pass 2063 2064 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2065 'socket.CAN_BCM required for this test.') 2066 def testCreateBCMSocket(self): 2067 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: 2068 pass 2069 2070 def testBindAny(self): 2071 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2072 address = ('', ) 2073 s.bind(address) 2074 self.assertEqual(s.getsockname(), address) 2075 2076 def testTooLongInterfaceName(self): 2077 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 2078 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2079 self.assertRaisesRegex(OSError, 'interface 
name too long', 2080 s.bind, ('x' * 1024,)) 2081 2082 @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), 2083 'socket.CAN_RAW_LOOPBACK required for this test.') 2084 def testLoopback(self): 2085 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2086 for loopback in (0, 1): 2087 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, 2088 loopback) 2089 self.assertEqual(loopback, 2090 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK)) 2091 2092 @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"), 2093 'socket.CAN_RAW_FILTER required for this test.') 2094 def testFilter(self): 2095 can_id, can_mask = 0x200, 0x700 2096 can_filter = struct.pack("=II", can_id, can_mask) 2097 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2098 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) 2099 self.assertEqual(can_filter, 2100 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)) 2101 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter)) 2102 2103 2104@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 2105class CANTest(ThreadedCANSocketTest): 2106 2107 def __init__(self, methodName='runTest'): 2108 ThreadedCANSocketTest.__init__(self, methodName=methodName) 2109 2110 @classmethod 2111 def build_can_frame(cls, can_id, data): 2112 """Build a CAN frame.""" 2113 can_dlc = len(data) 2114 data = data.ljust(8, b'\x00') 2115 return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data) 2116 2117 @classmethod 2118 def dissect_can_frame(cls, frame): 2119 """Dissect a CAN frame.""" 2120 can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame) 2121 return (can_id, can_dlc, data[:can_dlc]) 2122 2123 def testSendFrame(self): 2124 cf, addr = self.s.recvfrom(self.bufsize) 2125 self.assertEqual(self.cf, cf) 2126 self.assertEqual(addr[0], self.interface) 2127 2128 def _testSendFrame(self): 2129 self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05') 
2130 self.cli.send(self.cf) 2131 2132 def testSendMaxFrame(self): 2133 cf, addr = self.s.recvfrom(self.bufsize) 2134 self.assertEqual(self.cf, cf) 2135 2136 def _testSendMaxFrame(self): 2137 self.cf = self.build_can_frame(0x00, b'\x07' * 8) 2138 self.cli.send(self.cf) 2139 2140 def testSendMultiFrames(self): 2141 cf, addr = self.s.recvfrom(self.bufsize) 2142 self.assertEqual(self.cf1, cf) 2143 2144 cf, addr = self.s.recvfrom(self.bufsize) 2145 self.assertEqual(self.cf2, cf) 2146 2147 def _testSendMultiFrames(self): 2148 self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11') 2149 self.cli.send(self.cf1) 2150 2151 self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') 2152 self.cli.send(self.cf2) 2153 2154 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2155 'socket.CAN_BCM required for this test.') 2156 def _testBCM(self): 2157 cf, addr = self.cli.recvfrom(self.bufsize) 2158 self.assertEqual(self.cf, cf) 2159 can_id, can_dlc, data = self.dissect_can_frame(cf) 2160 self.assertEqual(self.can_id, can_id) 2161 self.assertEqual(self.data, data) 2162 2163 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2164 'socket.CAN_BCM required for this test.') 2165 def testBCM(self): 2166 bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) 2167 self.addCleanup(bcm.close) 2168 bcm.connect((self.interface,)) 2169 self.can_id = 0x123 2170 self.data = bytes([0xc0, 0xff, 0xee]) 2171 self.cf = self.build_can_frame(self.can_id, self.data) 2172 opcode = socket.CAN_BCM_TX_SEND 2173 flags = 0 2174 count = 0 2175 ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 2176 bcm_can_id = 0x0222 2177 nframes = 1 2178 assert len(self.cf) == 16 2179 header = struct.pack(self.bcm_cmd_msg_fmt, 2180 opcode, 2181 flags, 2182 count, 2183 ival1_seconds, 2184 ival1_usec, 2185 ival2_seconds, 2186 ival2_usec, 2187 bcm_can_id, 2188 nframes, 2189 ) 2190 header_plus_frame = header + self.cf 2191 bytes_sent = bcm.send(header_plus_frame) 2192 self.assertEqual(bytes_sent, 
len(header_plus_frame)) 2193 2194 2195@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.') 2196class ISOTPTest(unittest.TestCase): 2197 2198 def __init__(self, *args, **kwargs): 2199 super().__init__(*args, **kwargs) 2200 self.interface = "vcan0" 2201 2202 def testCrucialConstants(self): 2203 socket.AF_CAN 2204 socket.PF_CAN 2205 socket.CAN_ISOTP 2206 socket.SOCK_DGRAM 2207 2208 def testCreateSocket(self): 2209 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2210 pass 2211 2212 @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"), 2213 'socket.CAN_ISOTP required for this test.') 2214 def testCreateISOTPSocket(self): 2215 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2216 pass 2217 2218 def testTooLongInterfaceName(self): 2219 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 2220 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2221 with self.assertRaisesRegex(OSError, 'interface name too long'): 2222 s.bind(('x' * 1024, 1, 2)) 2223 2224 def testBind(self): 2225 try: 2226 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2227 addr = self.interface, 0x123, 0x456 2228 s.bind(addr) 2229 self.assertEqual(s.getsockname(), addr) 2230 except OSError as e: 2231 if e.errno == errno.ENODEV: 2232 self.skipTest('network interface `%s` does not exist' % 2233 self.interface) 2234 else: 2235 raise 2236 2237 2238@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.') 2239class J1939Test(unittest.TestCase): 2240 2241 def __init__(self, *args, **kwargs): 2242 super().__init__(*args, **kwargs) 2243 self.interface = "vcan0" 2244 2245 @unittest.skipUnless(hasattr(socket, "CAN_J1939"), 2246 'socket.CAN_J1939 required for this test.') 2247 def testJ1939Constants(self): 2248 socket.CAN_J1939 2249 2250 socket.J1939_MAX_UNICAST_ADDR 2251 socket.J1939_IDLE_ADDR 2252 socket.J1939_NO_ADDR 2253 socket.J1939_NO_NAME 2254 
socket.J1939_PGN_REQUEST 2255 socket.J1939_PGN_ADDRESS_CLAIMED 2256 socket.J1939_PGN_ADDRESS_COMMANDED 2257 socket.J1939_PGN_PDU1_MAX 2258 socket.J1939_PGN_MAX 2259 socket.J1939_NO_PGN 2260 2261 # J1939 socket options 2262 socket.SO_J1939_FILTER 2263 socket.SO_J1939_PROMISC 2264 socket.SO_J1939_SEND_PRIO 2265 socket.SO_J1939_ERRQUEUE 2266 2267 socket.SCM_J1939_DEST_ADDR 2268 socket.SCM_J1939_DEST_NAME 2269 socket.SCM_J1939_PRIO 2270 socket.SCM_J1939_ERRQUEUE 2271 2272 socket.J1939_NLA_PAD 2273 socket.J1939_NLA_BYTES_ACKED 2274 2275 socket.J1939_EE_INFO_NONE 2276 socket.J1939_EE_INFO_TX_ABORT 2277 2278 socket.J1939_FILTER_MAX 2279 2280 @unittest.skipUnless(hasattr(socket, "CAN_J1939"), 2281 'socket.CAN_J1939 required for this test.') 2282 def testCreateJ1939Socket(self): 2283 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: 2284 pass 2285 2286 def testBind(self): 2287 try: 2288 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: 2289 addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR 2290 s.bind(addr) 2291 self.assertEqual(s.getsockname(), addr) 2292 except OSError as e: 2293 if e.errno == errno.ENODEV: 2294 self.skipTest('network interface `%s` does not exist' % 2295 self.interface) 2296 else: 2297 raise 2298 2299 2300@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 2301class BasicRDSTest(unittest.TestCase): 2302 2303 def testCrucialConstants(self): 2304 socket.AF_RDS 2305 socket.PF_RDS 2306 2307 def testCreateSocket(self): 2308 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2309 pass 2310 2311 def testSocketBufferSize(self): 2312 bufsize = 16384 2313 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2314 s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize) 2315 s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize) 2316 2317 2318@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 
class RDSTest(ThreadedRDSSocketTest):
    # Datagram exchange tests over RDS sockets.
    # NOTE(review): each testX/_testX pair presumably forms the receiving
    # and sending end of one exchange run by the threaded base class
    # (defined outside this view), which is also expected to provide serv,
    # cli, port, bufsize and cli_addr -- confirm.

    def __init__(self, methodName='runTest'):
        ThreadedRDSSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        super().setUp()
        self.evt = threading.Event()

    def testSendAndRecv(self):
        data, addr = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, data)
        self.assertEqual(self.cli_addr, addr)

    def _testSendAndRecv(self):
        self.data = b'spam'
        self.cli.sendto(self.data, 0, (HOST, self.port))

    def testPeek(self):
        # MSG_PEEK must leave the datagram queued, so the second, plain
        # recvfrom() sees the same data again.
        data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
        self.assertEqual(self.data, data)
        data, addr = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, data)

    def _testPeek(self):
        self.data = b'spam'
        self.cli.sendto(self.data, 0, (HOST, self.port))

    @requireAttrs(socket.socket, 'recvmsg')
    def testSendAndRecvMsg(self):
        data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
        self.assertEqual(self.data, data)

    @requireAttrs(socket.socket, 'sendmsg')
    def _testSendAndRecvMsg(self):
        self.data = b'hello ' * 10
        self.cli.sendmsg([self.data], (), 0, (HOST, self.port))

    def testSendAndRecvMulti(self):
        # Datagrams must arrive in the order they were sent.
        data, addr = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data1, data)

        data, addr = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data2, data)

    def _testSendAndRecvMulti(self):
        self.data1 = b'bacon'
        self.cli.sendto(self.data1, 0, (HOST, self.port))

        self.data2 = b'egg'
        self.cli.sendto(self.data2, 0, (HOST, self.port))

    def testSelect(self):
        # select() must report the server socket readable within 3 seconds.
        r, w, x = select.select([self.serv], [], [], 3.0)
        self.assertIn(self.serv, r)
        data, addr = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, data)

    def _testSelect(self):
        self.data = b'select'
        self.cli.sendto(self.data, 0, (HOST, self.port))

@unittest.skipUnless(HAVE_SOCKET_QIPCRTR,
          'QIPCRTR sockets required for this test.')
class BasicQIPCRTRTest(unittest.TestCase):
    # Single-socket checks for AF_QIPCRTR; element [1] of getsockname() is
    # 0 until the socket gets bound.

    def testCrucialConstants(self):
        socket.AF_QIPCRTR

    def testCreateSocket(self):
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM):
            pass

    def testUnbound(self):
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
            self.assertEqual(s.getsockname()[1], 0)

    def testBindSock(self):
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
            socket_helper.bind_port(s, host=s.getsockname()[0])
            self.assertNotEqual(s.getsockname()[1], 0)

    def testInvalidBindSock(self):
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
            self.assertRaises(OSError, socket_helper.bind_port, s, host=-2)

    def testAutoBindSock(self):
        # connect() on an unbound socket must bind it implicitly.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
            s.connect((123, 123))
            self.assertNotEqual(s.getsockname()[1], 0)

@unittest.skipIf(fcntl is None, "need fcntl")
@unittest.skipUnless(HAVE_SOCKET_VSOCK,
          'VSOCK sockets required for this test.')
class BasicVSOCKTest(unittest.TestCase):
    # Single-socket checks for AF_VSOCK.

    def testCrucialConstants(self):
        socket.AF_VSOCK

    def testVSOCKConstants(self):
        socket.SO_VM_SOCKETS_BUFFER_SIZE
        socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE
        socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE
        socket.VMADDR_CID_ANY
        socket.VMADDR_PORT_ANY
        socket.VMADDR_CID_HOST
        socket.VM_SOCKETS_INVALID_VERSION
        socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID

    def testCreateSocket(self):
        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM):
            pass

    def testSocketBufferSize(self):
        # Read the three buffer-size options, double each one, and check
        # that the doubled values read back.
        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
            orig_max = s.getsockopt(socket.AF_VSOCK,
                                    socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)
            orig = s.getsockopt(socket.AF_VSOCK,
                                socket.SO_VM_SOCKETS_BUFFER_SIZE)
            orig_min = s.getsockopt(socket.AF_VSOCK,
                                    socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)

            # The maximum is raised first, then the size, then the minimum.
            s.setsockopt(socket.AF_VSOCK,
                         socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2)
            s.setsockopt(socket.AF_VSOCK,
                         socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2)
            s.setsockopt(socket.AF_VSOCK,
                         socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2)

            self.assertEqual(orig_max * 2,
                             s.getsockopt(socket.AF_VSOCK,
                             socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE))
            self.assertEqual(orig * 2,
                             s.getsockopt(socket.AF_VSOCK,
                             socket.SO_VM_SOCKETS_BUFFER_SIZE))
            self.assertEqual(orig_min * 2,
                             s.getsockopt(socket.AF_VSOCK,
                             socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE))


@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH,
                     'Bluetooth sockets required for this test.')
class BasicBluetoothTest(unittest.TestCase):
    # Constant and socket-creation checks for AF_BLUETOOTH; protocols not
    # available on win32 / freebsd are excluded per platform.

    def testBluetoothConstants(self):
        socket.BDADDR_ANY
        socket.BDADDR_LOCAL
        socket.AF_BLUETOOTH
        socket.BTPROTO_RFCOMM

        if sys.platform != "win32":
            socket.BTPROTO_HCI
            socket.SOL_HCI
            socket.BTPROTO_L2CAP

            if not sys.platform.startswith("freebsd"):
                socket.BTPROTO_SCO

    def testCreateRfcommSocket(self):
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM):
            pass

    @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets")
    def testCreateL2capSocket(self):
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP):
            pass

    @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets")
    def testCreateHciSocket(self):
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI):
            pass

    @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"),
                     "windows and freebsd do not support SCO sockets")
    def testCreateScoSocket(self):
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO):
            pass


class BasicTCPTest(SocketConnectedTest):
    # Data-transfer tests over a connected TCP pair.
    # NOTE(review): testX reads from cli_conn while _testX writes to
    # serv_conn; both attributes and the `done` event are expected to come
    # from the base class (defined outside this view) -- confirm.

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        seg1 = self.cli_conn.recv(len(MSG) - 3)
        seg2 = self.cli_conn.recv(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        msg, addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
        seg2, addr = self.cli_conn.recvfrom(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        msg = b''
        # Read until recv() returns an empty result.
        while True:
            read = self.cli_conn.recv(1024)
            if not read:
                break
            msg += read
        self.assertEqual(msg, b'f' * 2048)

    def _testSendAll(self):
        big_chunk = b'f' * 2048
        self.serv_conn.sendall(big_chunk)

    def testFromFd(self):
        # Testing fromfd()
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(sock.close)
        self.assertIsInstance(sock, socket.socket)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testDup(self):
        # Testing dup()
        sock = self.cli_conn.dup()
        self.addCleanup(sock.close)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)

    def _testDup(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)
        # wait for _testShutdown to finish: on OS X, when the server
        # closes the connection the client also becomes disconnected,
        # and the client's shutdown call will fail. (Issue #4397.)
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)

    # Reuse the receiving side of testShutdown for the overflow variant.
    testShutdown_overflow = support.cpython_only(testShutdown)

    @support.cpython_only
    def _testShutdown_overflow(self):
        import _testcapi
        self.serv_conn.send(MSG)
        # Issue 15989
        self.assertRaises(OverflowError, self.serv_conn.shutdown,
                          _testcapi.INT_MAX + 1)
        self.assertRaises(OverflowError, self.serv_conn.shutdown,
                          2 + (_testcapi.UINT_MAX + 1))
        self.serv_conn.shutdown(2)

    def testDetach(self):
        # Testing detach()
        fileno = self.cli_conn.fileno()
        f = self.cli_conn.detach()
        self.assertEqual(f, fileno)
        # cli_conn cannot be used anymore...
        self.assertTrue(self.cli_conn._closed)
        self.assertRaises(OSError, self.cli_conn.recv, 1024)
        self.cli_conn.close()
        # ...but we can create another socket using the (still open)
        # file descriptor
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f)
        self.addCleanup(sock.close)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)

    def _testDetach(self):
        self.serv_conn.send(MSG)


class BasicUDPTest(ThreadedUDPSocketTest):
    # Datagram tests over UDP: serv receives what cli sends.

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        msg = self.serv.recv(len(MSG))
        self.assertEqual(msg, MSG)

    def _testSendtoAndRecv(self):
        self.cli.sendto(MSG, 0, (HOST, self.port))

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        msg, addr = self.serv.recvfrom(len(MSG))
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.cli.sendto(MSG, 0, (HOST, self.port))

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        self.assertRaises(ValueError, self.serv.recvfrom, -1)

    def _testRecvFromNegative(self):
        self.cli.sendto(MSG, 0, (HOST, self.port))


@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
class BasicUDPLITETest(ThreadedUDPLITESocketTest):
    # Same datagram tests as for UDP, run over UDPLITE sockets.

    def __init__(self, methodName='runTest'):
        ThreadedUDPLITESocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDPLITE
        msg = self.serv.recv(len(MSG))
        self.assertEqual(msg, MSG)

    def _testSendtoAndRecv(self):
        self.cli.sendto(MSG, 0, (HOST, self.port))

    def testRecvFrom(self):
        # Testing recvfrom() over UDPLITE
        msg, addr = self.serv.recvfrom(len(MSG))
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.cli.sendto(MSG, 0, (HOST, self.port))

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        self.assertRaises(ValueError, self.serv.recvfrom, -1)

    def _testRecvFromNegative(self):
        self.cli.sendto(MSG, 0, (HOST, self.port))

# Tests for the sendmsg()/recvmsg() interface.  Where possible, the
# same test code is used with different families and types of socket
# (e.g. stream, datagram), and tests using recvmsg() are repeated
# using recvmsg_into().
#
# The generic test classes such as SendmsgTests and
# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
# supplied with sockets cli_sock and serv_sock representing the
# client's and the server's end of the connection respectively, and
# attributes cli_addr and serv_addr holding their (numeric where
# appropriate) addresses.
2691# 2692# The final concrete test classes combine these with subclasses of 2693# SocketTestBase which set up client and server sockets of a specific 2694# type, and with subclasses of SendrecvmsgBase such as 2695# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these 2696# sockets to cli_sock and serv_sock and override the methods and 2697# attributes of SendrecvmsgBase to fill in destination addresses if 2698# needed when sending, check for specific flags in msg_flags, etc. 2699# 2700# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using 2701# recvmsg_into(). 2702 2703# XXX: like the other datagram (UDP) tests in this module, the code 2704# here assumes that datagram delivery on the local machine will be 2705# reliable. 2706 2707class SendrecvmsgBase(ThreadSafeCleanupTestCase): 2708 # Base class for sendmsg()/recvmsg() tests. 2709 2710 # Time in seconds to wait before considering a test failed, or 2711 # None for no timeout. Not all tests actually set a timeout. 2712 fail_timeout = support.LOOPBACK_TIMEOUT 2713 2714 def setUp(self): 2715 self.misc_event = threading.Event() 2716 super().setUp() 2717 2718 def sendToServer(self, msg): 2719 # Send msg to the server. 2720 return self.cli_sock.send(msg) 2721 2722 # Tuple of alternative default arguments for sendmsg() when called 2723 # via sendmsgToServer() (e.g. to include a destination address). 2724 sendmsg_to_server_defaults = () 2725 2726 def sendmsgToServer(self, *args): 2727 # Call sendmsg() on self.cli_sock with the given arguments, 2728 # filling in any arguments which are not supplied with the 2729 # corresponding items of self.sendmsg_to_server_defaults, if 2730 # any. 2731 return self.cli_sock.sendmsg( 2732 *(args + self.sendmsg_to_server_defaults[len(args):])) 2733 2734 def doRecvmsg(self, sock, bufsize, *args): 2735 # Call recvmsg() on sock with given arguments and return its 2736 # result. 
Should be used for tests which can use either 2737 # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides 2738 # this method with one which emulates it using recvmsg_into(), 2739 # thus allowing the same test to be used for both methods. 2740 result = sock.recvmsg(bufsize, *args) 2741 self.registerRecvmsgResult(result) 2742 return result 2743 2744 def registerRecvmsgResult(self, result): 2745 # Called by doRecvmsg() with the return value of recvmsg() or 2746 # recvmsg_into(). Can be overridden to arrange cleanup based 2747 # on the returned ancillary data, for instance. 2748 pass 2749 2750 def checkRecvmsgAddress(self, addr1, addr2): 2751 # Called to compare the received address with the address of 2752 # the peer. 2753 self.assertEqual(addr1, addr2) 2754 2755 # Flags that are normally unset in msg_flags 2756 msg_flags_common_unset = 0 2757 for name in ("MSG_CTRUNC", "MSG_OOB"): 2758 msg_flags_common_unset |= getattr(socket, name, 0) 2759 2760 # Flags that are normally set 2761 msg_flags_common_set = 0 2762 2763 # Flags set when a complete record has been received (e.g. MSG_EOR 2764 # for SCTP) 2765 msg_flags_eor_indicator = 0 2766 2767 # Flags set when a complete record has not been received 2768 # (e.g. MSG_TRUNC for datagram sockets) 2769 msg_flags_non_eor_indicator = 0 2770 2771 def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0): 2772 # Method to check the value of msg_flags returned by recvmsg[_into](). 2773 # 2774 # Checks that all bits in msg_flags_common_set attribute are 2775 # set in "flags" and all bits in msg_flags_common_unset are 2776 # unset. 2777 # 2778 # The "eor" argument specifies whether the flags should 2779 # indicate that a full record (or datagram) has been received. 
2780 # If "eor" is None, no checks are done; otherwise, checks 2781 # that: 2782 # 2783 # * if "eor" is true, all bits in msg_flags_eor_indicator are 2784 # set and all bits in msg_flags_non_eor_indicator are unset 2785 # 2786 # * if "eor" is false, all bits in msg_flags_non_eor_indicator 2787 # are set and all bits in msg_flags_eor_indicator are unset 2788 # 2789 # If "checkset" and/or "checkunset" are supplied, they require 2790 # the given bits to be set or unset respectively, overriding 2791 # what the attributes require for those bits. 2792 # 2793 # If any bits are set in "ignore", they will not be checked, 2794 # regardless of the other inputs. 2795 # 2796 # Will raise Exception if the inputs require a bit to be both 2797 # set and unset, and it is not ignored. 2798 2799 defaultset = self.msg_flags_common_set 2800 defaultunset = self.msg_flags_common_unset 2801 2802 if eor: 2803 defaultset |= self.msg_flags_eor_indicator 2804 defaultunset |= self.msg_flags_non_eor_indicator 2805 elif eor is not None: 2806 defaultset |= self.msg_flags_non_eor_indicator 2807 defaultunset |= self.msg_flags_eor_indicator 2808 2809 # Function arguments override defaults 2810 defaultset &= ~checkunset 2811 defaultunset &= ~checkset 2812 2813 # Merge arguments with remaining defaults, and check for conflicts 2814 checkset |= defaultset 2815 checkunset |= defaultunset 2816 inboth = checkset & checkunset & ~ignore 2817 if inboth: 2818 raise Exception("contradictory set, unset requirements for flags " 2819 "{0:#x}".format(inboth)) 2820 2821 # Compare with given msg_flags value 2822 mask = (checkset | checkunset) & ~ignore 2823 self.assertEqual(flags & mask, checkset & mask) 2824 2825 2826class RecvmsgIntoMixin(SendrecvmsgBase): 2827 # Mixin to implement doRecvmsg() using recvmsg_into(). 
2828 2829 def doRecvmsg(self, sock, bufsize, *args): 2830 buf = bytearray(bufsize) 2831 result = sock.recvmsg_into([buf], *args) 2832 self.registerRecvmsgResult(result) 2833 self.assertGreaterEqual(result[0], 0) 2834 self.assertLessEqual(result[0], bufsize) 2835 return (bytes(buf[:result[0]]),) + result[1:] 2836 2837 2838class SendrecvmsgDgramFlagsBase(SendrecvmsgBase): 2839 # Defines flags to be checked in msg_flags for datagram sockets. 2840 2841 @property 2842 def msg_flags_non_eor_indicator(self): 2843 return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC 2844 2845 2846class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase): 2847 # Defines flags to be checked in msg_flags for SCTP sockets. 2848 2849 @property 2850 def msg_flags_eor_indicator(self): 2851 return super().msg_flags_eor_indicator | socket.MSG_EOR 2852 2853 2854class SendrecvmsgConnectionlessBase(SendrecvmsgBase): 2855 # Base class for tests on connectionless-mode sockets. Users must 2856 # supply sockets on attributes cli and serv to be mapped to 2857 # cli_sock and serv_sock respectively. 2858 2859 @property 2860 def serv_sock(self): 2861 return self.serv 2862 2863 @property 2864 def cli_sock(self): 2865 return self.cli 2866 2867 @property 2868 def sendmsg_to_server_defaults(self): 2869 return ([], [], 0, self.serv_addr) 2870 2871 def sendToServer(self, msg): 2872 return self.cli_sock.sendto(msg, self.serv_addr) 2873 2874 2875class SendrecvmsgConnectedBase(SendrecvmsgBase): 2876 # Base class for tests on connected sockets. Users must supply 2877 # sockets on attributes serv_conn and cli_conn (representing the 2878 # connections *to* the server and the client), to be mapped to 2879 # cli_sock and serv_sock respectively. 

    @property
    def serv_sock(self):
        # The server side talks over the connection to the client.
        return self.cli_conn

    @property
    def cli_sock(self):
        # The client side talks over the connection to the server.
        return self.serv_conn

    def checkRecvmsgAddress(self, addr1, addr2):
        # Address is currently "unspecified" for a connected socket,
        # so we don't examine it
        pass


class SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
    # Base class to set a timeout on server's socket.

    def setUp(self):
        super().setUp()
        # Bound every blocking call on the server's socket by
        # fail_timeout.
        self.serv_sock.settimeout(self.fail_timeout)


class SendmsgTests(SendrecvmsgServerTimeoutBase):
    # Tests for sendmsg() which can use any socket type and do not
    # involve recvmsg() or recvmsg_into().
    #
    # Each testX() method is the server side of a test; the method
    # named _testX() is its client-side counterpart.

    def testSendmsg(self):
        # Send a simple message with sendmsg().
        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)

    def _testSendmsg(self):
        # Client side: one buffer, and the whole message must be
        # reported as sent.
        self.assertEqual(self.sendmsgToServer([MSG]), len(MSG))

    def testSendmsgDataGenerator(self):
        # Send from buffer obtained from a generator (not a sequence).
        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)

    def _testSendmsgDataGenerator(self):
        # Client side: the buffers argument is a generator expression.
        self.assertEqual(self.sendmsgToServer((o for o in [MSG])),
                         len(MSG))

    def testSendmsgAncillaryGenerator(self):
        # Gather (empty) ancillary data from a generator.
        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)

    def _testSendmsgAncillaryGenerator(self):
        # Client side: the ancillary data argument is an empty
        # generator expression.
        self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])),
                         len(MSG))

    def testSendmsgArray(self):
        # Send data from an array instead of the usual bytes object.
        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)

    def _testSendmsgArray(self):
        # Client side: the single buffer is an array of unsigned bytes
        # built from MSG.
        self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]),
                         len(MSG))

    def testSendmsgGather(self):
        # Send message data from more than one buffer (gather write).
        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)

    def _testSendmsgGather(self):
        # Client side: MSG is split into two buffers; the reported
        # length must cover both.
        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))

    def testSendmsgBadArgs(self):
        # Check that sendmsg() rejects invalid arguments.
        # The server side only waits for the client's final marker.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    def _testSendmsgBadArgs(self):
        # Client side: every call below must raise TypeError.
        # No arguments at all.
        self.assertRaises(TypeError, self.cli_sock.sendmsg)
        # A bare bytes object instead of an iterable of buffers.
        self.assertRaises(TypeError, self.sendmsgToServer,
                          b"not in an iterable")
        # Buffers argument that is not iterable.
        self.assertRaises(TypeError, self.sendmsgToServer,
                          object())
        # Buffer items that are not bytes-like.
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [object()])
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG, object()])
        # Invalid second, third and fourth positional arguments
        # (ancillary data, flags and address in sendmsg() order).
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], object())
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [], object())
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [], 0, object())
        # Release the server side, which is waiting for this marker.
        self.sendToServer(b"done")

    def testSendmsgBadCmsg(self):
        # Check that invalid ancillary data items are rejected.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    def _testSendmsgBadCmsg(self):
        # Client side: each ancillary item must be a 3-tuple; every
        # malformed variant below must raise TypeError.
        # Item that is not a tuple.
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [object()])
        # Wrong type in the first, second and third tuple position.
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [(object(), 0, b"data")])
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [(0, object(), b"data")])
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [(0, 0, object())])
        # Too few and too many tuple members.
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [(0, 0)])
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [(0, 0, b"data", 42)])
        # Release the server side, which is waiting for this marker.
        self.sendToServer(b"done")

    @requireAttrs(socket, "CMSG_SPACE")
    def testSendmsgBadMultiCmsg(self):
        # Check that invalid ancillary data items are rejected when
        # more than one item is present.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    @testSendmsgBadMultiCmsg.client_skip
    def _testSendmsgBadMultiCmsg(self):
        # Client side.  A flat list of three values is not a list of
        # 3-tuples, so it must be rejected.
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [0, 0, b""])
        # A valid first item does not excuse an invalid second one.
        self.assertRaises(TypeError, self.sendmsgToServer,
                          [MSG], [(0, 0, b""), object()])
        # Release the server side, which is waiting for this marker.
        self.sendToServer(b"done")

    def testSendmsgExcessCmsgReject(self):
        # Check that sendmsg() rejects excess ancillary data items
        # when the number that can be sent is limited.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    def _testSendmsgExcessCmsgReject(self):
        # Client side.  The check only applies on platforms without
        # CMSG_SPACE(); elsewhere just the marker is sent.
        if not hasattr(socket, "CMSG_SPACE"):
            # Can only send one item
            with self.assertRaises(OSError) as cm:
                self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
            # The OSError carries no errno -- presumably because the
            # request is refused before any system call is made.
            self.assertIsNone(cm.exception.errno)
        self.sendToServer(b"done")

    def testSendmsgAfterClose(self):
        # Check that sendmsg() fails on a closed socket.
        # Nothing to do on the server side; the client does the check.
        pass

    def _testSendmsgAfterClose(self):
        # Client side: close our own socket, then try to send on it.
        self.cli_sock.close()
        self.assertRaises(OSError, self.sendmsgToServer, [MSG])


class SendmsgStreamTests(SendmsgTests):
    # Tests for sendmsg() which require a stream socket and do not
    # involve recvmsg() or recvmsg_into().

    def testSendmsgExplicitNoneAddr(self):
        # Check that peer address can be specified as None.
        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)

    def _testSendmsgExplicitNoneAddr(self):
        # Client side: all four arguments given, with address None.
        self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG))

    def testSendmsgTimeout(self):
        # Check that timeout works with sendmsg().
        # Read one 512-byte chunk, then wait until the client reports
        # (via misc_event) that it has finished.
        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))

    def _testSendmsgTimeout(self):
        # Client side: with a very short timeout, keep sending until
        # sendmsg() raises TimeoutError (presumably once the socket's
        # send buffer is full, since the server reads only one chunk).
        try:
            self.cli_sock.settimeout(0.03)
            try:
                while True:
                    self.sendmsgToServer([b"a"*512])
            except TimeoutError:
                pass
            except OSError as exc:
                if exc.errno != errno.ENOMEM:
                    raise
                # bpo-33937 the test randomly fails on Travis CI with
                # "OSError: [Errno 12] Cannot allocate memory"
            else:
                # NOTE(review): the "while True" loop above has no
                # break, so it can only be left via an exception; this
                # branch looks unreachable.
                self.fail("TimeoutError not raised")
        finally:
            # Always release the server side, even on failure.
            self.misc_event.set()

    # XXX: would be nice to have more tests for sendmsg flags argument.

    # Linux supports MSG_DONTWAIT when sending, but in general, it
    # only works when receiving.  Could add other platforms if they
    # support it too.
    @skipWithClientIf(sys.platform not in {"linux"},
                      "MSG_DONTWAIT not known to work on this platform when "
                      "sending")
    def testSendmsgDontWait(self):
        # Check that MSG_DONTWAIT in flags causes non-blocking behaviour.
        # Read one 512-byte chunk, then wait for the client to finish.
        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))

    @testSendmsgDontWait.client_skip
    def _testSendmsgDontWait(self):
        # Client side: keep sending with MSG_DONTWAIT until the call
        # fails instead of blocking, then check the error code.
        try:
            with self.assertRaises(OSError) as cm:
                while True:
                    self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
            # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI
            # with "OSError: [Errno 12] Cannot allocate memory"
            self.assertIn(cm.exception.errno,
                          (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM))
        finally:
            # Always release the server side, even on failure.
            self.misc_event.set()


class SendmsgConnectionlessTests(SendmsgTests):
    # Tests for sendmsg() which require a connectionless-mode
    # (e.g. datagram) socket, and do not involve recvmsg() or
    # recvmsg_into().

    def testSendmsgNoDestAddr(self):
        # Check that sendmsg() fails when no destination address is
        # given for unconnected socket.
        # Nothing to do on the server side; the client does the check.
        pass

    def _testSendmsgNoDestAddr(self):
        # Client side: call sendmsg() on the socket directly, first
        # with the address omitted and then with it given as None.
        self.assertRaises(OSError, self.cli_sock.sendmsg,
                          [MSG])
        self.assertRaises(OSError, self.cli_sock.sendmsg,
                          [MSG], [], 0, None)


class RecvmsgGenericTests(SendrecvmsgBase):
    # Tests for recvmsg() which can also be emulated using
    # recvmsg_into(), and can use any socket type.
    #
    # Each testX() method is the receiving (server) side; the method
    # named _testX() is the client side that sends the data.

    def testRecvmsg(self):
        # Receive a simple message with recvmsg[_into]().
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        # No ancillary data was sent, and the record is complete.
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsg(self):
        self.sendToServer(MSG)

    def testRecvmsgExplicitDefaults(self):
        # Test recvmsg[_into]() with default arguments provided explicitly.
        # The two zeros are the ancillary buffer size and the flags.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), 0, 0)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsgExplicitDefaults(self):
        self.sendToServer(MSG)

    def testRecvmsgShorter(self):
        # Receive a message smaller than buffer.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG) + 42)
        # Only the bytes actually sent are returned.
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsgShorter(self):
        self.sendToServer(MSG)

    def testRecvmsgTrunc(self):
        # Receive part of message, check for truncation indicators.
        # The buffer is three bytes too small for MSG.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG) - 3)
        self.assertEqual(msg, MSG[:-3])
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        # eor=False: the flags must indicate an incomplete record.
        self.checkFlags(flags, eor=False)

    def _testRecvmsgTrunc(self):
        self.sendToServer(MSG)

    def testRecvmsgShortAncillaryBuf(self):
        # Test ancillary data buffer too small to hold any ancillary data.
        # A one-byte ancillary buffer must not disturb normal data.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), 1)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsgShortAncillaryBuf(self):
        self.sendToServer(MSG)

    def testRecvmsgLongAncillaryBuf(self):
        # Test large ancillary data buffer.
        # No ancillary data is sent, so the list must still be empty.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), 10240)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsgLongAncillaryBuf(self):
        self.sendToServer(MSG)

    def testRecvmsgAfterClose(self):
        # Check that recvmsg[_into]() fails on a closed socket.
        self.serv_sock.close()
        self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)

    def _testRecvmsgAfterClose(self):
        # Nothing to do on the client side.
        pass

    def testRecvmsgTimeout(self):
        # Check that timeout works.
        # The client sends nothing, so a receive with a very short
        # timeout must raise TimeoutError.
        try:
            self.serv_sock.settimeout(0.03)
            self.assertRaises(TimeoutError,
                              self.doRecvmsg, self.serv_sock, len(MSG))
        finally:
            # Always release the client side, even on failure.
            self.misc_event.set()

    def _testRecvmsgTimeout(self):
        # Client side: just wait until the server has finished.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))

    @requireAttrs(socket, "MSG_PEEK")
    def testRecvmsgPeek(self):
        # Check that MSG_PEEK in flags enables examination of pending
        # data without consuming it.

        # Receive part of data with MSG_PEEK.
        # Arguments after the buffer size: ancillary buffer size 0,
        # then the receive flags.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG) - 3, 0,
                                                   socket.MSG_PEEK)
        self.assertEqual(msg, MSG[:-3])
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        # Ignoring MSG_TRUNC here (so this test is the same for stream
        # and datagram sockets).  Some wording in POSIX seems to
        # suggest that it needn't be set when peeking, but that may
        # just be a slip.
        # getattr() with a 0 default keeps this working where
        # socket.MSG_TRUNC is not defined.
        self.checkFlags(flags, eor=False,
                        ignore=getattr(socket, "MSG_TRUNC", 0))

        # Receive all data with MSG_PEEK.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), 0,
                                                   socket.MSG_PEEK)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

        # Check that the same data can still be received normally.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    @testRecvmsgPeek.client_skip
    def _testRecvmsgPeek(self):
        # Client side: a single send serves all three receives above.
        self.sendToServer(MSG)

    @requireAttrs(socket.socket, "sendmsg")
    def testRecvmsgFromSendmsg(self):
        # Test receiving with recvmsg[_into]() when message is sent
        # using sendmsg().
        # Set the timeout explicitly; RecvmsgGenericTests itself does
        # not derive from SendrecvmsgServerTimeoutBase.
        self.serv_sock.settimeout(self.fail_timeout)
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    @testRecvmsgFromSendmsg.client_skip
    def _testRecvmsgFromSendmsg(self):
        # Client side: send MSG as two gathered buffers.
        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))


class RecvmsgGenericStreamTests(RecvmsgGenericTests):
    # Tests which require a stream socket and can use either recvmsg()
    # or recvmsg_into().

    def testRecvmsgEOF(self):
        # Receive end-of-stream indicator (b"", peer socket closed).
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
        self.assertEqual(msg, b"")
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=None) # Might not have end-of-record marker

    def _testRecvmsgEOF(self):
        # Client side: closing the socket produces the end-of-stream.
        self.cli_sock.close()

    def testRecvmsgOverflow(self):
        # Receive a message in more than one chunk.
        # First chunk: everything except the last three bytes.
        seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                    len(MSG) - 3)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=False)

        # Second chunk: the remainder, into a generous buffer.
        seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

        # Together the two chunks must reproduce the message exactly.
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testRecvmsgOverflow(self):
        self.sendToServer(MSG)


class RecvmsgTests(RecvmsgGenericTests):
    # Tests for recvmsg() which can use any socket type.

    def testRecvmsgBadArgs(self):
        # Check that recvmsg() rejects invalid arguments.
        # No arguments at all.
        self.assertRaises(TypeError, self.serv_sock.recvmsg)
        # Negative buffer size, then negative ancillary buffer size.
        self.assertRaises(ValueError, self.serv_sock.recvmsg,
                          -1, 0, 0)
        self.assertRaises(ValueError, self.serv_sock.recvmsg,
                          len(MSG), -1, 0)
        # A list of buffers is the recvmsg_into() calling convention,
        # not recvmsg()'s.
        self.assertRaises(TypeError, self.serv_sock.recvmsg,
                          [bytearray(10)], 0, 0)
        # Non-integer value in each of the three positions.
        self.assertRaises(TypeError, self.serv_sock.recvmsg,
                          object(), 0, 0)
        self.assertRaises(TypeError, self.serv_sock.recvmsg,
                          len(MSG), object(), 0)
        self.assertRaises(TypeError, self.serv_sock.recvmsg,
                          len(MSG), 0, object())

        # After all the rejected calls, a valid call must still
        # receive the message the client sent.
        msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsgBadArgs(self):
        self.sendToServer(MSG)


class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
    # Tests for recvmsg_into() which can use any socket type.

    def testRecvmsgIntoBadArgs(self):
        # Check that recvmsg_into() rejects invalid arguments.
        buf = bytearray(len(MSG))
        # No arguments at all.
        self.assertRaises(TypeError, self.serv_sock.recvmsg_into)
        # An integer size is the recvmsg() calling convention, not
        # recvmsg_into()'s.
        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
                          len(MSG), 0, 0)
        # A bare buffer rather than an iterable of buffers.
        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
                          buf, 0, 0)
        # Items that are not buffers, or not writable buffers.
        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
                          [object()], 0, 0)
        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
                          [b"I'm not writable"], 0, 0)
        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
                          [buf, object()], 0, 0)
        # Negative ancillary buffer size.
        self.assertRaises(ValueError, self.serv_sock.recvmsg_into,
                          [buf], -1, 0)
        # Non-integer ancillary buffer size and flags.
        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
                          [buf], object(), 0)
        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
                          [buf], 0, object())

        # After all the rejected calls, a valid call must still
        # receive the message the client sent.
        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0)
        self.assertEqual(nbytes, len(MSG))
        self.assertEqual(buf, bytearray(MSG))
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsgIntoBadArgs(self):
        self.sendToServer(MSG)

    def testRecvmsgIntoGenerator(self):
        # Receive into buffer obtained from a generator (not a sequence).
        buf = bytearray(len(MSG))
        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
            (o for o in [buf]))
        self.assertEqual(nbytes, len(MSG))
        self.assertEqual(buf, bytearray(MSG))
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsgIntoGenerator(self):
        self.sendToServer(MSG)

    def testRecvmsgIntoArray(self):
        # Receive into an array rather than the usual bytearray.
        # Zero-filled array of unsigned bytes, the same length as MSG.
        buf = array.array("B", [0] * len(MSG))
        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf])
        self.assertEqual(nbytes, len(MSG))
        self.assertEqual(buf.tobytes(), MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsgIntoArray(self):
        self.sendToServer(MSG)

    def testRecvmsgIntoScatter(self):
        # Receive into multiple buffers (scatter write).
        # Three targets of 4, 7 and 14 bytes; the middle one is a
        # slice of b2, so b2's first two and last bytes must survive.
        b1 = bytearray(b"----")
        b2 = bytearray(b"0123456789")
        b3 = bytearray(b"--------------")
        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
            [b1, memoryview(b2)[2:9], b3])
        self.assertEqual(nbytes, len(b"Mary had a little lamb"))
        # The data fills the buffers in order; the unused tail of b3
        # keeps its original contents.
        self.assertEqual(b1, bytearray(b"Mary"))
        self.assertEqual(b2, bytearray(b"01 had a 9"))
        self.assertEqual(b3, bytearray(b"little lamb---"))
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True)

    def _testRecvmsgIntoScatter(self):
        self.sendToServer(b"Mary had a little lamb")


class CmsgMacroTests(unittest.TestCase):
    # Test the functions CMSG_LEN() and CMSG_SPACE().  Tests
    # assumptions used by sendmsg() and recvmsg[_into](), which share
    # code with these functions.

    # Match the definition in socketmodule.c
    # Evaluated once, when the class body runs; falls back to
    # 0x7fffffff when _testcapi cannot be imported.
    try:
        import _testcapi
    except ImportError:
        socklen_t_limit = 0x7fffffff
    else:
        socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)

    @requireAttrs(socket, "CMSG_LEN")
    def testCMSG_LEN(self):
        # Test CMSG_LEN() with various valid and invalid values,
        # checking the assumptions used by recvmsg() and sendmsg().
        # Smallest data length whose CMSG_LEN() would exceed
        # socklen_t_limit.
        toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1
        # Sample both ends of the valid range: 0..256 and the 257
        # largest lengths just below toobig.
        values = list(range(257)) + list(range(toobig - 257, toobig))

        # struct cmsghdr has at least three members, two of which are ints
        self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2)
        for n in values:
            ret = socket.CMSG_LEN(n)
            # This is how recvmsg() calculates the data size
            self.assertEqual(ret - socket.CMSG_LEN(0), n)
            self.assertLessEqual(ret, self.socklen_t_limit)

        self.assertRaises(OverflowError, socket.CMSG_LEN, -1)
        # sendmsg() shares code with these functions, and requires
        # that it reject values over the limit.
        self.assertRaises(OverflowError, socket.CMSG_LEN, toobig)
        self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize)

    @requireAttrs(socket, "CMSG_SPACE")
    def testCMSG_SPACE(self):
        # Test CMSG_SPACE() with various valid and invalid values,
        # checking the assumptions used by sendmsg().
        # First data length that is expected to be rejected.
        toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1
        # Sample both ends of the valid range, in increasing order.
        values = list(range(257)) + list(range(toobig - 257, toobig))

        last = socket.CMSG_SPACE(0)
        # struct cmsghdr has at least three members, two of which are ints
        self.assertGreater(last, array.array("i").itemsize * 2)
        for n in values:
            ret = socket.CMSG_SPACE(n)
            # The result never decreases as n grows, is never smaller
            # than CMSG_LEN(n) or than the header plus n data bytes,
            # and stays within the limit.
            self.assertGreaterEqual(ret, last)
            self.assertGreaterEqual(ret, socket.CMSG_LEN(n))
            self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0))
            self.assertLessEqual(ret, self.socklen_t_limit)
            last = ret

        self.assertRaises(OverflowError, socket.CMSG_SPACE, -1)
        # sendmsg() shares code with these functions, and requires
        # that it reject values over the limit.
        self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig)
        self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize)


class SCMRightsTest(SendrecvmsgServerTimeoutBase):
    # Tests for file descriptor passing on Unix-domain sockets.

    # Invalid file descriptor value that's unlikely to evaluate to a
    # real FD even if one of its bytes is replaced with a different
    # value (which shouldn't actually happen).
    badfd = -0x5555

    def newFDs(self, n):
        # Return a list of n file descriptors for newly-created files
        # containing their list indices as ASCII numbers.
        fds = []
        for i in range(n):
            fd, path = tempfile.mkstemp()
            # Cleanups run in reverse order of registration, so the
            # descriptor is closed before the file is unlinked.
            self.addCleanup(os.unlink, path)
            self.addCleanup(os.close, fd)
            os.write(fd, str(i).encode())
            fds.append(fd)
        return fds

    def checkFDs(self, fds):
        # Check that the file descriptors in the given list contain
        # their correct list indices as ASCII numbers.
        for n, fd in enumerate(fds):
            # Rewind first: read from the start of the file whatever
            # the descriptor's current offset is.
            os.lseek(fd, 0, os.SEEK_SET)
            self.assertEqual(os.read(fd, 1024), str(n).encode())

    def registerRecvmsgResult(self, result):
        # Arrange for any descriptors received in result to be closed
        # when the test finishes.
        self.addCleanup(self.closeRecvmsgFDs, result)

    def closeRecvmsgFDs(self, recvmsg_result):
        # Close all file descriptors specified in the ancillary data
        # of the given return value from recvmsg() or recvmsg_into().
        # recvmsg_result[1] is the list of (level, type, data) items.
        for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]:
            if (cmsg_level == socket.SOL_SOCKET and
                    cmsg_type == socket.SCM_RIGHTS):
                fds = array.array("i")
                # Drop any trailing partial int before decoding, since
                # the data may have been truncated.
                fds.frombytes(cmsg_data[:
                        len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
                for fd in fds:
                    os.close(fd)

    def createAndSendFDs(self, n):
        # Send n new file descriptors created by newFDs() to the
        # server, with the constant MSG as the non-ancillary data.
        # All n descriptors travel in one SCM_RIGHTS item, passed as
        # an array of ints.
        self.assertEqual(
            self.sendmsgToServer([MSG],
                                 [(socket.SOL_SOCKET,
                                   socket.SCM_RIGHTS,
                                   array.array("i", self.newFDs(n)))]),
            len(MSG))

    def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
        # Check that constant MSG was received with numfds file
        # descriptors in a maximum of maxcmsgs control messages (which
        # must contain only complete integers).  By default, check
        # that MSG_CTRUNC is unset, but ignore any flags in
        # ignoreflags.
        msg, ancdata, flags, addr = result
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertIsInstance(ancdata, list)
        self.assertLessEqual(len(ancdata), maxcmsgs)
        # Collect the descriptors from every control message, in
        # order, into a single array.
        fds = array.array("i")
        for item in ancdata:
            self.assertIsInstance(item, tuple)
            cmsg_level, cmsg_type, cmsg_data = item
            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
            self.assertIsInstance(cmsg_data, bytes)
            # Only whole ints are acceptable here.
            self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0)
            fds.frombytes(cmsg_data)

        self.assertEqual(len(fds), numfds)
        self.checkFDs(fds)

    def testFDPassSimple(self):
        # Pass a single FD (array read from bytes object).
        self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock,
                                               len(MSG), 10240))

    def _testFDPassSimple(self):
        # Client side: like createAndSendFDs(1), except that the
        # descriptor array is converted to bytes before sending.
        self.assertEqual(
            self.sendmsgToServer(
                [MSG],
                [(socket.SOL_SOCKET,
                  socket.SCM_RIGHTS,
                  array.array("i", self.newFDs(1)).tobytes())]),
            len(MSG))

    def testMultipleFDPass(self):
        # Pass multiple FDs in a single array.
        self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock,
                                               len(MSG), 10240))

    def _testMultipleFDPass(self):
        self.createAndSendFDs(4)

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassCMSG_SPACE(self):
        # Test using CMSG_SPACE() to calculate ancillary buffer size.
        # The buffer is sized for exactly the four ints being sent.
        self.checkRecvmsgFDs(
            4, self.doRecvmsg(self.serv_sock, len(MSG),
                              socket.CMSG_SPACE(4 * SIZEOF_INT)))

    @testFDPassCMSG_SPACE.client_skip
    def _testFDPassCMSG_SPACE(self):
        self.createAndSendFDs(4)

    def testFDPassCMSG_LEN(self):
        # Test using CMSG_LEN() to calculate ancillary buffer size.
        # One descriptor is sent; the buffer is sized with CMSG_LEN()
        # for four ints.
        self.checkRecvmsgFDs(1,
                             self.doRecvmsg(self.serv_sock, len(MSG),
                                            socket.CMSG_LEN(4 * SIZEOF_INT)),
                             # RFC 3542 says implementations may set
                             # MSG_CTRUNC if there isn't enough space
                             # for trailing padding.
                             ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassCMSG_LEN(self):
        self.createAndSendFDs(1)

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(AIX, "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparate(self):
        # Pass two FDs in two separate arrays.  Arrays may be combined
        # into a single control message by the OS.
        # maxcmsgs=2 accepts the descriptors in one or two messages.
        self.checkRecvmsgFDs(2,
                             self.doRecvmsg(self.serv_sock, len(MSG), 10240),
                             maxcmsgs=2)

    @testFDPassSeparate.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(AIX, "skipping, see issue #22397")
    def _testFDPassSeparate(self):
        # Client side: one SCM_RIGHTS item per descriptor.
        fd0, fd1 = self.newFDs(2)
        self.assertEqual(
            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
                                          socket.SCM_RIGHTS,
                                          array.array("i", [fd0])),
                                         (socket.SOL_SOCKET,
                                          socket.SCM_RIGHTS,
                                          array.array("i", [fd1]))]),
            len(MSG))

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(AIX, "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparateMinSpace(self):
        # Pass two FDs in two separate arrays, receiving them into the
        # minimum space for two arrays.
        num_fds = 2
        # MSG_CTRUNC is ignored because the buffer is deliberately
        # tight.
        self.checkRecvmsgFDs(num_fds,
                             self.doRecvmsg(self.serv_sock, len(MSG),
                                            socket.CMSG_SPACE(SIZEOF_INT) +
                                            socket.CMSG_LEN(SIZEOF_INT * num_fds)),
                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)

    @testFDPassSeparateMinSpace.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(AIX, "skipping, see issue #22397")
    def _testFDPassSeparateMinSpace(self):
        # Client side: one SCM_RIGHTS item per descriptor.
        fd0, fd1 = self.newFDs(2)
        self.assertEqual(
            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
                                          socket.SCM_RIGHTS,
                                          array.array("i", [fd0])),
                                         (socket.SOL_SOCKET,
                                          socket.SCM_RIGHTS,
                                          array.array("i", [fd1]))]),
            len(MSG))

    def sendAncillaryIfPossible(self, msg, ancdata):
        # Try to send msg and ancdata to server, but if the system
        # call fails, just send msg with no ancillary data.
        try:
            nbytes = self.sendmsgToServer([msg], ancdata)
        except OSError as e:
            # Check that it was the system call that failed
            self.assertIsInstance(e.errno, int)
            # Fall back to sending the data alone, so that the server
            # side still receives msg.
            nbytes = self.sendmsgToServer([msg])
        self.assertEqual(nbytes, len(msg))

    @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
    def testFDPassEmpty(self):
        # Try to pass an empty FD array.  Can receive either no array
        # or an empty array.
        self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock,
                                               len(MSG), 10240),
                             ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassEmpty(self):
        # Client side: an SCM_RIGHTS item with no data at all.
        self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET,
                                            socket.SCM_RIGHTS,
                                            b"")])

    def testFDPassPartialInt(self):
        # Try to pass a truncated FD array.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), 10240)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
        # At most one item may arrive, and it must not contain a
        # complete descriptor.
        self.assertLessEqual(len(ancdata), 1)
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
            self.assertLess(len(cmsg_data), SIZEOF_INT)

    def _testFDPassPartialInt(self):
        # Client side: the bytes of one (invalid) descriptor with the
        # last byte cut off.
        self.sendAncillaryIfPossible(
            MSG,
            [(socket.SOL_SOCKET,
              socket.SCM_RIGHTS,
              array.array("i", [self.badfd]).tobytes()[:-1])])

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassPartialIntInMiddle(self):
        # Try to pass two FD arrays, the first of which is truncated.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), 10240)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
        self.assertLessEqual(len(ancdata), 2)
        fds = array.array("i")
        # Arrays may have been combined in a single control message
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
            # Keep only whole ints from each item.
            fds.frombytes(cmsg_data[:
                    len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
        # Whatever complete descriptors did arrive must be valid and
        # in order.
        self.assertLessEqual(len(fds), 2)
        self.checkFDs(fds)

    @testFDPassPartialIntInMiddle.client_skip
    def _testFDPassPartialIntInMiddle(self):
        # Client side: the first item holds fd0 followed by a
        # truncated invalid descriptor; the second item holds fd1.
        fd0, fd1 = self.newFDs(2)
        self.sendAncillaryIfPossible(
            MSG,
            [(socket.SOL_SOCKET,
              socket.SCM_RIGHTS,
              array.array("i", [fd0, self.badfd]).tobytes()[:-1]),
             (socket.SOL_SOCKET,
              socket.SCM_RIGHTS,
              array.array("i", [fd1]))])

    def checkTruncatedHeader(self, result, ignoreflags=0):
        # Check that no ancillary data items are returned when data is
        # truncated inside the cmsghdr structure.
        # MSG_CTRUNC must be set unless it is listed in ignoreflags.
        msg, ancdata, flags, addr = result
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

    def testCmsgTruncNoBufSize(self):
        # Check that no ancillary data is received when no buffer size
        # is specified.
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)),
                                  # BSD seems to set MSG_CTRUNC only
                                  # if an item has been partially
                                  # received.
                                  ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTruncNoBufSize(self):
        self.createAndSendFDs(1)

    def testCmsgTrunc0(self):
        # Check that no ancillary data is received when buffer size is 0.
        # MSG_CTRUNC is not required here (it is passed in
        # ignoreflags).
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0),
                                  ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTrunc0(self):
        self.createAndSendFDs(1)

    # Check that no ancillary data is returned for various non-zero
    # (but still too small) buffer sizes.

    def testCmsgTrunc1(self):
        # A one-byte ancillary buffer.
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1))

    def _testCmsgTrunc1(self):
        self.createAndSendFDs(1)

    def testCmsgTrunc2Int(self):
        # The cmsghdr structure has at least three members, two of
        # which are ints, so we still shouldn't see any ancillary
        # data.
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
                                                 SIZEOF_INT * 2))

    def _testCmsgTrunc2Int(self):
        self.createAndSendFDs(1)

    def testCmsgTruncLen0Minus1(self):
        # One byte short of the space needed for an item with no data.
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
                                                 socket.CMSG_LEN(0) - 1))

    def _testCmsgTruncLen0Minus1(self):
        self.createAndSendFDs(1)

    # The following tests try to truncate the control message in the
    # middle of the FD array.

    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
        # Check that file descriptor data is truncated to between
        # mindata and maxdata bytes when received with buffer size
        # ancbuf, and that any complete file descriptor numbers are
        # valid.
3768 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3769 len(MSG), ancbuf) 3770 self.assertEqual(msg, MSG) 3771 self.checkRecvmsgAddress(addr, self.cli_addr) 3772 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3773 3774 if mindata == 0 and ancdata == []: 3775 return 3776 self.assertEqual(len(ancdata), 1) 3777 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3778 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3779 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3780 self.assertGreaterEqual(len(cmsg_data), mindata) 3781 self.assertLessEqual(len(cmsg_data), maxdata) 3782 fds = array.array("i") 3783 fds.frombytes(cmsg_data[: 3784 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3785 self.checkFDs(fds) 3786 3787 def testCmsgTruncLen0(self): 3788 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0) 3789 3790 def _testCmsgTruncLen0(self): 3791 self.createAndSendFDs(1) 3792 3793 def testCmsgTruncLen0Plus1(self): 3794 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1) 3795 3796 def _testCmsgTruncLen0Plus1(self): 3797 self.createAndSendFDs(2) 3798 3799 def testCmsgTruncLen1(self): 3800 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT), 3801 maxdata=SIZEOF_INT) 3802 3803 def _testCmsgTruncLen1(self): 3804 self.createAndSendFDs(2) 3805 3806 def testCmsgTruncLen2Minus1(self): 3807 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1, 3808 maxdata=(2 * SIZEOF_INT) - 1) 3809 3810 def _testCmsgTruncLen2Minus1(self): 3811 self.createAndSendFDs(2) 3812 3813 3814class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase): 3815 # Test sendmsg() and recvmsg[_into]() using the ancillary data 3816 # features of the RFC 3542 Advanced Sockets API for IPv6. 3817 # Currently we can only handle certain data items (e.g. 
traffic 3818 # class, hop limit, MTU discovery and fragmentation settings) 3819 # without resorting to unportable means such as the struct module, 3820 # but the tests here are aimed at testing the ancillary data 3821 # handling in sendmsg() and recvmsg() rather than the IPv6 API 3822 # itself. 3823 3824 # Test value to use when setting hop limit of packet 3825 hop_limit = 2 3826 3827 # Test value to use when setting traffic class of packet. 3828 # -1 means "use kernel default". 3829 traffic_class = -1 3830 3831 def ancillaryMapping(self, ancdata): 3832 # Given ancillary data list ancdata, return a mapping from 3833 # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data. 3834 # Check that no (level, type) pair appears more than once. 3835 d = {} 3836 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3837 self.assertNotIn((cmsg_level, cmsg_type), d) 3838 d[(cmsg_level, cmsg_type)] = cmsg_data 3839 return d 3840 3841 def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0): 3842 # Receive hop limit into ancbufsize bytes of ancillary data 3843 # space. Check that data is MSG, ancillary data is not 3844 # truncated (but ignore any flags in ignoreflags), and hop 3845 # limit is between 0 and maxhop inclusive. 
3846 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3847 socket.IPV6_RECVHOPLIMIT, 1) 3848 self.misc_event.set() 3849 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3850 len(MSG), ancbufsize) 3851 3852 self.assertEqual(msg, MSG) 3853 self.checkRecvmsgAddress(addr, self.cli_addr) 3854 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3855 ignore=ignoreflags) 3856 3857 self.assertEqual(len(ancdata), 1) 3858 self.assertIsInstance(ancdata[0], tuple) 3859 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3860 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3861 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 3862 self.assertIsInstance(cmsg_data, bytes) 3863 self.assertEqual(len(cmsg_data), SIZEOF_INT) 3864 a = array.array("i") 3865 a.frombytes(cmsg_data) 3866 self.assertGreaterEqual(a[0], 0) 3867 self.assertLessEqual(a[0], maxhop) 3868 3869 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3870 def testRecvHopLimit(self): 3871 # Test receiving the packet hop limit as ancillary data. 3872 self.checkHopLimit(ancbufsize=10240) 3873 3874 @testRecvHopLimit.client_skip 3875 def _testRecvHopLimit(self): 3876 # Need to wait until server has asked to receive ancillary 3877 # data, as implementations are not required to buffer it 3878 # otherwise. 3879 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3880 self.sendToServer(MSG) 3881 3882 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3883 def testRecvHopLimitCMSG_SPACE(self): 3884 # Test receiving hop limit, using CMSG_SPACE to calculate buffer size. 
3885 self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT)) 3886 3887 @testRecvHopLimitCMSG_SPACE.client_skip 3888 def _testRecvHopLimitCMSG_SPACE(self): 3889 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3890 self.sendToServer(MSG) 3891 3892 # Could test receiving into buffer sized using CMSG_LEN, but RFC 3893 # 3542 says portable applications must provide space for trailing 3894 # padding. Implementations may set MSG_CTRUNC if there isn't 3895 # enough space for the padding. 3896 3897 @requireAttrs(socket.socket, "sendmsg") 3898 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3899 def testSetHopLimit(self): 3900 # Test setting hop limit on outgoing packet and receiving it 3901 # at the other end. 3902 self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit) 3903 3904 @testSetHopLimit.client_skip 3905 def _testSetHopLimit(self): 3906 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3907 self.assertEqual( 3908 self.sendmsgToServer([MSG], 3909 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3910 array.array("i", [self.hop_limit]))]), 3911 len(MSG)) 3912 3913 def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255, 3914 ignoreflags=0): 3915 # Receive traffic class and hop limit into ancbufsize bytes of 3916 # ancillary data space. Check that data is MSG, ancillary 3917 # data is not truncated (but ignore any flags in ignoreflags), 3918 # and traffic class and hop limit are in range (hop limit no 3919 # more than maxhop). 
3920 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3921 socket.IPV6_RECVHOPLIMIT, 1) 3922 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3923 socket.IPV6_RECVTCLASS, 1) 3924 self.misc_event.set() 3925 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3926 len(MSG), ancbufsize) 3927 3928 self.assertEqual(msg, MSG) 3929 self.checkRecvmsgAddress(addr, self.cli_addr) 3930 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3931 ignore=ignoreflags) 3932 self.assertEqual(len(ancdata), 2) 3933 ancmap = self.ancillaryMapping(ancdata) 3934 3935 tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)] 3936 self.assertEqual(len(tcdata), SIZEOF_INT) 3937 a = array.array("i") 3938 a.frombytes(tcdata) 3939 self.assertGreaterEqual(a[0], 0) 3940 self.assertLessEqual(a[0], 255) 3941 3942 hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)] 3943 self.assertEqual(len(hldata), SIZEOF_INT) 3944 a = array.array("i") 3945 a.frombytes(hldata) 3946 self.assertGreaterEqual(a[0], 0) 3947 self.assertLessEqual(a[0], maxhop) 3948 3949 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3950 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3951 def testRecvTrafficClassAndHopLimit(self): 3952 # Test receiving traffic class and hop limit as ancillary data. 3953 self.checkTrafficClassAndHopLimit(ancbufsize=10240) 3954 3955 @testRecvTrafficClassAndHopLimit.client_skip 3956 def _testRecvTrafficClassAndHopLimit(self): 3957 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3958 self.sendToServer(MSG) 3959 3960 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3961 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3962 def testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3963 # Test receiving traffic class and hop limit, using 3964 # CMSG_SPACE() to calculate buffer size. 
3965 self.checkTrafficClassAndHopLimit( 3966 ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2) 3967 3968 @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip 3969 def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3970 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3971 self.sendToServer(MSG) 3972 3973 @requireAttrs(socket.socket, "sendmsg") 3974 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3975 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3976 def testSetTrafficClassAndHopLimit(self): 3977 # Test setting traffic class and hop limit on outgoing packet, 3978 # and receiving them at the other end. 3979 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 3980 maxhop=self.hop_limit) 3981 3982 @testSetTrafficClassAndHopLimit.client_skip 3983 def _testSetTrafficClassAndHopLimit(self): 3984 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3985 self.assertEqual( 3986 self.sendmsgToServer([MSG], 3987 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3988 array.array("i", [self.traffic_class])), 3989 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3990 array.array("i", [self.hop_limit]))]), 3991 len(MSG)) 3992 3993 @requireAttrs(socket.socket, "sendmsg") 3994 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3995 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3996 def testOddCmsgSize(self): 3997 # Try to send ancillary data with first item one byte too 3998 # long. Fall back to sending with correct size if this fails, 3999 # and check that second item was handled correctly. 
4000 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 4001 maxhop=self.hop_limit) 4002 4003 @testOddCmsgSize.client_skip 4004 def _testOddCmsgSize(self): 4005 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4006 try: 4007 nbytes = self.sendmsgToServer( 4008 [MSG], 4009 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 4010 array.array("i", [self.traffic_class]).tobytes() + b"\x00"), 4011 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 4012 array.array("i", [self.hop_limit]))]) 4013 except OSError as e: 4014 self.assertIsInstance(e.errno, int) 4015 nbytes = self.sendmsgToServer( 4016 [MSG], 4017 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 4018 array.array("i", [self.traffic_class])), 4019 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 4020 array.array("i", [self.hop_limit]))]) 4021 self.assertEqual(nbytes, len(MSG)) 4022 4023 # Tests for proper handling of truncated ancillary data 4024 4025 def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0): 4026 # Receive hop limit into ancbufsize bytes of ancillary data 4027 # space, which should be too small to contain the ancillary 4028 # data header (if ancbufsize is None, pass no second argument 4029 # to recvmsg()). Check that data is MSG, MSG_CTRUNC is set 4030 # (unless included in ignoreflags), and no ancillary data is 4031 # returned. 4032 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4033 socket.IPV6_RECVHOPLIMIT, 1) 4034 self.misc_event.set() 4035 args = () if ancbufsize is None else (ancbufsize,) 4036 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 4037 len(MSG), *args) 4038 4039 self.assertEqual(msg, MSG) 4040 self.checkRecvmsgAddress(addr, self.cli_addr) 4041 self.assertEqual(ancdata, []) 4042 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 4043 ignore=ignoreflags) 4044 4045 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4046 def testCmsgTruncNoBufSize(self): 4047 # Check that no ancillary data is received when no ancillary 4048 # buffer size is provided. 
4049 self.checkHopLimitTruncatedHeader(ancbufsize=None, 4050 # BSD seems to set 4051 # MSG_CTRUNC only if an item 4052 # has been partially 4053 # received. 4054 ignoreflags=socket.MSG_CTRUNC) 4055 4056 @testCmsgTruncNoBufSize.client_skip 4057 def _testCmsgTruncNoBufSize(self): 4058 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4059 self.sendToServer(MSG) 4060 4061 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4062 def testSingleCmsgTrunc0(self): 4063 # Check that no ancillary data is received when ancillary 4064 # buffer size is zero. 4065 self.checkHopLimitTruncatedHeader(ancbufsize=0, 4066 ignoreflags=socket.MSG_CTRUNC) 4067 4068 @testSingleCmsgTrunc0.client_skip 4069 def _testSingleCmsgTrunc0(self): 4070 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4071 self.sendToServer(MSG) 4072 4073 # Check that no ancillary data is returned for various non-zero 4074 # (but still too small) buffer sizes. 4075 4076 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4077 def testSingleCmsgTrunc1(self): 4078 self.checkHopLimitTruncatedHeader(ancbufsize=1) 4079 4080 @testSingleCmsgTrunc1.client_skip 4081 def _testSingleCmsgTrunc1(self): 4082 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4083 self.sendToServer(MSG) 4084 4085 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4086 def testSingleCmsgTrunc2Int(self): 4087 self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT) 4088 4089 @testSingleCmsgTrunc2Int.client_skip 4090 def _testSingleCmsgTrunc2Int(self): 4091 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4092 self.sendToServer(MSG) 4093 4094 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4095 def testSingleCmsgTruncLen0Minus1(self): 4096 self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1) 4097 4098 @testSingleCmsgTruncLen0Minus1.client_skip 4099 def _testSingleCmsgTruncLen0Minus1(self): 4100 
self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4101 self.sendToServer(MSG) 4102 4103 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4104 def testSingleCmsgTruncInData(self): 4105 # Test truncation of a control message inside its associated 4106 # data. The message may be returned with its data truncated, 4107 # or not returned at all. 4108 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4109 socket.IPV6_RECVHOPLIMIT, 1) 4110 self.misc_event.set() 4111 msg, ancdata, flags, addr = self.doRecvmsg( 4112 self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1) 4113 4114 self.assertEqual(msg, MSG) 4115 self.checkRecvmsgAddress(addr, self.cli_addr) 4116 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 4117 4118 self.assertLessEqual(len(ancdata), 1) 4119 if ancdata: 4120 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 4121 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4122 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 4123 self.assertLess(len(cmsg_data), SIZEOF_INT) 4124 4125 @testSingleCmsgTruncInData.client_skip 4126 def _testSingleCmsgTruncInData(self): 4127 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4128 self.sendToServer(MSG) 4129 4130 def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0): 4131 # Receive traffic class and hop limit into ancbufsize bytes of 4132 # ancillary data space, which should be large enough to 4133 # contain the first item, but too small to contain the header 4134 # of the second. Check that data is MSG, MSG_CTRUNC is set 4135 # (unless included in ignoreflags), and only one ancillary 4136 # data item is returned. 
4137 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4138 socket.IPV6_RECVHOPLIMIT, 1) 4139 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4140 socket.IPV6_RECVTCLASS, 1) 4141 self.misc_event.set() 4142 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 4143 len(MSG), ancbufsize) 4144 4145 self.assertEqual(msg, MSG) 4146 self.checkRecvmsgAddress(addr, self.cli_addr) 4147 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 4148 ignore=ignoreflags) 4149 4150 self.assertEqual(len(ancdata), 1) 4151 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 4152 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4153 self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}) 4154 self.assertEqual(len(cmsg_data), SIZEOF_INT) 4155 a = array.array("i") 4156 a.frombytes(cmsg_data) 4157 self.assertGreaterEqual(a[0], 0) 4158 self.assertLessEqual(a[0], 255) 4159 4160 # Try the above test with various buffer sizes. 4161 4162 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4163 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4164 def testSecondCmsgTrunc0(self): 4165 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT), 4166 ignoreflags=socket.MSG_CTRUNC) 4167 4168 @testSecondCmsgTrunc0.client_skip 4169 def _testSecondCmsgTrunc0(self): 4170 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4171 self.sendToServer(MSG) 4172 4173 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4174 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4175 def testSecondCmsgTrunc1(self): 4176 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1) 4177 4178 @testSecondCmsgTrunc1.client_skip 4179 def _testSecondCmsgTrunc1(self): 4180 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4181 self.sendToServer(MSG) 4182 4183 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4184 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4185 def testSecondCmsgTrunc2Int(self): 4186 
self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 4187 2 * SIZEOF_INT) 4188 4189 @testSecondCmsgTrunc2Int.client_skip 4190 def _testSecondCmsgTrunc2Int(self): 4191 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4192 self.sendToServer(MSG) 4193 4194 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4195 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4196 def testSecondCmsgTruncLen0Minus1(self): 4197 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 4198 socket.CMSG_LEN(0) - 1) 4199 4200 @testSecondCmsgTruncLen0Minus1.client_skip 4201 def _testSecondCmsgTruncLen0Minus1(self): 4202 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4203 self.sendToServer(MSG) 4204 4205 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4206 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4207 def testSecondCmsgTruncInData(self): 4208 # Test truncation of the second of two control messages inside 4209 # its associated data. 4210 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4211 socket.IPV6_RECVHOPLIMIT, 1) 4212 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4213 socket.IPV6_RECVTCLASS, 1) 4214 self.misc_event.set() 4215 msg, ancdata, flags, addr = self.doRecvmsg( 4216 self.serv_sock, len(MSG), 4217 socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1) 4218 4219 self.assertEqual(msg, MSG) 4220 self.checkRecvmsgAddress(addr, self.cli_addr) 4221 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 4222 4223 cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT} 4224 4225 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 4226 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4227 cmsg_types.remove(cmsg_type) 4228 self.assertEqual(len(cmsg_data), SIZEOF_INT) 4229 a = array.array("i") 4230 a.frombytes(cmsg_data) 4231 self.assertGreaterEqual(a[0], 0) 4232 self.assertLessEqual(a[0], 255) 4233 4234 if ancdata: 4235 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 4236 
self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4237 cmsg_types.remove(cmsg_type) 4238 self.assertLess(len(cmsg_data), SIZEOF_INT) 4239 4240 self.assertEqual(ancdata, []) 4241 4242 @testSecondCmsgTruncInData.client_skip 4243 def _testSecondCmsgTruncInData(self): 4244 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4245 self.sendToServer(MSG) 4246 4247 4248# Derive concrete test classes for different socket types. 4249 4250class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase, 4251 SendrecvmsgConnectionlessBase, 4252 ThreadedSocketTestMixin, UDPTestBase): 4253 pass 4254 4255@requireAttrs(socket.socket, "sendmsg") 4256class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase): 4257 pass 4258 4259@requireAttrs(socket.socket, "recvmsg") 4260class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase): 4261 pass 4262 4263@requireAttrs(socket.socket, "recvmsg_into") 4264class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase): 4265 pass 4266 4267 4268class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase, 4269 SendrecvmsgConnectionlessBase, 4270 ThreadedSocketTestMixin, UDP6TestBase): 4271 4272 def checkRecvmsgAddress(self, addr1, addr2): 4273 # Called to compare the received address with the address of 4274 # the peer, ignoring scope ID 4275 self.assertEqual(addr1[:-1], addr2[:-1]) 4276 4277@requireAttrs(socket.socket, "sendmsg") 4278@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4279@requireSocket("AF_INET6", "SOCK_DGRAM") 4280class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase): 4281 pass 4282 4283@requireAttrs(socket.socket, "recvmsg") 4284@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4285@requireSocket("AF_INET6", "SOCK_DGRAM") 4286class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase): 4287 pass 4288 4289@requireAttrs(socket.socket, "recvmsg_into") 4290@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this 
test.') 4291@requireSocket("AF_INET6", "SOCK_DGRAM") 4292class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase): 4293 pass 4294 4295@requireAttrs(socket.socket, "recvmsg") 4296@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4297@requireAttrs(socket, "IPPROTO_IPV6") 4298@requireSocket("AF_INET6", "SOCK_DGRAM") 4299class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, 4300 SendrecvmsgUDP6TestBase): 4301 pass 4302 4303@requireAttrs(socket.socket, "recvmsg_into") 4304@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4305@requireAttrs(socket, "IPPROTO_IPV6") 4306@requireSocket("AF_INET6", "SOCK_DGRAM") 4307class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin, 4308 RFC3542AncillaryTest, 4309 SendrecvmsgUDP6TestBase): 4310 pass 4311 4312 4313@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4314 'UDPLITE sockets required for this test.') 4315class SendrecvmsgUDPLITETestBase(SendrecvmsgDgramFlagsBase, 4316 SendrecvmsgConnectionlessBase, 4317 ThreadedSocketTestMixin, UDPLITETestBase): 4318 pass 4319 4320@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4321 'UDPLITE sockets required for this test.') 4322@requireAttrs(socket.socket, "sendmsg") 4323class SendmsgUDPLITETest(SendmsgConnectionlessTests, SendrecvmsgUDPLITETestBase): 4324 pass 4325 4326@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4327 'UDPLITE sockets required for this test.') 4328@requireAttrs(socket.socket, "recvmsg") 4329class RecvmsgUDPLITETest(RecvmsgTests, SendrecvmsgUDPLITETestBase): 4330 pass 4331 4332@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4333 'UDPLITE sockets required for this test.') 4334@requireAttrs(socket.socket, "recvmsg_into") 4335class RecvmsgIntoUDPLITETest(RecvmsgIntoTests, SendrecvmsgUDPLITETestBase): 4336 pass 4337 4338 4339@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4340 'UDPLITE sockets required for this test.') 4341class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase, 4342 SendrecvmsgConnectionlessBase, 4343 
ThreadedSocketTestMixin, UDPLITE6TestBase): 4344 4345 def checkRecvmsgAddress(self, addr1, addr2): 4346 # Called to compare the received address with the address of 4347 # the peer, ignoring scope ID 4348 self.assertEqual(addr1[:-1], addr2[:-1]) 4349 4350@requireAttrs(socket.socket, "sendmsg") 4351@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4352@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4353 'UDPLITE sockets required for this test.') 4354@requireSocket("AF_INET6", "SOCK_DGRAM") 4355class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBase): 4356 pass 4357 4358@requireAttrs(socket.socket, "recvmsg") 4359@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4360@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4361 'UDPLITE sockets required for this test.') 4362@requireSocket("AF_INET6", "SOCK_DGRAM") 4363class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase): 4364 pass 4365 4366@requireAttrs(socket.socket, "recvmsg_into") 4367@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4368@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4369 'UDPLITE sockets required for this test.') 4370@requireSocket("AF_INET6", "SOCK_DGRAM") 4371class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase): 4372 pass 4373 4374@requireAttrs(socket.socket, "recvmsg") 4375@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4376@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4377 'UDPLITE sockets required for this test.') 4378@requireAttrs(socket, "IPPROTO_IPV6") 4379@requireSocket("AF_INET6", "SOCK_DGRAM") 4380class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest, 4381 SendrecvmsgUDPLITE6TestBase): 4382 pass 4383 4384@requireAttrs(socket.socket, "recvmsg_into") 4385@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4386@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4387 'UDPLITE sockets required for this 
test.') 4388@requireAttrs(socket, "IPPROTO_IPV6") 4389@requireSocket("AF_INET6", "SOCK_DGRAM") 4390class RecvmsgIntoRFC3542AncillaryUDPLITE6Test(RecvmsgIntoMixin, 4391 RFC3542AncillaryTest, 4392 SendrecvmsgUDPLITE6TestBase): 4393 pass 4394 4395 4396class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase, 4397 ConnectedStreamTestMixin, TCPTestBase): 4398 pass 4399 4400@requireAttrs(socket.socket, "sendmsg") 4401class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase): 4402 pass 4403 4404@requireAttrs(socket.socket, "recvmsg") 4405class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests, 4406 SendrecvmsgTCPTestBase): 4407 pass 4408 4409@requireAttrs(socket.socket, "recvmsg_into") 4410class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4411 SendrecvmsgTCPTestBase): 4412 pass 4413 4414 4415class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase, 4416 SendrecvmsgConnectedBase, 4417 ConnectedStreamTestMixin, SCTPStreamBase): 4418 pass 4419 4420@requireAttrs(socket.socket, "sendmsg") 4421@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4422@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4423class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase): 4424 pass 4425 4426@requireAttrs(socket.socket, "recvmsg") 4427@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4428@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4429class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 4430 SendrecvmsgSCTPStreamTestBase): 4431 4432 def testRecvmsgEOF(self): 4433 try: 4434 super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF() 4435 except OSError as e: 4436 if e.errno != errno.ENOTCONN: 4437 raise 4438 self.skipTest("sporadic ENOTCONN (kernel issue?) 
- see issue #13876") 4439 4440@requireAttrs(socket.socket, "recvmsg_into") 4441@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4442@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4443class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4444 SendrecvmsgSCTPStreamTestBase): 4445 4446 def testRecvmsgEOF(self): 4447 try: 4448 super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF() 4449 except OSError as e: 4450 if e.errno != errno.ENOTCONN: 4451 raise 4452 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 4453 4454 4455class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase, 4456 ConnectedStreamTestMixin, UnixStreamBase): 4457 pass 4458 4459@requireAttrs(socket.socket, "sendmsg") 4460@requireAttrs(socket, "AF_UNIX") 4461class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase): 4462 pass 4463 4464@requireAttrs(socket.socket, "recvmsg") 4465@requireAttrs(socket, "AF_UNIX") 4466class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 4467 SendrecvmsgUnixStreamTestBase): 4468 pass 4469 4470@requireAttrs(socket.socket, "recvmsg_into") 4471@requireAttrs(socket, "AF_UNIX") 4472class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4473 SendrecvmsgUnixStreamTestBase): 4474 pass 4475 4476@requireAttrs(socket.socket, "sendmsg", "recvmsg") 4477@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 4478class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase): 4479 pass 4480 4481@requireAttrs(socket.socket, "sendmsg", "recvmsg_into") 4482@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 4483class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest, 4484 SendrecvmsgUnixStreamTestBase): 4485 pass 4486 4487 4488# Test interrupting the interruptible send/receive methods with a 4489# signal when a timeout is set. 
# These tests avoid having multiple
# threads alive during the test so that the OS cannot deliver the
# signal to the wrong one.

class InterruptedTimeoutBase:
    # Base class for interrupted send/receive tests.  Installs a
    # SIGALRM handler that raises ZeroDivisionError and restores the
    # previous handler on cleanup.  The check*() helpers in the
    # subclasses cancel any alarm they schedule.

    def setUp(self):
        super().setUp()
        orig_alrm_handler = signal.signal(signal.SIGALRM,
                                          lambda signum, frame: 1 / 0)
        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)

    # Timeout for socket operations
    timeout = support.LOOPBACK_TIMEOUT

    # Provide setAlarm() method to schedule delivery of SIGALRM after
    # given number of seconds, or cancel it if zero, and an
    # appropriate time value to use.  Use setitimer() if available.
    if hasattr(signal, "setitimer"):
        alarm_time = 0.05

        def setAlarm(self, seconds):
            signal.setitimer(signal.ITIMER_REAL, seconds)
    else:
        # Old systems may deliver the alarm up to one second early
        alarm_time = 2

        def setAlarm(self, seconds):
            signal.alarm(seconds)


# Require siginterrupt() in order to ensure that system calls are
# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
    # Test interrupting the recv*() methods with signals when a
    # timeout is set.

    def setUp(self):
        super().setUp()
        self.serv.settimeout(self.timeout)

    def checkInterruptedRecv(self, func, *args, **kwargs):
        # Check that func(*args, **kwargs) is interrupted by SIGALRM:
        # the handler installed in setUp() raises ZeroDivisionError,
        # which must propagate out of the blocking call.
        try:
            self.setAlarm(self.alarm_time)
            with self.assertRaises(ZeroDivisionError):
                func(*args, **kwargs)
        finally:
            # Always cancel the alarm so it cannot fire in a later test.
            self.setAlarm(0)

    def testInterruptedRecvTimeout(self):
        self.checkInterruptedRecv(self.serv.recv, 1024)

    def testInterruptedRecvIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))

    def testInterruptedRecvfromTimeout(self):
        self.checkInterruptedRecv(self.serv.recvfrom, 1024)

    def testInterruptedRecvfromIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))

    @requireAttrs(socket.socket, "recvmsg")
    def testInterruptedRecvmsgTimeout(self):
        self.checkInterruptedRecv(self.serv.recvmsg, 1024)

    @requireAttrs(socket.socket, "recvmsg_into")
    def testInterruptedRecvmsgIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])


# Require siginterrupt() in order to ensure that system calls are
# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
                                 ThreadSafeCleanupTestCase,
                                 SocketListeningTestMixin, TCPTestBase):
    # Test interrupting the interruptible send*() methods with signals
    # when a timeout is set.

    def setUp(self):
        super().setUp()
        self.serv_conn = self.newSocket()
        self.addCleanup(self.serv_conn.close)
        # Use a thread to complete the connection, but wait for it to
        # terminate before running the test, so that there is only one
        # thread to accept the signal.
        cli_thread = threading.Thread(target=self.doConnect)
        cli_thread.start()
        self.cli_conn, addr = self.serv.accept()
        self.addCleanup(self.cli_conn.close)
        cli_thread.join()
        self.serv_conn.settimeout(self.timeout)

    def doConnect(self):
        self.serv_conn.connect(self.serv_addr)

    def checkInterruptedSend(self, func, *args, **kwargs):
        # Check that func(*args, **kwargs), run in a loop, is
        # eventually interrupted by SIGALRM: the handler installed in
        # setUp() raises ZeroDivisionError, which must propagate out of
        # the send call.  The alarm is re-armed before every call.
        try:
            with self.assertRaises(ZeroDivisionError):
                while True:
                    self.setAlarm(self.alarm_time)
                    func(*args, **kwargs)
        finally:
            # Always cancel the alarm so it cannot fire in a later test.
            self.setAlarm(0)

    # Issue #12958: The following tests have problems on OS X prior to 10.7
    @support.requires_mac_ver(10, 7)
    def testInterruptedSendTimeout(self):
        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)

    @support.requires_mac_ver(10, 7)
    def testInterruptedSendtoTimeout(self):
        # Passing an actual address here as Python's wrapper for
        # sendto() doesn't allow passing a zero-length one; POSIX
        # requires that the address is ignored since the socket is
        # connection-mode, however.
        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
                                  self.serv_addr)

    @support.requires_mac_ver(10, 7)
    @requireAttrs(socket.socket, "sendmsg")
    def testInterruptedSendmsgTimeout(self):
        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])


class TCPCloserTest(ThreadedTCPSocketTest):

    def testClose(self):
        conn, addr = self.serv.accept()
        conn.close()

        sd = self.cli
        read, write, err = select.select([sd], [], [], 1.0)
        self.assertEqual(read, [sd])
        self.assertEqual(sd.recv(1), b'')

        # Calling close() many times should be safe.
        conn.close()
        conn.close()

    def _testClose(self):
        self.cli.connect((HOST, self.port))
        time.sleep(1.0)


class BasicSocketPairTest(SocketPairTest):

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def _check_defaults(self, sock):
        self.assertIsInstance(sock, socket.socket)
        if hasattr(socket, 'AF_UNIX'):
            self.assertEqual(sock.family, socket.AF_UNIX)
        else:
            self.assertEqual(sock.family, socket.AF_INET)
        self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)

    def _testDefaults(self):
        self._check_defaults(self.cli)

    def testDefaults(self):
        self._check_defaults(self.serv)

    def testRecv(self):
        msg = self.serv.recv(1024)
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        self.serv.send(MSG)

    def _testSend(self):
        msg = self.cli.recv(1024)
        self.assertEqual(msg, MSG)


class PurePythonSocketPairTest(SocketPairTest):

    # Explicitly use socketpair AF_INET or AF_INET6 to ensure that is the
    # code path we're using regardless platform is the pure python one where
    # `_socket.socketpair` does not exist.  (AF_INET does not work with
    # _socket.socketpair on many platforms).
    def socketpair(self):
        # called by super().setUp().
        try:
            return socket.socketpair(socket.AF_INET6)
        except OSError:
            return socket.socketpair(socket.AF_INET)

    # Local imports in this class make for easy security fix backporting.

    def setUp(self):
        if hasattr(_socket, "socketpair"):
            self._orig_sp = socket.socketpair
            # This forces the version using the non-OS provided socketpair
            # emulation via an AF_INET socket in Lib/socket.py.
            socket.socketpair = socket._fallback_socketpair
        else:
            # This platform already uses the non-OS provided version.
            self._orig_sp = None
        super().setUp()

    def tearDown(self):
        super().tearDown()

        if self._orig_sp is not None:
            # Restore the default socket.socketpair definition.
            socket.socketpair = self._orig_sp

    def test_recv(self):
        msg = self.serv.recv(1024)
        self.assertEqual(msg, MSG)

    def _test_recv(self):
        self.cli.send(MSG)

    def test_send(self):
        self.serv.send(MSG)

    def _test_send(self):
        msg = self.cli.recv(1024)
        self.assertEqual(msg, MSG)

    def test_ipv4(self):
        cli, srv = socket.socketpair(socket.AF_INET)
        cli.close()
        srv.close()

    def _test_ipv4(self):
        pass

    @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
                     not hasattr(_socket, 'IPV6_V6ONLY'),
                     "IPV6_V6ONLY option not supported")
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
    def test_ipv6(self):
        cli, srv = socket.socketpair(socket.AF_INET6)
        cli.close()
        srv.close()

    def _test_ipv6(self):
        pass

    def test_injected_authentication_failure(self):
        orig_getsockname = socket.socket.getsockname
        inject_sock = None

        def inject_getsocketname(self):
            nonlocal inject_sock
            sockname = orig_getsockname(self)
            # Connect to the listening socket ahead of the
            # client socket.
            if inject_sock is None:
                inject_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                inject_sock.setblocking(False)
                try:
                    inject_sock.connect(sockname[:2])
                except (BlockingIOError, InterruptedError):
                    pass
                inject_sock.setblocking(True)
            return sockname

        sock1 = sock2 = None
        try:
            socket.socket.getsockname = inject_getsocketname
            with self.assertRaises(OSError):
                sock1, sock2 = socket.socketpair()
        finally:
            # Undo the monkey-patch before anything else can observe it.
            socket.socket.getsockname = orig_getsockname
            if inject_sock:
                inject_sock.close()
            if sock1:  # This cleanup isn't needed on a successful test.
                sock1.close()
            if sock2:
                sock2.close()

    def _test_injected_authentication_failure(self):
        # No-op.  Exists for base class threading infrastructure to call.
        # We could refactor this test into its own lesser class along with the
        # setUp and tearDown code to construct an ideal; it is simpler to keep
        # it here and live with extra overhead on this _one_ failure test.
        pass


class NonBlockingTCPTests(ThreadedTCPSocketTest):

    def __init__(self, methodName='runTest'):
        self.event = threading.Event()
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def assert_sock_timeout(self, sock, timeout):
        # Check that sock reports the given timeout and that its blocking
        # state (both at the Python level and, where fcntl is available,
        # at the file descriptor level) is consistent with it.
        self.assertEqual(sock.gettimeout(), timeout)

        blocking = (timeout != 0.0)
        self.assertEqual(sock.getblocking(), blocking)

        if fcntl is not None:
            # When a Python socket has a non-zero timeout, it's switched
            # internally to a non-blocking mode. Later, sock.sendall(),
            # sock.recv(), and other socket operations use a select() call and
            # handle EWOULDBLOCK/EAGAIN on all socket operations. That's how
            # timeouts are enforced.
            fd_blocking = (timeout is None)

            flag = fcntl.fcntl(sock, fcntl.F_GETFL, os.O_NONBLOCK)
            self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking)

    def testSetBlocking(self):
        # Test setblocking() and settimeout() methods
        self.serv.setblocking(True)
        self.assert_sock_timeout(self.serv, None)

        self.serv.setblocking(False)
        self.assert_sock_timeout(self.serv, 0.0)

        self.serv.settimeout(None)
        self.assert_sock_timeout(self.serv, None)

        self.serv.settimeout(0)
        self.assert_sock_timeout(self.serv, 0)

        self.serv.settimeout(10)
        self.assert_sock_timeout(self.serv, 10)

        self.serv.settimeout(0)
        self.assert_sock_timeout(self.serv, 0)

    def _testSetBlocking(self):
        pass

    @support.cpython_only
    def testSetBlocking_overflow(self):
        # Issue 15989
        import _testcapi
        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
            self.skipTest('needs UINT_MAX < ULONG_MAX')

        self.serv.setblocking(False)
        self.assertEqual(self.serv.gettimeout(), 0.0)

        self.serv.setblocking(_testcapi.UINT_MAX + 1)
        self.assertIsNone(self.serv.gettimeout())

    _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)

    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'test needs socket.SOCK_NONBLOCK')
    @support.requires_linux_version(2, 6, 28)
    def testInitNonBlocking(self):
        # create a socket with SOCK_NONBLOCK
        self.serv.close()
        self.serv = socket.socket(socket.AF_INET,
                                  socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
        self.assert_sock_timeout(self.serv, 0)

    def _testInitNonBlocking(self):
        pass

    def testInheritFlagsBlocking(self):
        # bpo-7995: accept() on a listening socket with a timeout and the
        # default timeout is None, the resulting socket must be blocking.
        with socket_setdefaulttimeout(None):
            self.serv.settimeout(10)
            conn, addr = self.serv.accept()
            self.addCleanup(conn.close)
            self.assertIsNone(conn.gettimeout())

    def _testInheritFlagsBlocking(self):
        self.cli.connect((HOST, self.port))

    def testInheritFlagsTimeout(self):
        # bpo-7995: accept() on a listening socket with a timeout while a
        # non-None default timeout is set: the resulting socket must
        # inherit the default timeout, not the listening socket's one.
        default_timeout = 20.0
        with socket_setdefaulttimeout(default_timeout):
            self.serv.settimeout(10)
            conn, addr = self.serv.accept()
            self.addCleanup(conn.close)
            self.assertEqual(conn.gettimeout(), default_timeout)

    def _testInheritFlagsTimeout(self):
        self.cli.connect((HOST, self.port))

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(False)

        # connect() didn't start: non-blocking accept() fails
        start_time = time.monotonic()
        with self.assertRaises(BlockingIOError):
            conn, addr = self.serv.accept()
        dt = time.monotonic() - start_time
        self.assertLess(dt, 1.0)

        self.event.set()

        read, write, err = select.select([self.serv], [], [], support.LONG_TIMEOUT)
        if self.serv not in read:
            self.fail("Error trying to do accept after select.")

        # connect() completed: non-blocking accept() doesn't block
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        self.assertIsNone(conn.gettimeout())

    def _testAccept(self):
        # don't connect before event is set to check
        # that non-blocking accept() raises BlockingIOError
        self.event.wait()

        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        conn.setblocking(False)

        # the client didn't send data yet: non-blocking recv() fails
        with self.assertRaises(BlockingIOError):
            msg = conn.recv(len(MSG))

        self.event.set()

        read, write, err = select.select([conn], [], [], support.LONG_TIMEOUT)
        if conn not in read:
            self.fail("Error during select call to non-blocking socket.")

        # the client has now sent data: non-blocking recv() doesn't block
        msg = conn.recv(len(MSG))
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.cli.connect((HOST, self.port))

        # don't send anything before event is set to check
        # that non-blocking recv() raises BlockingIOError
        self.event.wait()

        # send data: recv() will no longer block
        self.cli.sendall(MSG)


class FileObjectClassTestCase(SocketConnectedTest):
    """Unit tests for the object returned by socket.makefile()

    self.read_file is the io object returned by makefile() on
    the client connection.  You can read from this file to
    get output from the server.

    self.write_file is the io object returned by makefile() on the
    server connection.  You can write to this file to send output
    to the client.
    """

    bufsize = -1  # Use default buffer size
    encoding = 'utf-8'
    errors = 'strict'
    newline = None

    read_mode = 'rb'
    read_msg = MSG
    write_mode = 'wb'
    write_msg = MSG

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
            threading.Event() for i in range(4)]
        SocketConnectedTest.setUp(self)
        self.read_file = self.cli_conn.makefile(
            self.read_mode, self.bufsize,
            encoding=self.encoding,
            errors=self.errors,
            newline=self.newline)

    def tearDown(self):
        self.serv_finished.set()
        self.read_file.close()
        self.assertTrue(self.read_file.closed)
        self.read_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        self.write_file = self.serv_conn.makefile(
            self.write_mode, self.bufsize,
            encoding=self.encoding,
            errors=self.errors,
            newline=self.newline)

    def clientTearDown(self):
        self.cli_finished.set()
        self.write_file.close()
        self.assertTrue(self.write_file.closed)
        self.write_file = None
        SocketConnectedTest.clientTearDown(self)

    def testReadAfterTimeout(self):
        # Issue #7322: A file object must disallow further reads
        # after a timeout has occurred.
        self.cli_conn.settimeout(1)
        self.read_file.read(3)
        # First read raises a timeout
        self.assertRaises(TimeoutError, self.read_file.read, 1)
        # Second read is disallowed
        with self.assertRaises(OSError) as ctx:
            self.read_file.read(1)
        self.assertIn("cannot read from timed out object", str(ctx.exception))

    def _testReadAfterTimeout(self):
        self.write_file.write(self.write_msg[0:3])
        self.write_file.flush()
        self.serv_finished.wait()

    def testSmallRead(self):
        # Performing small file read test
        first_seg = self.read_file.read(len(self.read_msg)-3)
        second_seg = self.read_file.read(3)
        msg = first_seg + second_seg
        self.assertEqual(msg, self.read_msg)

    def _testSmallRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testFullRead(self):
        # read until EOF
        msg = self.read_file.read()
        self.assertEqual(msg, self.read_msg)

    def _testFullRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        buf = type(self.read_msg)()
        while True:
            char = self.read_file.read(1)
            if not char:
                break
            buf += char
        self.assertEqual(buf, self.read_msg)

    def _testUnbufferedRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testReadline(self):
        # Performing file readline test
        line = self.read_file.readline()
        self.assertEqual(line, self.read_msg)

    def _testReadline(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testCloseAfterMakefile(self):
        # The file returned by makefile should keep the socket open.
        self.cli_conn.close()
        # read until EOF
        msg = self.read_file.read()
        self.assertEqual(msg, self.read_msg)

    def _testCloseAfterMakefile(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testMakefileAfterMakefileClose(self):
        self.read_file.close()
        msg = self.cli_conn.recv(len(MSG))
        if isinstance(self.read_msg, str):
            msg = msg.decode()
        self.assertEqual(msg, self.read_msg)

    def _testMakefileAfterMakefileClose(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testClosedAttr(self):
        self.assertFalse(self.read_file.closed)

    def _testClosedAttr(self):
        self.assertFalse(self.write_file.closed)

    def testAttributes(self):
        self.assertEqual(self.read_file.mode, self.read_mode)
        self.assertEqual(self.read_file.name, self.cli_conn.fileno())

    def _testAttributes(self):
        self.assertEqual(self.write_file.mode, self.write_mode)
        self.assertEqual(self.write_file.name, self.serv_conn.fileno())

    def testRealClose(self):
        self.read_file.close()
        self.assertRaises(ValueError, self.read_file.fileno)
        self.cli_conn.close()
        self.assertRaises(OSError, self.cli_conn.getsockname)

    def _testRealClose(self):
        pass


class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that http.client relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0  # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        line = self.read_file.readline()  # first line
        self.assertEqual(line, b"A. " + self.write_msg)  # first line
        self.read_file = self.cli_conn.makefile('rb', 0)
        line = self.read_file.readline()  # second line
        self.assertEqual(line, b"B. " + self.write_msg)  # second line

    def _testUnbufferedReadline(self):
        self.write_file.write(b"A. " + self.write_msg)
        self.write_file.write(b"B. " + self.write_msg)
        self.write_file.flush()

    def testMakefileClose(self):
        # The file returned by makefile should keep the socket open...
        self.cli_conn.close()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, self.read_msg)
        # ...until the file is itself closed
        self.read_file.close()
        self.assertRaises(OSError, self.cli_conn.recv, 1024)

    def _testMakefileClose(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testMakefileCloseSocketDestroy(self):
        refcount_before = sys.getrefcount(self.cli_conn)
        self.read_file.close()
        refcount_after = sys.getrefcount(self.cli_conn)
        self.assertEqual(refcount_before - 1, refcount_after)

    def _testMakefileCloseSocketDestroy(self):
        pass

    # Non-blocking ops
    # NOTE: to set `read_file` as non-blocking, we must call
    # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp).
5152 5153 def testSmallReadNonBlocking(self): 5154 self.cli_conn.setblocking(False) 5155 self.assertEqual(self.read_file.readinto(bytearray(10)), None) 5156 self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None) 5157 self.evt1.set() 5158 self.evt2.wait(1.0) 5159 first_seg = self.read_file.read(len(self.read_msg) - 3) 5160 if first_seg is None: 5161 # Data not arrived (can happen under Windows), wait a bit 5162 time.sleep(0.5) 5163 first_seg = self.read_file.read(len(self.read_msg) - 3) 5164 buf = bytearray(10) 5165 n = self.read_file.readinto(buf) 5166 self.assertEqual(n, 3) 5167 msg = first_seg + buf[:n] 5168 self.assertEqual(msg, self.read_msg) 5169 self.assertEqual(self.read_file.readinto(bytearray(16)), None) 5170 self.assertEqual(self.read_file.read(1), None) 5171 5172 def _testSmallReadNonBlocking(self): 5173 self.evt1.wait(1.0) 5174 self.write_file.write(self.write_msg) 5175 self.write_file.flush() 5176 self.evt2.set() 5177 # Avoid closing the socket before the server test has finished, 5178 # otherwise system recv() will return 0 instead of EWOULDBLOCK. 5179 self.serv_finished.wait(5.0) 5180 5181 def testWriteNonBlocking(self): 5182 self.cli_finished.wait(5.0) 5183 # The client thread can't skip directly - the SkipTest exception 5184 # would appear as a failure. 5185 if self.serv_skipped: 5186 self.skipTest(self.serv_skipped) 5187 5188 def _testWriteNonBlocking(self): 5189 self.serv_skipped = None 5190 self.serv_conn.setblocking(False) 5191 # Try to saturate the socket buffer pipe with repeated large writes. 5192 BIG = b"x" * support.SOCK_MAX_SIZE 5193 LIMIT = 10 5194 # The first write() succeeds since a chunk of data can be buffered 5195 n = self.write_file.write(BIG) 5196 self.assertGreater(n, 0) 5197 for i in range(LIMIT): 5198 n = self.write_file.write(BIG) 5199 if n is None: 5200 # Succeeded 5201 break 5202 self.assertGreater(n, 0) 5203 else: 5204 # Let us know that this test didn't manage to establish 5205 # the expected conditions. 
This is not a failure in itself but, 5206 # if it happens repeatedly, the test should be fixed. 5207 self.serv_skipped = "failed to saturate the socket buffer" 5208 5209 5210class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): 5211 5212 bufsize = 1 # Default-buffered for reading; line-buffered for writing 5213 5214 5215class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): 5216 5217 bufsize = 2 # Exercise the buffering code 5218 5219 5220class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase): 5221 """Tests for socket.makefile() in text mode (rather than binary)""" 5222 5223 read_mode = 'r' 5224 read_msg = MSG.decode('utf-8') 5225 write_mode = 'wb' 5226 write_msg = MSG 5227 newline = '' 5228 5229 5230class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase): 5231 """Tests for socket.makefile() in text mode (rather than binary)""" 5232 5233 read_mode = 'rb' 5234 read_msg = MSG 5235 write_mode = 'w' 5236 write_msg = MSG.decode('utf-8') 5237 newline = '' 5238 5239 5240class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase): 5241 """Tests for socket.makefile() in text mode (rather than binary)""" 5242 5243 read_mode = 'r' 5244 read_msg = MSG.decode('utf-8') 5245 write_mode = 'w' 5246 write_msg = MSG.decode('utf-8') 5247 newline = '' 5248 5249 5250class NetworkConnectionTest(object): 5251 """Prove network connection.""" 5252 5253 def clientSetUp(self): 5254 # We're inherited below by BasicTCPTest2, which also inherits 5255 # BasicTCPTest, which defines self.port referenced below. 5256 self.cli = socket.create_connection((HOST, self.port)) 5257 self.serv_conn = self.cli 5258 5259class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): 5260 """Tests that NetworkConnection does not break existing TCP functionality. 
5261 """ 5262 5263class NetworkConnectionNoServer(unittest.TestCase): 5264 5265 class MockSocket(socket.socket): 5266 def connect(self, *args): 5267 raise TimeoutError('timed out') 5268 5269 @contextlib.contextmanager 5270 def mocked_socket_module(self): 5271 """Return a socket which times out on connect""" 5272 old_socket = socket.socket 5273 socket.socket = self.MockSocket 5274 try: 5275 yield 5276 finally: 5277 socket.socket = old_socket 5278 5279 def test_connect(self): 5280 port = socket_helper.find_unused_port() 5281 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 5282 self.addCleanup(cli.close) 5283 with self.assertRaises(OSError) as cm: 5284 cli.connect((HOST, port)) 5285 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) 5286 5287 def test_create_connection(self): 5288 # Issue #9792: errors raised by create_connection() should have 5289 # a proper errno attribute. 5290 port = socket_helper.find_unused_port() 5291 with self.assertRaises(OSError) as cm: 5292 socket.create_connection((HOST, port)) 5293 5294 # Issue #16257: create_connection() calls getaddrinfo() against 5295 # 'localhost'. This may result in an IPV6 addr being returned 5296 # as well as an IPV4 one: 5297 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) 5298 # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), 5299 # (26, 2, 0, '', ('::1', 41230, 0, 0))] 5300 # 5301 # create_connection() enumerates through all the addresses returned 5302 # and if it doesn't successfully bind to any of them, it propagates 5303 # the last exception it encountered. 5304 # 5305 # On Solaris, ENETUNREACH is returned in this circumstance instead 5306 # of ECONNREFUSED. So, if that errno exists, add it to our list of 5307 # expected errnos. 
5308 expected_errnos = socket_helper.get_socket_conn_refused_errs() 5309 self.assertIn(cm.exception.errno, expected_errnos) 5310 5311 def test_create_connection_all_errors(self): 5312 port = socket_helper.find_unused_port() 5313 try: 5314 socket.create_connection((HOST, port), all_errors=True) 5315 except ExceptionGroup as e: 5316 eg = e 5317 else: 5318 self.fail('expected connection to fail') 5319 5320 self.assertIsInstance(eg, ExceptionGroup) 5321 for e in eg.exceptions: 5322 self.assertIsInstance(e, OSError) 5323 5324 addresses = socket.getaddrinfo( 5325 'localhost', port, 0, socket.SOCK_STREAM) 5326 # assert that we got an exception for each address 5327 self.assertEqual(len(addresses), len(eg.exceptions)) 5328 5329 def test_create_connection_timeout(self): 5330 # Issue #9792: create_connection() should not recast timeout errors 5331 # as generic socket errors. 5332 with self.mocked_socket_module(): 5333 try: 5334 socket.create_connection((HOST, 1234)) 5335 except TimeoutError: 5336 pass 5337 except OSError as exc: 5338 if socket_helper.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT: 5339 raise 5340 else: 5341 self.fail('TimeoutError not raised') 5342 5343 5344class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): 5345 5346 def __init__(self, methodName='runTest'): 5347 SocketTCPTest.__init__(self, methodName=methodName) 5348 ThreadableTest.__init__(self) 5349 5350 def clientSetUp(self): 5351 self.source_port = socket_helper.find_unused_port() 5352 5353 def clientTearDown(self): 5354 self.cli.close() 5355 self.cli = None 5356 ThreadableTest.clientTearDown(self) 5357 5358 def _justAccept(self): 5359 conn, addr = self.serv.accept() 5360 conn.close() 5361 5362 testFamily = _justAccept 5363 def _testFamily(self): 5364 self.cli = socket.create_connection((HOST, self.port), 5365 timeout=support.LOOPBACK_TIMEOUT) 5366 self.addCleanup(self.cli.close) 5367 self.assertEqual(self.cli.family, 2) 5368 5369 testSourceAddress = _justAccept 5370 def 
_testSourceAddress(self): 5371 self.cli = socket.create_connection((HOST, self.port), 5372 timeout=support.LOOPBACK_TIMEOUT, 5373 source_address=('', self.source_port)) 5374 self.addCleanup(self.cli.close) 5375 self.assertEqual(self.cli.getsockname()[1], self.source_port) 5376 # The port number being used is sufficient to show that the bind() 5377 # call happened. 5378 5379 testTimeoutDefault = _justAccept 5380 def _testTimeoutDefault(self): 5381 # passing no explicit timeout uses socket's global default 5382 self.assertTrue(socket.getdefaulttimeout() is None) 5383 socket.setdefaulttimeout(42) 5384 try: 5385 self.cli = socket.create_connection((HOST, self.port)) 5386 self.addCleanup(self.cli.close) 5387 finally: 5388 socket.setdefaulttimeout(None) 5389 self.assertEqual(self.cli.gettimeout(), 42) 5390 5391 testTimeoutNone = _justAccept 5392 def _testTimeoutNone(self): 5393 # None timeout means the same as sock.settimeout(None) 5394 self.assertTrue(socket.getdefaulttimeout() is None) 5395 socket.setdefaulttimeout(30) 5396 try: 5397 self.cli = socket.create_connection((HOST, self.port), timeout=None) 5398 self.addCleanup(self.cli.close) 5399 finally: 5400 socket.setdefaulttimeout(None) 5401 self.assertEqual(self.cli.gettimeout(), None) 5402 5403 testTimeoutValueNamed = _justAccept 5404 def _testTimeoutValueNamed(self): 5405 self.cli = socket.create_connection((HOST, self.port), timeout=30) 5406 self.assertEqual(self.cli.gettimeout(), 30) 5407 5408 testTimeoutValueNonamed = _justAccept 5409 def _testTimeoutValueNonamed(self): 5410 self.cli = socket.create_connection((HOST, self.port), 30) 5411 self.addCleanup(self.cli.close) 5412 self.assertEqual(self.cli.gettimeout(), 30) 5413 5414 5415class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): 5416 5417 def __init__(self, methodName='runTest'): 5418 SocketTCPTest.__init__(self, methodName=methodName) 5419 ThreadableTest.__init__(self) 5420 5421 def clientSetUp(self): 5422 pass 5423 5424 def 
clientTearDown(self): 5425 self.cli.close() 5426 self.cli = None 5427 ThreadableTest.clientTearDown(self) 5428 5429 def testInsideTimeout(self): 5430 conn, addr = self.serv.accept() 5431 self.addCleanup(conn.close) 5432 time.sleep(3) 5433 conn.send(b"done!") 5434 testOutsideTimeout = testInsideTimeout 5435 5436 def _testInsideTimeout(self): 5437 self.cli = sock = socket.create_connection((HOST, self.port)) 5438 data = sock.recv(5) 5439 self.assertEqual(data, b"done!") 5440 5441 def _testOutsideTimeout(self): 5442 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) 5443 self.assertRaises(TimeoutError, lambda: sock.recv(5)) 5444 5445 5446class TCPTimeoutTest(SocketTCPTest): 5447 5448 def testTCPTimeout(self): 5449 def raise_timeout(*args, **kwargs): 5450 self.serv.settimeout(1.0) 5451 self.serv.accept() 5452 self.assertRaises(TimeoutError, raise_timeout, 5453 "Error generating a timeout exception (TCP)") 5454 5455 def testTimeoutZero(self): 5456 ok = False 5457 try: 5458 self.serv.settimeout(0.0) 5459 foo = self.serv.accept() 5460 except TimeoutError: 5461 self.fail("caught timeout instead of error (TCP)") 5462 except OSError: 5463 ok = True 5464 except: 5465 self.fail("caught unexpected exception (TCP)") 5466 if not ok: 5467 self.fail("accept() returned success when we did not expect it") 5468 5469 @unittest.skipUnless(hasattr(signal, 'alarm'), 5470 'test needs signal.alarm()') 5471 def testInterruptedTimeout(self): 5472 # XXX I don't know how to do this test on MSWindows or any other 5473 # platform that doesn't support signal.alarm() or os.kill(), though 5474 # the bug should have existed on all platforms. 
5475 self.serv.settimeout(5.0) # must be longer than alarm 5476 class Alarm(Exception): 5477 pass 5478 def alarm_handler(signal, frame): 5479 raise Alarm 5480 old_alarm = signal.signal(signal.SIGALRM, alarm_handler) 5481 try: 5482 try: 5483 signal.alarm(2) # POSIX allows alarm to be up to 1 second early 5484 foo = self.serv.accept() 5485 except TimeoutError: 5486 self.fail("caught timeout instead of Alarm") 5487 except Alarm: 5488 pass 5489 except: 5490 self.fail("caught other exception instead of Alarm:" 5491 " %s(%s):\n%s" % 5492 (sys.exc_info()[:2] + (traceback.format_exc(),))) 5493 else: 5494 self.fail("nothing caught") 5495 finally: 5496 signal.alarm(0) # shut off alarm 5497 except Alarm: 5498 self.fail("got Alarm in wrong place") 5499 finally: 5500 # no alarm can be pending. Safe to restore old handler. 5501 signal.signal(signal.SIGALRM, old_alarm) 5502 5503class UDPTimeoutTest(SocketUDPTest): 5504 5505 def testUDPTimeout(self): 5506 def raise_timeout(*args, **kwargs): 5507 self.serv.settimeout(1.0) 5508 self.serv.recv(1024) 5509 self.assertRaises(TimeoutError, raise_timeout, 5510 "Error generating a timeout exception (UDP)") 5511 5512 def testTimeoutZero(self): 5513 ok = False 5514 try: 5515 self.serv.settimeout(0.0) 5516 foo = self.serv.recv(1024) 5517 except TimeoutError: 5518 self.fail("caught timeout instead of error (UDP)") 5519 except OSError: 5520 ok = True 5521 except: 5522 self.fail("caught unexpected exception (UDP)") 5523 if not ok: 5524 self.fail("recv() returned success when we did not expect it") 5525 5526@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 5527 'UDPLITE sockets required for this test.') 5528class UDPLITETimeoutTest(SocketUDPLITETest): 5529 5530 def testUDPLITETimeout(self): 5531 def raise_timeout(*args, **kwargs): 5532 self.serv.settimeout(1.0) 5533 self.serv.recv(1024) 5534 self.assertRaises(TimeoutError, raise_timeout, 5535 "Error generating a timeout exception (UDPLITE)") 5536 5537 def testTimeoutZero(self): 5538 ok = False 5539 try: 
5540 self.serv.settimeout(0.0) 5541 foo = self.serv.recv(1024) 5542 except TimeoutError: 5543 self.fail("caught timeout instead of error (UDPLITE)") 5544 except OSError: 5545 ok = True 5546 except: 5547 self.fail("caught unexpected exception (UDPLITE)") 5548 if not ok: 5549 self.fail("recv() returned success when we did not expect it") 5550 5551class TestExceptions(unittest.TestCase): 5552 5553 def testExceptionTree(self): 5554 self.assertTrue(issubclass(OSError, Exception)) 5555 self.assertTrue(issubclass(socket.herror, OSError)) 5556 self.assertTrue(issubclass(socket.gaierror, OSError)) 5557 self.assertTrue(issubclass(socket.timeout, OSError)) 5558 self.assertIs(socket.error, OSError) 5559 self.assertIs(socket.timeout, TimeoutError) 5560 5561 def test_setblocking_invalidfd(self): 5562 # Regression test for issue #28471 5563 5564 sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 5565 sock = socket.socket( 5566 socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno()) 5567 sock0.close() 5568 self.addCleanup(sock.detach) 5569 5570 with self.assertRaises(OSError): 5571 sock.setblocking(False) 5572 5573 5574@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') 5575class TestLinuxAbstractNamespace(unittest.TestCase): 5576 5577 UNIX_PATH_MAX = 108 5578 5579 def testLinuxAbstractNamespace(self): 5580 address = b"\x00python-test-hello\x00\xff" 5581 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: 5582 s1.bind(address) 5583 s1.listen() 5584 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2: 5585 s2.connect(s1.getsockname()) 5586 with s1.accept()[0] as s3: 5587 self.assertEqual(s1.getsockname(), address) 5588 self.assertEqual(s2.getpeername(), address) 5589 5590 def testMaxName(self): 5591 address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) 5592 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5593 s.bind(address) 5594 self.assertEqual(s.getsockname(), address) 5595 5596 def testNameOverflow(self): 5597 address = 
"\x00" + "h" * self.UNIX_PATH_MAX 5598 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5599 self.assertRaises(OSError, s.bind, address) 5600 5601 def testStrName(self): 5602 # Check that an abstract name can be passed as a string. 5603 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5604 try: 5605 s.bind("\x00python\x00test\x00") 5606 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5607 finally: 5608 s.close() 5609 5610 def testBytearrayName(self): 5611 # Check that an abstract name can be passed as a bytearray. 5612 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5613 s.bind(bytearray(b"\x00python\x00test\x00")) 5614 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5615 5616 def testAutobind(self): 5617 # Check that binding to an empty string binds to an available address 5618 # in the abstract namespace as specified in unix(7) "Autobind feature". 5619 abstract_address = b"^\0[0-9a-f]{5}" 5620 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: 5621 s1.bind("") 5622 self.assertRegex(s1.getsockname(), abstract_address) 5623 # Each socket is bound to a different abstract address. 5624 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2: 5625 s2.bind("") 5626 self.assertRegex(s2.getsockname(), abstract_address) 5627 self.assertNotEqual(s1.getsockname(), s2.getsockname()) 5628 5629 5630@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX') 5631class TestUnixDomain(unittest.TestCase): 5632 5633 def setUp(self): 5634 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5635 5636 def tearDown(self): 5637 self.sock.close() 5638 5639 def encoded(self, path): 5640 # Return the given path encoded in the file system encoding, 5641 # or skip the test if this is not possible. 
        try:
            return os.fsencode(path)
        except UnicodeEncodeError:
            self.skipTest(
                "Pathname {0!a} cannot be represented in file "
                "system encoding {1!r}".format(
                    path, sys.getfilesystemencoding()))

    def bind(self, sock, path):
        # Bind the socket
        # An OSError whose text is exactly "AF_UNIX path too long" skips
        # the test; any other OSError is re-raised.
        try:
            socket_helper.bind_unix_socket(sock, path)
        except OSError as e:
            if str(e) == "AF_UNIX path too long":
                self.skipTest(
                    "Pathname {0!a} is too long to serve as an AF_UNIX path"
                    .format(path))
            else:
                raise

    def testUnbound(self):
        # Issue #30205 (note getsockname() can return None on OS X)
        self.assertIn(self.sock.getsockname(), ('', None))

    def testStrAddr(self):
        # Test binding to and retrieving a normal string pathname.
        path = os.path.abspath(os_helper.TESTFN)
        self.bind(self.sock, path)
        # The unlink is registered only after bind() has succeeded.
        self.addCleanup(os_helper.unlink, path)
        self.assertEqual(self.sock.getsockname(), path)

    def testBytesAddr(self):
        # Test binding to a bytes pathname.
        # getsockname() is expected to give back the str form of the path.
        path = os.path.abspath(os_helper.TESTFN)
        self.bind(self.sock, self.encoded(path))
        self.addCleanup(os_helper.unlink, path)
        self.assertEqual(self.sock.getsockname(), path)

    def testSurrogateescapeBind(self):
        # Test binding to a valid non-ASCII pathname, with the
        # non-ASCII bytes supplied using surrogateescape encoding.
        path = os.path.abspath(os_helper.TESTFN_UNICODE)
        b = self.encoded(path)
        self.bind(self.sock, b.decode("ascii", "surrogateescape"))
        self.addCleanup(os_helper.unlink, path)
        self.assertEqual(self.sock.getsockname(), path)

    def testUnencodableAddr(self):
        # Test binding to a pathname that cannot be encoded in the
        # file system encoding.
5692 if os_helper.TESTFN_UNENCODABLE is None: 5693 self.skipTest("No unencodable filename available") 5694 path = os.path.abspath(os_helper.TESTFN_UNENCODABLE) 5695 self.bind(self.sock, path) 5696 self.addCleanup(os_helper.unlink, path) 5697 self.assertEqual(self.sock.getsockname(), path) 5698 5699 @unittest.skipIf(sys.platform == 'linux', 'Linux specific test') 5700 def testEmptyAddress(self): 5701 # Test that binding empty address fails. 5702 self.assertRaises(OSError, self.sock.bind, "") 5703 5704 5705class BufferIOTest(SocketConnectedTest): 5706 """ 5707 Test the buffer versions of socket.recv() and socket.send(). 5708 """ 5709 def __init__(self, methodName='runTest'): 5710 SocketConnectedTest.__init__(self, methodName=methodName) 5711 5712 def testRecvIntoArray(self): 5713 buf = array.array("B", [0] * len(MSG)) 5714 nbytes = self.cli_conn.recv_into(buf) 5715 self.assertEqual(nbytes, len(MSG)) 5716 buf = buf.tobytes() 5717 msg = buf[:len(MSG)] 5718 self.assertEqual(msg, MSG) 5719 5720 def _testRecvIntoArray(self): 5721 buf = bytes(MSG) 5722 self.serv_conn.send(buf) 5723 5724 def testRecvIntoBytearray(self): 5725 buf = bytearray(1024) 5726 nbytes = self.cli_conn.recv_into(buf) 5727 self.assertEqual(nbytes, len(MSG)) 5728 msg = buf[:len(MSG)] 5729 self.assertEqual(msg, MSG) 5730 5731 _testRecvIntoBytearray = _testRecvIntoArray 5732 5733 def testRecvIntoMemoryview(self): 5734 buf = bytearray(1024) 5735 nbytes = self.cli_conn.recv_into(memoryview(buf)) 5736 self.assertEqual(nbytes, len(MSG)) 5737 msg = buf[:len(MSG)] 5738 self.assertEqual(msg, MSG) 5739 5740 _testRecvIntoMemoryview = _testRecvIntoArray 5741 5742 def testRecvFromIntoArray(self): 5743 buf = array.array("B", [0] * len(MSG)) 5744 nbytes, addr = self.cli_conn.recvfrom_into(buf) 5745 self.assertEqual(nbytes, len(MSG)) 5746 buf = buf.tobytes() 5747 msg = buf[:len(MSG)] 5748 self.assertEqual(msg, MSG) 5749 5750 def _testRecvFromIntoArray(self): 5751 buf = bytes(MSG) 5752 self.serv_conn.send(buf) 5753 5754 
    def testRecvFromIntoBytearray(self):
        # recvfrom_into() with a bytearray larger than MSG.
        buf = bytearray(1024)
        nbytes, addr = self.cli_conn.recvfrom_into(buf)
        self.assertEqual(nbytes, len(MSG))
        msg = buf[:len(MSG)]
        self.assertEqual(msg, MSG)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        # recvfrom_into() through a memoryview of a bytearray.
        buf = bytearray(1024)
        nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
        self.assertEqual(nbytes, len(MSG))
        msg = buf[:len(MSG)]
        self.assertEqual(msg, MSG)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray

    def testRecvFromIntoSmallBuffer(self):
        # See issue #20246.
        # Asking for 1024 bytes into an 8-byte buffer must raise ValueError.
        buf = bytearray(8)
        self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)

    def _testRecvFromIntoSmallBuffer(self):
        self.serv_conn.send(MSG)

    def testRecvFromIntoEmptyBuffer(self):
        # An empty buffer is accepted, with and without an explicit
        # nbytes of 0; the test only checks that no exception is raised.
        buf = bytearray()
        self.cli_conn.recvfrom_into(buf)
        self.cli_conn.recvfrom_into(buf, 0)

    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray


# TIPC service type and instance range used by the TIPC tests below.
TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210

def isTipcAvailable():
    """Check if the TIPC module is loaded

    The TIPC module is not loaded automatically on Ubuntu and probably
    other Linux distros.
    """
    if not hasattr(socket, "AF_TIPC"):
        return False
    try:
        f = open("/proc/modules", encoding="utf-8")
    except (FileNotFoundError, IsADirectoryError, PermissionError):
        # It's ok if the file does not exist, is a directory or if we
        # have not the permission to read it.
        return False
    with f:
        # The module counts as loaded when a line of /proc/modules
        # starts with "tipc ".
        for line in f:
            if line.startswith("tipc "):
                return True
    return False

@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase):
    def testRDM(self):
        # Send MSG from cli to srv over SOCK_RDM and check both the
        # payload and the sender address reported by recvfrom().
        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(srv.close)
        self.addCleanup(cli.close)

        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                TIPC_LOWER, TIPC_UPPER)
        srv.bind(srvaddr)

        # Address the instance in the middle of the bound range.
        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
        cli.sendto(MSG, sendaddr)

        msg, recvaddr = srv.recvfrom(1024)

        self.assertEqual(cli.getsockname(), recvaddr)
        self.assertEqual(msg, MSG)


@unittest.skipUnless(isTipcAvailable(),
          "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
    def __init__(self, methodName = 'runTest'):
        unittest.TestCase.__init__(self, methodName = methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        # Bind and listen, signal readiness, then block in accept().
        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.srv.close)
        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                TIPC_LOWER, TIPC_UPPER)
        self.srv.bind(srvaddr)
        self.srv.listen()
        self.serverExplicitReady()
        self.conn, self.connaddr = self.srv.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.cli.close)
        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
        self.cli.connect(addr)
        # Stored so that testStream can compare it with the peer address
        # returned by accept().
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        self.cli.send(MSG)
        self.cli.close()


class ContextManagersTest(ThreadedTCPSocketTest):
    # Sockets used as context managers must be closed on exit from the
    # with block.

    def _testSocketClass(self):
        # base test
        with socket.socket() as sock:
            self.assertFalse(sock._closed)
        self.assertTrue(sock._closed)
        # close inside with block
        with socket.socket() as sock:
            sock.close()
        self.assertTrue(sock._closed)
        # exception inside with block
        with socket.socket() as sock:
            self.assertRaises(OSError, sock.sendall, b'foo')
        self.assertTrue(sock._closed)

    def testCreateConnectionBase(self):
        # Accept one connection and echo back what was received.
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        data = conn.recv(1024)
        conn.sendall(data)

    def _testCreateConnectionBase(self):
        address = self.serv.getsockname()
        with socket.create_connection(address) as sock:
            self.assertFalse(sock._closed)
            sock.sendall(b'foo')
            self.assertEqual(sock.recv(1024), b'foo')
        self.assertTrue(sock._closed)

    def testCreateConnectionClose(self):
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        data = conn.recv(1024)
        conn.sendall(data)

    def _testCreateConnectionClose(self):
        # Closing inside the with block is allowed; afterwards the
        # socket is closed and sendall() raises OSError.
        address = self.serv.getsockname()
        with socket.create_connection(address) as sock:
            sock.close()
        self.assertTrue(sock._closed)
        self.assertRaises(OSError, sock.sendall, b'foo')


class InheritanceTest(unittest.TestCase):
    # Sockets are expected to be non-inheritable unless asked otherwise.
    @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
                         "SOCK_CLOEXEC not defined")
    @support.requires_linux_version(2, 6,
28) 5925 def test_SOCK_CLOEXEC(self): 5926 with socket.socket(socket.AF_INET, 5927 socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s: 5928 self.assertEqual(s.type, socket.SOCK_STREAM) 5929 self.assertFalse(s.get_inheritable()) 5930 5931 def test_default_inheritable(self): 5932 sock = socket.socket() 5933 with sock: 5934 self.assertEqual(sock.get_inheritable(), False) 5935 5936 def test_dup(self): 5937 sock = socket.socket() 5938 with sock: 5939 newsock = sock.dup() 5940 sock.close() 5941 with newsock: 5942 self.assertEqual(newsock.get_inheritable(), False) 5943 5944 def test_set_inheritable(self): 5945 sock = socket.socket() 5946 with sock: 5947 sock.set_inheritable(True) 5948 self.assertEqual(sock.get_inheritable(), True) 5949 5950 sock.set_inheritable(False) 5951 self.assertEqual(sock.get_inheritable(), False) 5952 5953 @unittest.skipIf(fcntl is None, "need fcntl") 5954 def test_get_inheritable_cloexec(self): 5955 sock = socket.socket() 5956 with sock: 5957 fd = sock.fileno() 5958 self.assertEqual(sock.get_inheritable(), False) 5959 5960 # clear FD_CLOEXEC flag 5961 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 5962 flags &= ~fcntl.FD_CLOEXEC 5963 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 5964 5965 self.assertEqual(sock.get_inheritable(), True) 5966 5967 @unittest.skipIf(fcntl is None, "need fcntl") 5968 def test_set_inheritable_cloexec(self): 5969 sock = socket.socket() 5970 with sock: 5971 fd = sock.fileno() 5972 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 5973 fcntl.FD_CLOEXEC) 5974 5975 sock.set_inheritable(True) 5976 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 5977 0) 5978 5979 5980 def test_socketpair(self): 5981 s1, s2 = socket.socketpair() 5982 self.addCleanup(s1.close) 5983 self.addCleanup(s2.close) 5984 self.assertEqual(s1.get_inheritable(), False) 5985 self.assertEqual(s2.get_inheritable(), False) 5986 5987 5988@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"), 5989 "SOCK_NONBLOCK not defined") 5990class 
NonblockConstantTest(unittest.TestCase): 5991 def checkNonblock(self, s, nonblock=True, timeout=0.0): 5992 if nonblock: 5993 self.assertEqual(s.type, socket.SOCK_STREAM) 5994 self.assertEqual(s.gettimeout(), timeout) 5995 self.assertTrue( 5996 fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) 5997 if timeout == 0: 5998 # timeout == 0: means that getblocking() must be False. 5999 self.assertFalse(s.getblocking()) 6000 else: 6001 # If timeout > 0, the socket will be in a "blocking" mode 6002 # from the standpoint of the Python API. For Python socket 6003 # object, "blocking" means that operations like 'sock.recv()' 6004 # will block. Internally, file descriptors for 6005 # "blocking" Python sockets *with timeouts* are in a 6006 # *non-blocking* mode, and 'sock.recv()' uses 'select()' 6007 # and handles EWOULDBLOCK/EAGAIN to enforce the timeout. 6008 self.assertTrue(s.getblocking()) 6009 else: 6010 self.assertEqual(s.type, socket.SOCK_STREAM) 6011 self.assertEqual(s.gettimeout(), None) 6012 self.assertFalse( 6013 fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) 6014 self.assertTrue(s.getblocking()) 6015 6016 @support.requires_linux_version(2, 6, 28) 6017 def test_SOCK_NONBLOCK(self): 6018 # a lot of it seems silly and redundant, but I wanted to test that 6019 # changing back and forth worked ok 6020 with socket.socket(socket.AF_INET, 6021 socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s: 6022 self.checkNonblock(s) 6023 s.setblocking(True) 6024 self.checkNonblock(s, nonblock=False) 6025 s.setblocking(False) 6026 self.checkNonblock(s) 6027 s.settimeout(None) 6028 self.checkNonblock(s, nonblock=False) 6029 s.settimeout(2.0) 6030 self.checkNonblock(s, timeout=2.0) 6031 s.setblocking(True) 6032 self.checkNonblock(s, nonblock=False) 6033 # defaulttimeout 6034 t = socket.getdefaulttimeout() 6035 socket.setdefaulttimeout(0.0) 6036 with socket.socket() as s: 6037 self.checkNonblock(s) 6038 socket.setdefaulttimeout(None) 6039 with socket.socket() as s: 
6040 self.checkNonblock(s, False) 6041 socket.setdefaulttimeout(2.0) 6042 with socket.socket() as s: 6043 self.checkNonblock(s, timeout=2.0) 6044 socket.setdefaulttimeout(None) 6045 with socket.socket() as s: 6046 self.checkNonblock(s, False) 6047 socket.setdefaulttimeout(t) 6048 6049 6050@unittest.skipUnless(os.name == "nt", "Windows specific") 6051@unittest.skipUnless(multiprocessing, "need multiprocessing") 6052class TestSocketSharing(SocketTCPTest): 6053 # This must be classmethod and not staticmethod or multiprocessing 6054 # won't be able to bootstrap it. 6055 @classmethod 6056 def remoteProcessServer(cls, q): 6057 # Recreate socket from shared data 6058 sdata = q.get() 6059 message = q.get() 6060 6061 s = socket.fromshare(sdata) 6062 s2, c = s.accept() 6063 6064 # Send the message 6065 s2.sendall(message) 6066 s2.close() 6067 s.close() 6068 6069 def testShare(self): 6070 # Transfer the listening server socket to another process 6071 # and service it from there. 6072 6073 # Create process: 6074 q = multiprocessing.Queue() 6075 p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,)) 6076 p.start() 6077 6078 # Get the shared socket data 6079 data = self.serv.share(p.pid) 6080 6081 # Pass the shared socket to the other process 6082 addr = self.serv.getsockname() 6083 self.serv.close() 6084 q.put(data) 6085 6086 # The data that the server will send us 6087 message = b"slapmahfro" 6088 q.put(message) 6089 6090 # Connect 6091 s = socket.create_connection(addr) 6092 # listen for the data 6093 m = [] 6094 while True: 6095 data = s.recv(100) 6096 if not data: 6097 break 6098 m.append(data) 6099 s.close() 6100 received = b"".join(m) 6101 self.assertEqual(received, message) 6102 p.join() 6103 6104 def testShareLength(self): 6105 data = self.serv.share(os.getpid()) 6106 self.assertRaises(ValueError, socket.fromshare, data[:-1]) 6107 self.assertRaises(ValueError, socket.fromshare, data+b"foo") 6108 6109 def compareSockets(self, org, other): 6110 # socket 
        # sharing is expected to work only for blocking socket
        # since the internal python timeout value isn't transferred.
        self.assertEqual(org.gettimeout(), None)
        self.assertEqual(org.gettimeout(), other.gettimeout())

        self.assertEqual(org.family, other.family)
        self.assertEqual(org.type, other.type)
        # If the user specified "0" for proto, then
        # internally windows will have picked the correct value.
        # Python introspection on the socket however will still return
        # 0.  For the shared socket, the python value is recreated
        # from the actual value, so it may not compare correctly.
        if org.proto != 0:
            self.assertEqual(org.proto, other.proto)

    def testShareLocal(self):
        # Share the server socket with this same process and compare.
        data = self.serv.share(os.getpid())
        s = socket.fromshare(data)
        try:
            self.compareSockets(self.serv, s)
        finally:
            s.close()

    def testTypes(self):
        # Share/unshare round trip for every supported combination of
        # address family and socket type.
        families = [socket.AF_INET, socket.AF_INET6]
        types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
        for f in families:
            for t in types:
                try:
                    source = socket.socket(f, t)
                except OSError:
                    continue # This combination is not supported
                try:
                    data = source.share(os.getpid())
                    shared = socket.fromshare(data)
                    try:
                        self.compareSockets(source, shared)
                    finally:
                        shared.close()
                finally:
                    source.close()


class SendfileUsingSendTest(ThreadedTCPSocketTest):
    """
    Test the send() implementation of socket.sendfile().
6156 """ 6157 6158 FILESIZE = (10 * 1024 * 1024) # 10 MiB 6159 BUFSIZE = 8192 6160 FILEDATA = b"" 6161 TIMEOUT = support.LOOPBACK_TIMEOUT 6162 6163 @classmethod 6164 def setUpClass(cls): 6165 def chunks(total, step): 6166 assert total >= step 6167 while total > step: 6168 yield step 6169 total -= step 6170 if total: 6171 yield total 6172 6173 chunk = b"".join([random.choice(string.ascii_letters).encode() 6174 for i in range(cls.BUFSIZE)]) 6175 with open(os_helper.TESTFN, 'wb') as f: 6176 for csize in chunks(cls.FILESIZE, cls.BUFSIZE): 6177 f.write(chunk) 6178 with open(os_helper.TESTFN, 'rb') as f: 6179 cls.FILEDATA = f.read() 6180 assert len(cls.FILEDATA) == cls.FILESIZE 6181 6182 @classmethod 6183 def tearDownClass(cls): 6184 os_helper.unlink(os_helper.TESTFN) 6185 6186 def accept_conn(self): 6187 self.serv.settimeout(support.LONG_TIMEOUT) 6188 conn, addr = self.serv.accept() 6189 conn.settimeout(self.TIMEOUT) 6190 self.addCleanup(conn.close) 6191 return conn 6192 6193 def recv_data(self, conn): 6194 received = [] 6195 while True: 6196 chunk = conn.recv(self.BUFSIZE) 6197 if not chunk: 6198 break 6199 received.append(chunk) 6200 return b''.join(received) 6201 6202 def meth_from_sock(self, sock): 6203 # Depending on the mixin class being run return either send() 6204 # or sendfile() method implementation. 
        return getattr(sock, "_sendfile_use_send")

    # NOTE(review): in the pairs below, _testX connects to self.serv and
    # sends, while testX accepts and checks what arrived; presumably the
    # base class runs them on different threads -- confirm.

    # regular file

    def _testRegularFile(self):
        address = self.serv.getsockname()
        file = open(os_helper.TESTFN, 'rb')
        with socket.create_connection(address) as sock, file as file:
            meth = self.meth_from_sock(sock)
            sent = meth(file)
            self.assertEqual(sent, self.FILESIZE)
            self.assertEqual(file.tell(), self.FILESIZE)

    def testRegularFile(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # non regular file

    def _testNonRegularFile(self):
        # A BytesIO object: sock.sendfile() must still send everything,
        # while _sendfile_use_sendfile() must raise _GiveupOnSendfile.
        address = self.serv.getsockname()
        file = io.BytesIO(self.FILEDATA)
        with socket.create_connection(address) as sock, file as file:
            sent = sock.sendfile(file)
            self.assertEqual(sent, self.FILESIZE)
            self.assertEqual(file.tell(), self.FILESIZE)
            self.assertRaises(socket._GiveupOnSendfile,
                              sock._sendfile_use_sendfile, file)

    def testNonRegularFile(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # empty file

    def _testEmptyFileSend(self):
        address = self.serv.getsockname()
        filename = os_helper.TESTFN + "2"
        # Create the empty file and register its removal.
        with open(filename, 'wb'):
            self.addCleanup(os_helper.unlink, filename)
        file = open(filename, 'rb')
        with socket.create_connection(address) as sock, file as file:
            meth = self.meth_from_sock(sock)
            sent = meth(file)
            self.assertEqual(sent, 0)
            self.assertEqual(file.tell(), 0)

    def testEmptyFileSend(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(data, b"")

    # offset

    def _testOffset(self):
        # Send everything from byte 5000 to the end of the file.
        address = self.serv.getsockname()
        file = open(os_helper.TESTFN, 'rb')
        with socket.create_connection(address) as sock, file as file:
            meth = self.meth_from_sock(sock)
            sent = meth(file, offset=5000)
            self.assertEqual(sent, self.FILESIZE - 5000)
            self.assertEqual(file.tell(), self.FILESIZE)

    def testOffset(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE - 5000)
        self.assertEqual(data, self.FILEDATA[5000:])

    # count

    def _testCount(self):
        # Send only the first `count` bytes of the file.
        address = self.serv.getsockname()
        file = open(os_helper.TESTFN, 'rb')
        sock = socket.create_connection(address,
                                        timeout=support.LOOPBACK_TIMEOUT)
        with sock, file:
            count = 5000007
            meth = self.meth_from_sock(sock)
            sent = meth(file, count=count)
            self.assertEqual(sent, count)
            self.assertEqual(file.tell(), count)

    def testCount(self):
        count = 5000007
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[:count])

    # count small

    def _testCountSmall(self):
        address = self.serv.getsockname()
        file = open(os_helper.TESTFN, 'rb')
        sock = socket.create_connection(address,
                                        timeout=support.LOOPBACK_TIMEOUT)
        with sock, file:
            count = 1
            meth = self.meth_from_sock(sock)
            sent = meth(file, count=count)
            self.assertEqual(sent, count)
            self.assertEqual(file.tell(), count)

    def testCountSmall(self):
        count = 1
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[:count])

    # count + offset

    def _testCountWithOffset(self):
        # Send `count` bytes starting at offset 2007.
        address = self.serv.getsockname()
        file = open(os_helper.TESTFN, 'rb')
        with socket.create_connection(address, timeout=2) as sock, file as file:
            count = 100007
            meth = self.meth_from_sock(sock)
            sent = meth(file, offset=2007, count=count)
            self.assertEqual(sent, count)
            self.assertEqual(file.tell(), count + 2007)

    def testCountWithOffset(self):
        count = 100007
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[2007:count+2007])

    # non blocking sockets are not supposed to work

    def _testNonBlocking(self):
        # Both the selected implementation and sock.sendfile() must
        # raise ValueError on a non-blocking socket.
        address = self.serv.getsockname()
        file = open(os_helper.TESTFN, 'rb')
        with socket.create_connection(address) as sock, file as file:
            sock.setblocking(False)
            meth = self.meth_from_sock(sock)
            self.assertRaises(ValueError, meth, file)
            self.assertRaises(ValueError, sock.sendfile, file)

    def testNonBlocking(self):
        conn = self.accept_conn()
        if conn.recv(8192):
            self.fail('was not supposed to receive any data')

    # timeout (non-triggered)

    def _testWithTimeout(self):
        # A socket with a timeout that does not expire sends the whole file.
        address = self.serv.getsockname()
        file = open(os_helper.TESTFN, 'rb')
        sock = socket.create_connection(address,
                                        timeout=support.LOOPBACK_TIMEOUT)
        with sock, file:
            meth = self.meth_from_sock(sock)
            sent = meth(file)
            self.assertEqual(sent, self.FILESIZE)

    def testWithTimeout(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # timeout (triggered)

    def _testWithTimeoutTriggeredSend(self):
        # With a 0.01 second timeout, sending must raise TimeoutError.
        address = self.serv.getsockname()
        with open(os_helper.TESTFN, 'rb') as file:
            with socket.create_connection(address) as sock:
                sock.settimeout(0.01)
                meth = self.meth_from_sock(sock)
                self.assertRaises(TimeoutError, meth, file)

    def testWithTimeoutTriggeredSend(self):
        # Receive once, then stop reading for a second.
        conn = self.accept_conn()
        conn.recv(88192)
        # bpo-45212: the wait here needs to be longer than the client-side timeout (0.01s)
        time.sleep(1)

    # errors

    def _test_errors(self):
        pass

    def test_errors(self):
        # Argument validation of the selected implementation; none of
        # the sockets below is connected.
        # A datagram socket is rejected.
        with open(os_helper.TESTFN, 'rb') as file:
            with socket.socket(type=socket.SOCK_DGRAM) as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(
                    ValueError, "SOCK_STREAM", meth, file)
        # A file opened in text mode is rejected.
        with open(os_helper.TESTFN, encoding="utf-8") as file:
            with socket.socket() as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(
                    ValueError, "binary mode", meth, file)
        # count of the wrong type raises TypeError; zero or negative
        # count raises ValueError.
        with open(os_helper.TESTFN, 'rb') as file:
            with socket.socket() as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(TypeError, "positive integer",
                                       meth, file, count='2')
                self.assertRaisesRegex(TypeError, "positive integer",
                                       meth, file, count=0.1)
                self.assertRaisesRegex(ValueError, "positive integer",
                                       meth, file, count=0)
                self.assertRaisesRegex(ValueError, "positive integer",
                                       meth, file, count=-1)


@unittest.skipUnless(hasattr(os, "sendfile"),
                     'os.sendfile() required for this test.')
class SendfileUsingSendfileTest(SendfileUsingSendTest):
    """
    Test the sendfile() implementation of socket.sendfile().
    """
    def meth_from_sock(self, sock):
        # Select the sendfile()-based implementation; the inherited
        # tests then run against it.
        return getattr(sock, "_sendfile_use_sendfile")


@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
class LinuxKernelCryptoAPI(unittest.TestCase):
    # tests for AF_ALG
    def create_alg(self, typ, name):
        # Return an AF_ALG socket bound to (typ, name).  When bind()
        # raises FileNotFoundError the socket is closed and the test
        # is skipped.
        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        try:
            sock.bind((typ, name))
        except FileNotFoundError as e:
            # type / algorithm is not available
            sock.close()
            raise unittest.SkipTest(str(e), typ, name)
        else:
            return sock

    # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY,
    # at least on ppc64le architecture
    @support.requires_linux_version(4, 5)
    def test_sha256(self):
        # `expected` is the digest the kernel must return for b"abc".
        expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
                                 "177a9cb410ff61f20015ad")
        with self.create_alg('hash', 'sha256') as algo:
            # Whole message in one call.
            op, _ = algo.accept()
            with op:
                op.sendall(b"abc")
                self.assertEqual(op.recv(512), expected)

            # Same message sent in pieces flagged MSG_MORE, ended by an
            # empty send.
            op, _ = algo.accept()
            with op:
                op.send(b'a', socket.MSG_MORE)
                op.send(b'b', socket.MSG_MORE)
                op.send(b'c', socket.MSG_MORE)
                op.send(b'')
                self.assertEqual(op.recv(512), expected)

    def test_hmac_sha1(self):
        expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
        with self.create_alg('hash', 'hmac(sha1)') as algo:
            # The key is set on the bound socket before accept().
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
            op, _ = algo.accept()
            with op:
                op.sendall(b"what do ya want for nothing?")
                self.assertEqual(op.recv(512), expected)

    # Although it should work with 3.19 and newer the test blocks on
    # Ubuntu 15.10 with Kernel 4.2.0-19.
    @support.requires_linux_version(4, 3)
    def test_aes_cbc(self):
        # Encrypt and decrypt with cbc(aes), first one block and then
        # the same block repeated `multiplier` times.
        key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
        iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
        msg = b"Single block msg"
        ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
        msglen = len(msg)
        with self.create_alg('skcipher', 'cbc(aes)') as algo:
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
            # Operation and IV go first (MSG_MORE), the data follows.
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 flags=socket.MSG_MORE)
                op.sendall(msg)
                self.assertEqual(op.recv(msglen), ciphertext)

            # Data, operation and IV in a single sendmsg_afalg() call.
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg([ciphertext],
                                 op=socket.ALG_OP_DECRYPT, iv=iv)
                self.assertEqual(op.recv(msglen), msg)

            # long message
            multiplier = 1024
            longmsg = [msg] * multiplier
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(longmsg,
                                 op=socket.ALG_OP_ENCRYPT, iv=iv)
                enc = op.recv(msglen * multiplier)
            self.assertEqual(len(enc), msglen * multiplier)
            self.assertEqual(enc[:msglen], ciphertext)

            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg([enc],
                                 op=socket.ALG_OP_DECRYPT, iv=iv)
                dec = op.recv(msglen * multiplier)
            self.assertEqual(len(dec), msglen * multiplier)
            self.assertEqual(dec, msg * multiplier)

    @support.requires_linux_version(4, 9)  # see issue29324
    def test_aead_aes_gcm(self):
        # gcm(aes) with associated data; the expected ciphertext and tag
        # are fixed test vectors.
        key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
        iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
        plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
        assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
        expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
        expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')

        taglen = len(expected_tag)
        assoclen = len(assoc)

        with self.create_alg('aead', 'gcm(aes)') as algo:
algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 6528 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE, 6529 None, taglen) 6530 6531 # send assoc, plain and tag buffer in separate steps 6532 op, _ = algo.accept() 6533 with op: 6534 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 6535 assoclen=assoclen, flags=socket.MSG_MORE) 6536 op.sendall(assoc, socket.MSG_MORE) 6537 op.sendall(plain) 6538 res = op.recv(assoclen + len(plain) + taglen) 6539 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6540 self.assertEqual(expected_tag, res[-taglen:]) 6541 6542 # now with msg 6543 op, _ = algo.accept() 6544 with op: 6545 msg = assoc + plain 6546 op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv, 6547 assoclen=assoclen) 6548 res = op.recv(assoclen + len(plain) + taglen) 6549 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6550 self.assertEqual(expected_tag, res[-taglen:]) 6551 6552 # create anc data manually 6553 pack_uint32 = struct.Struct('I').pack 6554 op, _ = algo.accept() 6555 with op: 6556 msg = assoc + plain 6557 op.sendmsg( 6558 [msg], 6559 ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)], 6560 [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv], 6561 [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)], 6562 ) 6563 ) 6564 res = op.recv(len(msg) + taglen) 6565 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6566 self.assertEqual(expected_tag, res[-taglen:]) 6567 6568 # decrypt and verify 6569 op, _ = algo.accept() 6570 with op: 6571 msg = assoc + expected_ct + expected_tag 6572 op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv, 6573 assoclen=assoclen) 6574 res = op.recv(len(msg) - taglen) 6575 self.assertEqual(plain, res[assoclen:]) 6576 6577 @support.requires_linux_version(4, 3) # see test_aes_cbc 6578 def test_drbg_pr_sha256(self): 6579 # deterministic random bit generator, prediction resistance, sha256 6580 with self.create_alg('rng', 'drbg_pr_sha256') as algo: 6581 
extra_seed = os.urandom(32) 6582 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed) 6583 op, _ = algo.accept() 6584 with op: 6585 rn = op.recv(32) 6586 self.assertEqual(len(rn), 32) 6587 6588 def test_sendmsg_afalg_args(self): 6589 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6590 with sock: 6591 with self.assertRaises(TypeError): 6592 sock.sendmsg_afalg() 6593 6594 with self.assertRaises(TypeError): 6595 sock.sendmsg_afalg(op=None) 6596 6597 with self.assertRaises(TypeError): 6598 sock.sendmsg_afalg(1) 6599 6600 with self.assertRaises(TypeError): 6601 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None) 6602 6603 with self.assertRaises(TypeError): 6604 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1) 6605 6606 def test_length_restriction(self): 6607 # bpo-35050, off-by-one error in length check 6608 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6609 self.addCleanup(sock.close) 6610 6611 # salg_type[14] 6612 with self.assertRaises(FileNotFoundError): 6613 sock.bind(("t" * 13, "name")) 6614 with self.assertRaisesRegex(ValueError, "type too long"): 6615 sock.bind(("t" * 14, "name")) 6616 6617 # salg_name[64] 6618 with self.assertRaises(FileNotFoundError): 6619 sock.bind(("type", "n" * 63)) 6620 with self.assertRaisesRegex(ValueError, "name too long"): 6621 sock.bind(("type", "n" * 64)) 6622 6623 6624@unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test') 6625class TestMacOSTCPFlags(unittest.TestCase): 6626 def test_tcp_keepalive(self): 6627 self.assertTrue(socket.TCP_KEEPALIVE) 6628 6629 6630@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") 6631class TestMSWindowsTCPFlags(unittest.TestCase): 6632 knownTCPFlags = { 6633 # available since long time ago 6634 'TCP_MAXSEG', 6635 'TCP_NODELAY', 6636 # available starting with Windows 10 1607 6637 'TCP_FASTOPEN', 6638 # available starting with Windows 10 1703 6639 'TCP_KEEPCNT', 6640 # available starting with Windows 10 1709 
6641 'TCP_KEEPIDLE', 6642 'TCP_KEEPINTVL' 6643 } 6644 6645 def test_new_tcp_flags(self): 6646 provided = [s for s in dir(socket) if s.startswith('TCP')] 6647 unknown = [s for s in provided if s not in self.knownTCPFlags] 6648 6649 self.assertEqual([], unknown, 6650 "New TCP flags were discovered. See bpo-32394 for more information") 6651 6652 6653class CreateServerTest(unittest.TestCase): 6654 6655 def test_address(self): 6656 port = socket_helper.find_unused_port() 6657 with socket.create_server(("127.0.0.1", port)) as sock: 6658 self.assertEqual(sock.getsockname()[0], "127.0.0.1") 6659 self.assertEqual(sock.getsockname()[1], port) 6660 if socket_helper.IPV6_ENABLED: 6661 with socket.create_server(("::1", port), 6662 family=socket.AF_INET6) as sock: 6663 self.assertEqual(sock.getsockname()[0], "::1") 6664 self.assertEqual(sock.getsockname()[1], port) 6665 6666 def test_family_and_type(self): 6667 with socket.create_server(("127.0.0.1", 0)) as sock: 6668 self.assertEqual(sock.family, socket.AF_INET) 6669 self.assertEqual(sock.type, socket.SOCK_STREAM) 6670 if socket_helper.IPV6_ENABLED: 6671 with socket.create_server(("::1", 0), family=socket.AF_INET6) as s: 6672 self.assertEqual(s.family, socket.AF_INET6) 6673 self.assertEqual(sock.type, socket.SOCK_STREAM) 6674 6675 def test_reuse_port(self): 6676 if not hasattr(socket, "SO_REUSEPORT"): 6677 with self.assertRaises(ValueError): 6678 socket.create_server(("localhost", 0), reuse_port=True) 6679 else: 6680 with socket.create_server(("localhost", 0)) as sock: 6681 opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) 6682 self.assertEqual(opt, 0) 6683 with socket.create_server(("localhost", 0), reuse_port=True) as sock: 6684 opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) 6685 self.assertNotEqual(opt, 0) 6686 6687 @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or 6688 not hasattr(_socket, 'IPV6_V6ONLY'), 6689 "IPV6_V6ONLY option not supported") 6690 
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6691 def test_ipv6_only_default(self): 6692 with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock: 6693 assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY) 6694 6695 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6696 "dualstack_ipv6 not supported") 6697 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6698 def test_dualstack_ipv6_family(self): 6699 with socket.create_server(("::1", 0), family=socket.AF_INET6, 6700 dualstack_ipv6=True) as sock: 6701 self.assertEqual(sock.family, socket.AF_INET6) 6702 6703 6704class CreateServerFunctionalTest(unittest.TestCase): 6705 timeout = support.LOOPBACK_TIMEOUT 6706 6707 def echo_server(self, sock): 6708 def run(sock): 6709 with sock: 6710 conn, _ = sock.accept() 6711 with conn: 6712 event.wait(self.timeout) 6713 msg = conn.recv(1024) 6714 if not msg: 6715 return 6716 conn.sendall(msg) 6717 6718 event = threading.Event() 6719 sock.settimeout(self.timeout) 6720 thread = threading.Thread(target=run, args=(sock, )) 6721 thread.start() 6722 self.addCleanup(thread.join, self.timeout) 6723 event.set() 6724 6725 def echo_client(self, addr, family): 6726 with socket.socket(family=family) as sock: 6727 sock.settimeout(self.timeout) 6728 sock.connect(addr) 6729 sock.sendall(b'foo') 6730 self.assertEqual(sock.recv(1024), b'foo') 6731 6732 def test_tcp4(self): 6733 port = socket_helper.find_unused_port() 6734 with socket.create_server(("", port)) as sock: 6735 self.echo_server(sock) 6736 self.echo_client(("127.0.0.1", port), socket.AF_INET) 6737 6738 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6739 def test_tcp6(self): 6740 port = socket_helper.find_unused_port() 6741 with socket.create_server(("", port), 6742 family=socket.AF_INET6) as sock: 6743 self.echo_server(sock) 6744 self.echo_client(("::1", port), socket.AF_INET6) 6745 6746 # --- dual stack tests 6747 
6748 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6749 "dualstack_ipv6 not supported") 6750 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6751 def test_dual_stack_client_v4(self): 6752 port = socket_helper.find_unused_port() 6753 with socket.create_server(("", port), family=socket.AF_INET6, 6754 dualstack_ipv6=True) as sock: 6755 self.echo_server(sock) 6756 self.echo_client(("127.0.0.1", port), socket.AF_INET) 6757 6758 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6759 "dualstack_ipv6 not supported") 6760 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6761 def test_dual_stack_client_v6(self): 6762 port = socket_helper.find_unused_port() 6763 with socket.create_server(("", port), family=socket.AF_INET6, 6764 dualstack_ipv6=True) as sock: 6765 self.echo_server(sock) 6766 self.echo_client(("::1", port), socket.AF_INET6) 6767 6768@requireAttrs(socket, "send_fds") 6769@requireAttrs(socket, "recv_fds") 6770@requireAttrs(socket, "AF_UNIX") 6771class SendRecvFdsTests(unittest.TestCase): 6772 def testSendAndRecvFds(self): 6773 def close_pipes(pipes): 6774 for fd1, fd2 in pipes: 6775 os.close(fd1) 6776 os.close(fd2) 6777 6778 def close_fds(fds): 6779 for fd in fds: 6780 os.close(fd) 6781 6782 # send 10 file descriptors 6783 pipes = [os.pipe() for _ in range(10)] 6784 self.addCleanup(close_pipes, pipes) 6785 fds = [rfd for rfd, wfd in pipes] 6786 6787 # use a UNIX socket pair to exchange file descriptors locally 6788 sock1, sock2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) 6789 with sock1, sock2: 6790 socket.send_fds(sock1, [MSG], fds) 6791 # request more data and file descriptors than expected 6792 msg, fds2, flags, addr = socket.recv_fds(sock2, len(MSG) * 2, len(fds) * 2) 6793 self.addCleanup(close_fds, fds2) 6794 6795 self.assertEqual(msg, MSG) 6796 self.assertEqual(len(fds2), len(fds)) 6797 self.assertEqual(flags, 0) 6798 # don't test addr 6799 6800 # test that file descriptors are 
connected 6801 for index, fds in enumerate(pipes): 6802 rfd, wfd = fds 6803 os.write(wfd, str(index).encode()) 6804 6805 for index, rfd in enumerate(fds2): 6806 data = os.read(rfd, 100) 6807 self.assertEqual(data, str(index).encode()) 6808 6809 6810def setUpModule(): 6811 thread_info = threading_helper.threading_setup() 6812 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 6813 6814 6815if __name__ == "__main__": 6816 unittest.main() 6817