1import unittest 2from test import support 3from test.support import socket_helper 4 5import errno 6import io 7import itertools 8import socket 9import select 10import tempfile 11import time 12import traceback 13import queue 14import sys 15import os 16import platform 17import array 18import contextlib 19from weakref import proxy 20import signal 21import math 22import pickle 23import struct 24import random 25import shutil 26import string 27import _thread as thread 28import threading 29try: 30 import multiprocessing 31except ImportError: 32 multiprocessing = False 33try: 34 import fcntl 35except ImportError: 36 fcntl = None 37 38HOST = socket_helper.HOST 39# test unicode string and carriage return 40MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') 41 42VSOCKPORT = 1234 43AIX = platform.system() == "AIX" 44 45try: 46 import _socket 47except ImportError: 48 _socket = None 49 50def get_cid(): 51 if fcntl is None: 52 return None 53 if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'): 54 return None 55 try: 56 with open("/dev/vsock", "rb") as f: 57 r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, " ") 58 except OSError: 59 return None 60 else: 61 return struct.unpack("I", r)[0] 62 63def _have_socket_can(): 64 """Check whether CAN sockets are supported on this host.""" 65 try: 66 s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 67 except (AttributeError, OSError): 68 return False 69 else: 70 s.close() 71 return True 72 73def _have_socket_can_isotp(): 74 """Check whether CAN ISOTP sockets are supported on this host.""" 75 try: 76 s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) 77 except (AttributeError, OSError): 78 return False 79 else: 80 s.close() 81 return True 82 83def _have_socket_can_j1939(): 84 """Check whether CAN J1939 sockets are supported on this host.""" 85 try: 86 s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) 87 except (AttributeError, OSError): 88 return False 89 else: 90 s.close() 91 return True 92 93def _have_socket_rds(): 94 """Check whether RDS sockets are supported on this host.""" 95 try: 96 s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 97 except (AttributeError, OSError): 98 return False 99 else: 100 s.close() 101 return True 102 103def _have_socket_alg(): 104 """Check whether AF_ALG sockets are supported on this host.""" 105 try: 106 s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 107 except (AttributeError, OSError): 108 return False 109 else: 110 s.close() 111 return True 112 113def _have_socket_qipcrtr(): 114 """Check whether AF_QIPCRTR sockets are supported on this host.""" 115 try: 116 s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0) 117 except (AttributeError, OSError): 118 return False 119 else: 120 s.close() 121 return True 122 123def _have_socket_vsock(): 124 """Check whether AF_VSOCK sockets are supported on this host.""" 125 ret = get_cid() is not None 126 return ret 127 128 129def _have_socket_bluetooth(): 130 """Check whether AF_BLUETOOTH sockets are supported on this host.""" 131 try: 132 # RFCOMM is supported by all platforms with bluetooth support. Windows 133 # does not support omitting the protocol. 134 s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) 135 except (AttributeError, OSError): 136 return False 137 else: 138 s.close() 139 return True 140 141 142@contextlib.contextmanager 143def socket_setdefaulttimeout(timeout): 144 old_timeout = socket.getdefaulttimeout() 145 try: 146 socket.setdefaulttimeout(timeout) 147 yield 148 finally: 149 socket.setdefaulttimeout(old_timeout) 150 151 152HAVE_SOCKET_CAN = _have_socket_can() 153 154HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp() 155 156HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939() 157 158HAVE_SOCKET_RDS = _have_socket_rds() 159 160HAVE_SOCKET_ALG = _have_socket_alg() 161 162HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr() 163 164HAVE_SOCKET_VSOCK = _have_socket_vsock() 165 166HAVE_SOCKET_UDPLITE = hasattr(socket, "IPPROTO_UDPLITE") 167 168HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth() 169 170# Size in bytes of the int type 171SIZEOF_INT = array.array("i").itemsize 172 173class SocketTCPTest(unittest.TestCase): 174 175 def setUp(self): 176 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 177 self.port = socket_helper.bind_port(self.serv) 178 self.serv.listen() 179 180 def tearDown(self): 181 self.serv.close() 182 self.serv = None 183 184class SocketUDPTest(unittest.TestCase): 185 186 def setUp(self): 187 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 188 self.port = socket_helper.bind_port(self.serv) 189 190 def tearDown(self): 191 self.serv.close() 192 self.serv = None 193 194class SocketUDPLITETest(SocketUDPTest): 195 196 def setUp(self): 197 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 198 self.port = socket_helper.bind_port(self.serv) 199 200class ThreadSafeCleanupTestCase(unittest.TestCase): 201 """Subclass of unittest.TestCase with thread-safe cleanup methods. 202 203 This subclass protects the addCleanup() and doCleanups() methods 204 with a recursive lock. 205 """ 206 207 def __init__(self, *args, **kwargs): 208 super().__init__(*args, **kwargs) 209 self._cleanup_lock = threading.RLock() 210 211 def addCleanup(self, *args, **kwargs): 212 with self._cleanup_lock: 213 return super().addCleanup(*args, **kwargs) 214 215 def doCleanups(self, *args, **kwargs): 216 with self._cleanup_lock: 217 return super().doCleanups(*args, **kwargs) 218 219class SocketCANTest(unittest.TestCase): 220 221 """To be able to run this test, a `vcan0` CAN interface can be created with 222 the following commands: 223 # modprobe vcan 224 # ip link add dev vcan0 type vcan 225 # ifconfig vcan0 up 226 """ 227 interface = 'vcan0' 228 bufsize = 128 229 230 """The CAN frame structure is defined in <linux/can.h>: 231 232 struct can_frame { 233 canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ 234 __u8 can_dlc; /* data length code: 0 .. 8 */ 235 __u8 data[8] __attribute__((aligned(8))); 236 }; 237 """ 238 can_frame_fmt = "=IB3x8s" 239 can_frame_size = struct.calcsize(can_frame_fmt) 240 241 """The Broadcast Management Command frame structure is defined 242 in <linux/can/bcm.h>: 243 244 struct bcm_msg_head { 245 __u32 opcode; 246 __u32 flags; 247 __u32 count; 248 struct timeval ival1, ival2; 249 canid_t can_id; 250 __u32 nframes; 251 struct can_frame frames[0]; 252 } 253 254 `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see 255 `struct can_frame` definition). Must use native not standard types for packing. 256 """ 257 bcm_cmd_msg_fmt = "@3I4l2I" 258 bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8) 259 260 def setUp(self): 261 self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 262 self.addCleanup(self.s.close) 263 try: 264 self.s.bind((self.interface,)) 265 except OSError: 266 self.skipTest('network interface `%s` does not exist' % 267 self.interface) 268 269 270class SocketRDSTest(unittest.TestCase): 271 272 """To be able to run this test, the `rds` kernel module must be loaded: 273 # modprobe rds 274 """ 275 bufsize = 8192 276 277 def setUp(self): 278 self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 279 self.addCleanup(self.serv.close) 280 try: 281 self.port = socket_helper.bind_port(self.serv) 282 except OSError: 283 self.skipTest('unable to bind RDS socket') 284 285 286class ThreadableTest: 287 """Threadable Test class 288 289 The ThreadableTest class makes it easy to create a threaded 290 client/server pair from an existing unit test. To create a 291 new threaded class from an existing unit test, use multiple 292 inheritance: 293 294 class NewClass (OldClass, ThreadableTest): 295 pass 296 297 This class defines two new fixture functions with obvious 298 purposes for overriding: 299 300 clientSetUp () 301 clientTearDown () 302 303 Any new test functions within the class must then define 304 tests in pairs, where the test name is preceded with a 305 '_' to indicate the client portion of the test. Ex: 306 307 def testFoo(self): 308 # Server portion 309 310 def _testFoo(self): 311 # Client portion 312 313 Any exceptions raised by the clients during their tests 314 are caught and transferred to the main thread to alert 315 the testing framework. 316 317 Note, the server setup function cannot call any blocking 318 functions that rely on the client thread during setup, 319 unless serverExplicitReady() is called just before 320 the blocking call (such as in setting up a client/server 321 connection and performing the accept() in setUp(). 322 """ 323 324 def __init__(self): 325 # Swap the true setup function 326 self.__setUp = self.setUp 327 self.__tearDown = self.tearDown 328 self.setUp = self._setUp 329 self.tearDown = self._tearDown 330 331 def serverExplicitReady(self): 332 """This method allows the server to explicitly indicate that 333 it wants the client thread to proceed. This is useful if the 334 server is about to execute a blocking routine that is 335 dependent upon the client thread during its setup routine.""" 336 self.server_ready.set() 337 338 def _setUp(self): 339 self.wait_threads = support.wait_threads_exit() 340 self.wait_threads.__enter__() 341 342 self.server_ready = threading.Event() 343 self.client_ready = threading.Event() 344 self.done = threading.Event() 345 self.queue = queue.Queue(1) 346 self.server_crashed = False 347 348 # Do some munging to start the client test. 349 methodname = self.id() 350 i = methodname.rfind('.') 351 methodname = methodname[i+1:] 352 test_method = getattr(self, '_' + methodname) 353 self.client_thread = thread.start_new_thread( 354 self.clientRun, (test_method,)) 355 356 try: 357 self.__setUp() 358 except: 359 self.server_crashed = True 360 raise 361 finally: 362 self.server_ready.set() 363 self.client_ready.wait() 364 365 def _tearDown(self): 366 self.__tearDown() 367 self.done.wait() 368 self.wait_threads.__exit__(None, None, None) 369 370 if self.queue.qsize(): 371 exc = self.queue.get() 372 raise exc 373 374 def clientRun(self, test_func): 375 self.server_ready.wait() 376 try: 377 self.clientSetUp() 378 except BaseException as e: 379 self.queue.put(e) 380 self.clientTearDown() 381 return 382 finally: 383 self.client_ready.set() 384 if self.server_crashed: 385 self.clientTearDown() 386 return 387 if not hasattr(test_func, '__call__'): 388 raise TypeError("test_func must be a callable function") 389 try: 390 test_func() 391 except BaseException as e: 392 self.queue.put(e) 393 finally: 394 self.clientTearDown() 395 396 def clientSetUp(self): 397 raise NotImplementedError("clientSetUp must be implemented.") 398 399 def clientTearDown(self): 400 self.done.set() 401 thread.exit() 402 403class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): 404 405 def __init__(self, methodName='runTest'): 406 SocketTCPTest.__init__(self, methodName=methodName) 407 ThreadableTest.__init__(self) 408 409 def clientSetUp(self): 410 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 411 412 def clientTearDown(self): 413 self.cli.close() 414 self.cli = None 415 ThreadableTest.clientTearDown(self) 416 417class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): 418 419 def __init__(self, methodName='runTest'): 420 SocketUDPTest.__init__(self, methodName=methodName) 421 ThreadableTest.__init__(self) 422 423 def clientSetUp(self): 424 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 425 426 def clientTearDown(self): 427 self.cli.close() 428 self.cli = None 429 ThreadableTest.clientTearDown(self) 430 431@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 432 'UDPLITE sockets required for this test.') 433class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest): 434 435 def __init__(self, methodName='runTest'): 436 SocketUDPLITETest.__init__(self, methodName=methodName) 437 ThreadableTest.__init__(self) 438 439 def clientSetUp(self): 440 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 441 442 def clientTearDown(self): 443 self.cli.close() 444 self.cli = None 445 ThreadableTest.clientTearDown(self) 446 447class ThreadedCANSocketTest(SocketCANTest, ThreadableTest): 448 449 def __init__(self, methodName='runTest'): 450 SocketCANTest.__init__(self, methodName=methodName) 451 ThreadableTest.__init__(self) 452 453 def clientSetUp(self): 454 self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 455 try: 456 self.cli.bind((self.interface,)) 457 except OSError: 458 # skipTest should not be called here, and will be called in the 459 # server instead 460 pass 461 462 def clientTearDown(self): 463 self.cli.close() 464 self.cli = None 465 ThreadableTest.clientTearDown(self) 466 467class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest): 468 469 def __init__(self, methodName='runTest'): 470 SocketRDSTest.__init__(self, methodName=methodName) 471 ThreadableTest.__init__(self) 472 473 def clientSetUp(self): 474 self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 475 try: 476 # RDS sockets must be bound explicitly to send or receive data 477 self.cli.bind((HOST, 0)) 478 self.cli_addr = self.cli.getsockname() 479 except OSError: 480 # skipTest should not be called here, and will be called in the 481 # server instead 482 pass 483 484 def clientTearDown(self): 485 self.cli.close() 486 self.cli = None 487 ThreadableTest.clientTearDown(self) 488 489@unittest.skipIf(fcntl is None, "need fcntl") 490@unittest.skipUnless(HAVE_SOCKET_VSOCK, 491 'VSOCK sockets required for this test.') 492@unittest.skipUnless(get_cid() != 2, 493 "This test can only be run on a virtual guest.") 494class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest): 495 496 def __init__(self, methodName='runTest'): 497 unittest.TestCase.__init__(self, methodName=methodName) 498 ThreadableTest.__init__(self) 499 500 def setUp(self): 501 self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) 502 self.addCleanup(self.serv.close) 503 self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT)) 504 self.serv.listen() 505 self.serverExplicitReady() 506 self.conn, self.connaddr = self.serv.accept() 507 self.addCleanup(self.conn.close) 508 509 def clientSetUp(self): 510 time.sleep(0.1) 511 self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) 512 self.addCleanup(self.cli.close) 513 cid = get_cid() 514 self.cli.connect((cid, VSOCKPORT)) 515 516 def testStream(self): 517 msg = self.conn.recv(1024) 518 self.assertEqual(msg, MSG) 519 520 def _testStream(self): 521 self.cli.send(MSG) 522 self.cli.close() 523 524class SocketConnectedTest(ThreadedTCPSocketTest): 525 """Socket tests for client-server connection. 526 527 self.cli_conn is a client socket connected to the server. The 528 setUp() method guarantees that it is connected to the server. 529 """ 530 531 def __init__(self, methodName='runTest'): 532 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 533 534 def setUp(self): 535 ThreadedTCPSocketTest.setUp(self) 536 # Indicate explicitly we're ready for the client thread to 537 # proceed and then perform the blocking call to accept 538 self.serverExplicitReady() 539 conn, addr = self.serv.accept() 540 self.cli_conn = conn 541 542 def tearDown(self): 543 self.cli_conn.close() 544 self.cli_conn = None 545 ThreadedTCPSocketTest.tearDown(self) 546 547 def clientSetUp(self): 548 ThreadedTCPSocketTest.clientSetUp(self) 549 self.cli.connect((HOST, self.port)) 550 self.serv_conn = self.cli 551 552 def clientTearDown(self): 553 self.serv_conn.close() 554 self.serv_conn = None 555 ThreadedTCPSocketTest.clientTearDown(self) 556 557class SocketPairTest(unittest.TestCase, ThreadableTest): 558 559 def __init__(self, methodName='runTest'): 560 unittest.TestCase.__init__(self, methodName=methodName) 561 ThreadableTest.__init__(self) 562 563 def setUp(self): 564 self.serv, self.cli = socket.socketpair() 565 566 def tearDown(self): 567 self.serv.close() 568 self.serv = None 569 570 def clientSetUp(self): 571 pass 572 573 def clientTearDown(self): 574 self.cli.close() 575 self.cli = None 576 ThreadableTest.clientTearDown(self) 577 578 579# The following classes are used by the sendmsg()/recvmsg() tests. 580# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase 581# gives a drop-in replacement for SocketConnectedTest, but different 582# address families can be used, and the attributes serv_addr and 583# cli_addr will be set to the addresses of the endpoints. 584 585class SocketTestBase(unittest.TestCase): 586 """A base class for socket tests. 587 588 Subclasses must provide methods newSocket() to return a new socket 589 and bindSock(sock) to bind it to an unused address. 590 591 Creates a socket self.serv and sets self.serv_addr to its address. 592 """ 593 594 def setUp(self): 595 self.serv = self.newSocket() 596 self.bindServer() 597 598 def bindServer(self): 599 """Bind server socket and set self.serv_addr to its address.""" 600 self.bindSock(self.serv) 601 self.serv_addr = self.serv.getsockname() 602 603 def tearDown(self): 604 self.serv.close() 605 self.serv = None 606 607 608class SocketListeningTestMixin(SocketTestBase): 609 """Mixin to listen on the server socket.""" 610 611 def setUp(self): 612 super().setUp() 613 self.serv.listen() 614 615 616class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase, 617 ThreadableTest): 618 """Mixin to add client socket and allow client/server tests. 619 620 Client socket is self.cli and its address is self.cli_addr. See 621 ThreadableTest for usage information. 622 """ 623 624 def __init__(self, *args, **kwargs): 625 super().__init__(*args, **kwargs) 626 ThreadableTest.__init__(self) 627 628 def clientSetUp(self): 629 self.cli = self.newClientSocket() 630 self.bindClient() 631 632 def newClientSocket(self): 633 """Return a new socket for use as client.""" 634 return self.newSocket() 635 636 def bindClient(self): 637 """Bind client socket and set self.cli_addr to its address.""" 638 self.bindSock(self.cli) 639 self.cli_addr = self.cli.getsockname() 640 641 def clientTearDown(self): 642 self.cli.close() 643 self.cli = None 644 ThreadableTest.clientTearDown(self) 645 646 647class ConnectedStreamTestMixin(SocketListeningTestMixin, 648 ThreadedSocketTestMixin): 649 """Mixin to allow client/server stream tests with connected client. 650 651 Server's socket representing connection to client is self.cli_conn 652 and client's connection to server is self.serv_conn. (Based on 653 SocketConnectedTest.) 654 """ 655 656 def setUp(self): 657 super().setUp() 658 # Indicate explicitly we're ready for the client thread to 659 # proceed and then perform the blocking call to accept 660 self.serverExplicitReady() 661 conn, addr = self.serv.accept() 662 self.cli_conn = conn 663 664 def tearDown(self): 665 self.cli_conn.close() 666 self.cli_conn = None 667 super().tearDown() 668 669 def clientSetUp(self): 670 super().clientSetUp() 671 self.cli.connect(self.serv_addr) 672 self.serv_conn = self.cli 673 674 def clientTearDown(self): 675 try: 676 self.serv_conn.close() 677 self.serv_conn = None 678 except AttributeError: 679 pass 680 super().clientTearDown() 681 682 683class UnixSocketTestBase(SocketTestBase): 684 """Base class for Unix-domain socket tests.""" 685 686 # This class is used for file descriptor passing tests, so we 687 # create the sockets in a private directory so that other users 688 # can't send anything that might be problematic for a privileged 689 # user running the tests. 690 691 def setUp(self): 692 self.dir_path = tempfile.mkdtemp() 693 self.addCleanup(os.rmdir, self.dir_path) 694 super().setUp() 695 696 def bindSock(self, sock): 697 path = tempfile.mktemp(dir=self.dir_path) 698 socket_helper.bind_unix_socket(sock, path) 699 self.addCleanup(support.unlink, path) 700 701class UnixStreamBase(UnixSocketTestBase): 702 """Base class for Unix-domain SOCK_STREAM tests.""" 703 704 def newSocket(self): 705 return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 706 707 708class InetTestBase(SocketTestBase): 709 """Base class for IPv4 socket tests.""" 710 711 host = HOST 712 713 def setUp(self): 714 super().setUp() 715 self.port = self.serv_addr[1] 716 717 def bindSock(self, sock): 718 socket_helper.bind_port(sock, host=self.host) 719 720class TCPTestBase(InetTestBase): 721 """Base class for TCP-over-IPv4 tests.""" 722 723 def newSocket(self): 724 return socket.socket(socket.AF_INET, socket.SOCK_STREAM) 725 726class UDPTestBase(InetTestBase): 727 """Base class for UDP-over-IPv4 tests.""" 728 729 def newSocket(self): 730 return socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 731 732class UDPLITETestBase(InetTestBase): 733 """Base class for UDPLITE-over-IPv4 tests.""" 734 735 def newSocket(self): 736 return socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 737 738class SCTPStreamBase(InetTestBase): 739 """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode.""" 740 741 def newSocket(self): 742 return socket.socket(socket.AF_INET, socket.SOCK_STREAM, 743 socket.IPPROTO_SCTP) 744 745 746class Inet6TestBase(InetTestBase): 747 """Base class for IPv6 socket tests.""" 748 749 host = socket_helper.HOSTv6 750 751class UDP6TestBase(Inet6TestBase): 752 """Base class for UDP-over-IPv6 tests.""" 753 754 def newSocket(self): 755 return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 756 757class UDPLITE6TestBase(Inet6TestBase): 758 """Base class for UDPLITE-over-IPv6 tests.""" 759 760 def newSocket(self): 761 return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 762 763 764# Test-skipping decorators for use with ThreadableTest. 765 766def skipWithClientIf(condition, reason): 767 """Skip decorated test if condition is true, add client_skip decorator. 768 769 If the decorated object is not a class, sets its attribute 770 "client_skip" to a decorator which will return an empty function 771 if the test is to be skipped, or the original function if it is 772 not. This can be used to avoid running the client part of a 773 skipped test when using ThreadableTest. 774 """ 775 def client_pass(*args, **kwargs): 776 pass 777 def skipdec(obj): 778 retval = unittest.skip(reason)(obj) 779 if not isinstance(obj, type): 780 retval.client_skip = lambda f: client_pass 781 return retval 782 def noskipdec(obj): 783 if not (isinstance(obj, type) or hasattr(obj, "client_skip")): 784 obj.client_skip = lambda f: f 785 return obj 786 return skipdec if condition else noskipdec 787 788 789def requireAttrs(obj, *attributes): 790 """Skip decorated test if obj is missing any of the given attributes. 791 792 Sets client_skip attribute as skipWithClientIf() does. 793 """ 794 missing = [name for name in attributes if not hasattr(obj, name)] 795 return skipWithClientIf( 796 missing, "don't have " + ", ".join(name for name in missing)) 797 798 799def requireSocket(*args): 800 """Skip decorated test if a socket cannot be created with given arguments. 801 802 When an argument is given as a string, will use the value of that 803 attribute of the socket module, or skip the test if it doesn't 804 exist. Sets client_skip attribute as skipWithClientIf() does. 805 """ 806 err = None 807 missing = [obj for obj in args if 808 isinstance(obj, str) and not hasattr(socket, obj)] 809 if missing: 810 err = "don't have " + ", ".join(name for name in missing) 811 else: 812 callargs = [getattr(socket, obj) if isinstance(obj, str) else obj 813 for obj in args] 814 try: 815 s = socket.socket(*callargs) 816 except OSError as e: 817 # XXX: check errno? 818 err = str(e) 819 else: 820 s.close() 821 return skipWithClientIf( 822 err is not None, 823 "can't create socket({0}): {1}".format( 824 ", ".join(str(o) for o in args), err)) 825 826 827####################################################################### 828## Begin Tests 829 830class GeneralModuleTests(unittest.TestCase): 831 832 def test_SocketType_is_socketobject(self): 833 import _socket 834 self.assertTrue(socket.SocketType is _socket.socket) 835 s = socket.socket() 836 self.assertIsInstance(s, socket.SocketType) 837 s.close() 838 839 def test_repr(self): 840 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 841 with s: 842 self.assertIn('fd=%i' % s.fileno(), repr(s)) 843 self.assertIn('family=%s' % socket.AF_INET, repr(s)) 844 self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s)) 845 self.assertIn('proto=0', repr(s)) 846 self.assertNotIn('raddr', repr(s)) 847 s.bind(('127.0.0.1', 0)) 848 self.assertIn('laddr', repr(s)) 849 self.assertIn(str(s.getsockname()), repr(s)) 850 self.assertIn('[closed]', repr(s)) 851 self.assertNotIn('laddr', repr(s)) 852 853 @unittest.skipUnless(_socket is not None, 'need _socket module') 854 def test_csocket_repr(self): 855 s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) 856 try: 857 expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>' 858 % (s.fileno(), s.family, s.type, s.proto)) 859 self.assertEqual(repr(s), expected) 860 finally: 861 s.close() 862 expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>' 863 % (s.family, s.type, s.proto)) 864 self.assertEqual(repr(s), expected) 865 866 def test_weakref(self): 867 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 868 p = proxy(s) 869 self.assertEqual(p.fileno(), s.fileno()) 870 s = None 871 try: 872 p.fileno() 873 except ReferenceError: 874 pass 875 else: 876 self.fail('Socket proxy still exists') 877 878 def testSocketError(self): 879 # Testing socket module exceptions 880 msg = "Error raising socket exception (%s)." 881 with self.assertRaises(OSError, msg=msg % 'OSError'): 882 raise OSError 883 with self.assertRaises(OSError, msg=msg % 'socket.herror'): 884 raise socket.herror 885 with self.assertRaises(OSError, msg=msg % 'socket.gaierror'): 886 raise socket.gaierror 887 888 def testSendtoErrors(self): 889 # Testing that sendto doesn't mask failures. See #10169. 890 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 891 self.addCleanup(s.close) 892 s.bind(('', 0)) 893 sockname = s.getsockname() 894 # 2 args 895 with self.assertRaises(TypeError) as cm: 896 s.sendto('\u2620', sockname) 897 self.assertEqual(str(cm.exception), 898 "a bytes-like object is required, not 'str'") 899 with self.assertRaises(TypeError) as cm: 900 s.sendto(5j, sockname) 901 self.assertEqual(str(cm.exception), 902 "a bytes-like object is required, not 'complex'") 903 with self.assertRaises(TypeError) as cm: 904 s.sendto(b'foo', None) 905 self.assertIn('not NoneType',str(cm.exception)) 906 # 3 args 907 with self.assertRaises(TypeError) as cm: 908 s.sendto('\u2620', 0, sockname) 909 self.assertEqual(str(cm.exception), 910 "a bytes-like object is required, not 'str'") 911 with self.assertRaises(TypeError) as cm: 912 s.sendto(5j, 0, sockname) 913 self.assertEqual(str(cm.exception), 914 "a bytes-like object is required, not 'complex'") 915 with self.assertRaises(TypeError) as cm: 916 s.sendto(b'foo', 0, None) 917 self.assertIn('not NoneType', str(cm.exception)) 918 with self.assertRaises(TypeError) as cm: 919 s.sendto(b'foo', 'bar', sockname) 920 self.assertIn('an integer is required', str(cm.exception)) 921 with self.assertRaises(TypeError) as cm: 922 s.sendto(b'foo', None, None) 923 self.assertIn('an integer is required', str(cm.exception)) 924 # wrong number of args 925 with self.assertRaises(TypeError) as cm: 926 s.sendto(b'foo') 927 self.assertIn('(1 given)', str(cm.exception)) 928 with self.assertRaises(TypeError) as cm: 929 s.sendto(b'foo', 0, sockname, 4) 930 self.assertIn('(4 given)', str(cm.exception)) 931 932 def testCrucialConstants(self): 933 # Testing for mission critical constants 934 socket.AF_INET 935 if socket.has_ipv6: 936 socket.AF_INET6 937 socket.SOCK_STREAM 938 socket.SOCK_DGRAM 939 socket.SOCK_RAW 940 socket.SOCK_RDM 941 socket.SOCK_SEQPACKET 942 socket.SOL_SOCKET 943 socket.SO_REUSEADDR 944 945 def testCrucialIpProtoConstants(self): 946 socket.IPPROTO_TCP 947 socket.IPPROTO_UDP 948 if socket.has_ipv6: 949 socket.IPPROTO_IPV6 950 951 @unittest.skipUnless(os.name == "nt", "Windows specific") 952 def testWindowsSpecificConstants(self): 953 socket.IPPROTO_ICLFXBM 954 socket.IPPROTO_ST 955 socket.IPPROTO_CBT 956 socket.IPPROTO_IGP 957 socket.IPPROTO_RDP 958 socket.IPPROTO_PGM 959 socket.IPPROTO_L2TP 960 socket.IPPROTO_SCTP 961 962 @unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test') 963 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 964 def test3542SocketOptions(self): 965 # Ref. issue #35569 and https://tools.ietf.org/html/rfc3542 966 opts = { 967 'IPV6_CHECKSUM', 968 'IPV6_DONTFRAG', 969 'IPV6_DSTOPTS', 970 'IPV6_HOPLIMIT', 971 'IPV6_HOPOPTS', 972 'IPV6_NEXTHOP', 973 'IPV6_PATHMTU', 974 'IPV6_PKTINFO', 975 'IPV6_RECVDSTOPTS', 976 'IPV6_RECVHOPLIMIT', 977 'IPV6_RECVHOPOPTS', 978 'IPV6_RECVPATHMTU', 979 'IPV6_RECVPKTINFO', 980 'IPV6_RECVRTHDR', 981 'IPV6_RECVTCLASS', 982 'IPV6_RTHDR', 983 'IPV6_RTHDRDSTOPTS', 984 'IPV6_RTHDR_TYPE_0', 985 'IPV6_TCLASS', 986 'IPV6_USE_MIN_MTU', 987 } 988 for opt in opts: 989 self.assertTrue( 990 hasattr(socket, opt), f"Missing RFC3542 socket option '{opt}'" 991 ) 992 993 def testHostnameRes(self): 994 # Testing hostname resolution mechanisms 995 hostname = socket.gethostname() 996 try: 997 ip = socket.gethostbyname(hostname) 998 except OSError: 999 # Probably name lookup wasn't set up right; skip this test 1000 self.skipTest('name lookup failure') 1001 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") 1002 try: 1003 hname, aliases, ipaddrs = socket.gethostbyaddr(ip) 1004 except OSError: 1005 # Probably a similar problem as above; skip this test 1006 self.skipTest('name lookup failure') 1007 all_host_names = [hostname, hname] + aliases 1008 fqhn = socket.getfqdn(ip) 1009 if not fqhn in all_host_names: 1010 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) 1011 1012 def test_host_resolution(self): 1013 for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']: 1014 self.assertEqual(socket.gethostbyname(addr), addr) 1015 1016 # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have 1017 # a matching name entry (e.g. 'ip6-localhost') 1018 for host in [socket_helper.HOSTv4]: 1019 self.assertIn(host, socket.gethostbyaddr(host)[2]) 1020 1021 def test_host_resolution_bad_address(self): 1022 # These are all malformed IP addresses and expected not to resolve to 1023 # any result. But some ISPs, e.g. AWS, may successfully resolve these 1024 # IPs. 1025 explanation = ( 1026 "resolving an invalid IP address did not raise OSError; " 1027 "can be caused by a broken DNS server" 1028 ) 1029 for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2', 1030 '1:1:1:1:1:1:1:1:1']: 1031 with self.assertRaises(OSError, msg=addr): 1032 socket.gethostbyname(addr) 1033 with self.assertRaises(OSError, msg=explanation): 1034 socket.gethostbyaddr(addr) 1035 1036 @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()") 1037 @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()") 1038 def test_sethostname(self): 1039 oldhn = socket.gethostname() 1040 try: 1041 socket.sethostname('new') 1042 except OSError as e: 1043 if e.errno == errno.EPERM: 1044 self.skipTest("test should be run as root") 1045 else: 1046 raise 1047 try: 1048 # running test as root! 1049 self.assertEqual(socket.gethostname(), 'new') 1050 # Should work with bytes objects too 1051 socket.sethostname(b'bar') 1052 self.assertEqual(socket.gethostname(), 'bar') 1053 finally: 1054 socket.sethostname(oldhn) 1055 1056 @unittest.skipUnless(hasattr(socket, 'if_nameindex'), 1057 'socket.if_nameindex() not available.') 1058 def testInterfaceNameIndex(self): 1059 interfaces = socket.if_nameindex() 1060 for index, name in interfaces: 1061 self.assertIsInstance(index, int) 1062 self.assertIsInstance(name, str) 1063 # interface indices are non-zero integers 1064 self.assertGreater(index, 0) 1065 _index = socket.if_nametoindex(name) 1066 self.assertIsInstance(_index, int) 1067 self.assertEqual(index, _index) 1068 _name = socket.if_indextoname(index) 1069 self.assertIsInstance(_name, str) 1070 self.assertEqual(name, _name) 1071 1072 @unittest.skipUnless(hasattr(socket, 'if_indextoname'), 1073 'socket.if_indextoname() not available.') 1074 def testInvalidInterfaceIndexToName(self): 1075 self.assertRaises(OSError, socket.if_indextoname, 0) 1076 self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') 1077 1078 @unittest.skipUnless(hasattr(socket, 'if_nametoindex'), 1079 'socket.if_nametoindex() not available.') 1080 def testInvalidInterfaceNameToIndex(self): 1081 self.assertRaises(TypeError, socket.if_nametoindex, 0) 1082 self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF') 1083 1084 @unittest.skipUnless(hasattr(sys, 'getrefcount'), 1085 'test needs sys.getrefcount()') 1086 def testRefCountGetNameInfo(self): 1087 # Testing reference count for getnameinfo 1088 try: 1089 # On some versions, this loses a reference 1090 orig = sys.getrefcount(__name__) 1091 socket.getnameinfo(__name__,0) 1092 except TypeError: 1093 if sys.getrefcount(__name__) != orig: 1094 self.fail("socket.getnameinfo loses a reference") 1095 1096 def testInterpreterCrash(self): 1097 # Making sure getnameinfo doesn't crash the interpreter 1098 try: 1099 # On some versions, this crashes the interpreter. 1100 socket.getnameinfo(('x', 0, 0, 0), 0) 1101 except OSError: 1102 pass 1103 1104 def testNtoH(self): 1105 # This just checks that htons etc. are their own inverse, 1106 # when looking at the lower 16 or 32 bits. 1107 sizes = {socket.htonl: 32, socket.ntohl: 32, 1108 socket.htons: 16, socket.ntohs: 16} 1109 for func, size in sizes.items(): 1110 mask = (1<<size) - 1 1111 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): 1112 self.assertEqual(i & mask, func(func(i&mask)) & mask) 1113 1114 swapped = func(mask) 1115 self.assertEqual(swapped & mask, mask) 1116 self.assertRaises(OverflowError, func, 1<<34) 1117 1118 @support.cpython_only 1119 def testNtoHErrors(self): 1120 import _testcapi 1121 s_good_values = [0, 1, 2, 0xffff] 1122 l_good_values = s_good_values + [0xffffffff] 1123 l_bad_values = [-1, -2, 1<<32, 1<<1000] 1124 s_bad_values = l_bad_values + [_testcapi.INT_MIN - 1, 1125 _testcapi.INT_MAX + 1] 1126 s_deprecated_values = [1<<16, _testcapi.INT_MAX] 1127 for k in s_good_values: 1128 socket.ntohs(k) 1129 socket.htons(k) 1130 for k in l_good_values: 1131 socket.ntohl(k) 1132 socket.htonl(k) 1133 for k in s_bad_values: 1134 self.assertRaises(OverflowError, socket.ntohs, k) 1135 self.assertRaises(OverflowError, socket.htons, k) 1136 for k in l_bad_values: 1137 self.assertRaises(OverflowError, socket.ntohl, k) 1138 self.assertRaises(OverflowError, socket.htonl, k) 1139 for k in s_deprecated_values: 1140 self.assertWarns(DeprecationWarning, socket.ntohs, k) 1141 self.assertWarns(DeprecationWarning, socket.htons, k) 1142 1143 def testGetServBy(self): 1144 eq = self.assertEqual 1145 # Find one service that exists, then check all the related interfaces. 1146 # I've ordered this by protocols that have both a tcp and udp 1147 # protocol, at least for modern Linuxes. 1148 if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd')) 1149 or sys.platform in ('linux', 'darwin')): 1150 # avoid the 'echo' service on this platform, as there is an 1151 # assumption breaking non-standard port/protocol entry 1152 services = ('daytime', 'qotd', 'domain') 1153 else: 1154 services = ('echo', 'daytime', 'domain') 1155 for service in services: 1156 try: 1157 port = socket.getservbyname(service, 'tcp') 1158 break 1159 except OSError: 1160 pass 1161 else: 1162 raise OSError 1163 # Try same call with optional protocol omitted 1164 # Issue #26936: Android getservbyname() was broken before API 23. 1165 if (not hasattr(sys, 'getandroidapilevel') or 1166 sys.getandroidapilevel() >= 23): 1167 port2 = socket.getservbyname(service) 1168 eq(port, port2) 1169 # Try udp, but don't barf if it doesn't exist 1170 try: 1171 udpport = socket.getservbyname(service, 'udp') 1172 except OSError: 1173 udpport = None 1174 else: 1175 eq(udpport, port) 1176 # Now make sure the lookup by port returns the same service name 1177 # Issue #26936: Android getservbyport() is broken. 1178 if not support.is_android: 1179 eq(socket.getservbyport(port2), service) 1180 eq(socket.getservbyport(port, 'tcp'), service) 1181 if udpport is not None: 1182 eq(socket.getservbyport(udpport, 'udp'), service) 1183 # Make sure getservbyport does not accept out of range ports. 1184 self.assertRaises(OverflowError, socket.getservbyport, -1) 1185 self.assertRaises(OverflowError, socket.getservbyport, 65536) 1186 1187 def testDefaultTimeout(self): 1188 # Testing default timeout 1189 # The default timeout should initially be None 1190 self.assertEqual(socket.getdefaulttimeout(), None) 1191 with socket.socket() as s: 1192 self.assertEqual(s.gettimeout(), None) 1193 1194 # Set the default timeout to 10, and see if it propagates 1195 with socket_setdefaulttimeout(10): 1196 self.assertEqual(socket.getdefaulttimeout(), 10) 1197 with socket.socket() as sock: 1198 self.assertEqual(sock.gettimeout(), 10) 1199 1200 # Reset the default timeout to None, and see if it propagates 1201 socket.setdefaulttimeout(None) 1202 self.assertEqual(socket.getdefaulttimeout(), None) 1203 with socket.socket() as sock: 1204 self.assertEqual(sock.gettimeout(), None) 1205 1206 # Check that setting it to an invalid value raises ValueError 1207 self.assertRaises(ValueError, socket.setdefaulttimeout, -1) 1208 1209 # Check that setting it to an invalid type raises TypeError 1210 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") 1211 1212 @unittest.skipUnless(hasattr(socket, 'inet_aton'), 1213 'test needs socket.inet_aton()') 1214 def testIPv4_inet_aton_fourbytes(self): 1215 # Test that issue1008086 and issue767150 are fixed. 1216 # It must return 4 bytes. 1217 self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) 1218 self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) 1219 1220 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1221 'test needs socket.inet_pton()') 1222 def testIPv4toString(self): 1223 from socket import inet_aton as f, inet_pton, AF_INET 1224 g = lambda a: inet_pton(AF_INET, a) 1225 1226 assertInvalid = lambda func,a: self.assertRaises( 1227 (OSError, ValueError), func, a 1228 ) 1229 1230 self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) 1231 self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0')) 1232 self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170')) 1233 self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4')) 1234 self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255')) 1235 # bpo-29972: inet_pton() doesn't fail on AIX 1236 if not AIX: 1237 assertInvalid(f, '0.0.0.') 1238 assertInvalid(f, '300.0.0.0') 1239 assertInvalid(f, 'a.0.0.0') 1240 assertInvalid(f, '1.2.3.4.5') 1241 assertInvalid(f, '::1') 1242 1243 self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0')) 1244 self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0')) 1245 self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170')) 1246 self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255')) 1247 assertInvalid(g, '0.0.0.') 1248 assertInvalid(g, '300.0.0.0') 1249 assertInvalid(g, 'a.0.0.0') 1250 assertInvalid(g, '1.2.3.4.5') 1251 assertInvalid(g, '::1') 1252 1253 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1254 'test needs socket.inet_pton()') 1255 def testIPv6toString(self): 1256 try: 1257 from socket import inet_pton, AF_INET6, has_ipv6 1258 if not has_ipv6: 1259 self.skipTest('IPv6 not available') 1260 except ImportError: 1261 self.skipTest('could not import needed symbols from socket') 1262 1263 if sys.platform == "win32": 1264 try: 1265 inet_pton(AF_INET6, '::') 1266 except OSError as e: 1267 if e.winerror == 10022: 1268 self.skipTest('IPv6 might not be supported') 1269 1270 f = lambda a: inet_pton(AF_INET6, a) 1271 assertInvalid = lambda a: self.assertRaises( 1272 (OSError, ValueError), f, a 1273 ) 1274 1275 self.assertEqual(b'\x00' * 16, f('::')) 1276 self.assertEqual(b'\x00' * 16, f('0::0')) 1277 self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::')) 1278 self.assertEqual( 1279 b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', 1280 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') 1281 ) 1282 self.assertEqual( 1283 b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02', 1284 f('ad42:abc::127:0:254:2') 1285 ) 1286 self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::')) 1287 assertInvalid('0x20::') 1288 assertInvalid(':::') 1289 assertInvalid('::0::') 1290 assertInvalid('1::abc::') 1291 assertInvalid('1::abc::def') 1292 assertInvalid('1:2:3:4:5:6') 1293 assertInvalid('1:2:3:4:5:6:') 1294 assertInvalid('1:2:3:4:5:6:7:8:0') 1295 # bpo-29972: inet_pton() doesn't fail on AIX 1296 if not AIX: 1297 assertInvalid('1:2:3:4:5:6:7:8:') 1298 1299 self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40', 1300 f('::254.42.23.64') 1301 ) 1302 self.assertEqual( 1303 b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40', 1304 f('42::a29b:254.42.23.64') 1305 ) 1306 self.assertEqual( 1307 b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40', 1308 f('42:a8b9:0:2:ffff:a29b:254.42.23.64') 1309 ) 1310 assertInvalid('255.254.253.252') 1311 assertInvalid('1::260.2.3.0') 1312 assertInvalid('1::0.be.e.0') 1313 assertInvalid('1:2:3:4:5:6:7:1.2.3.4') 1314 assertInvalid('::1.2.3.4:0') 1315 assertInvalid('0.100.200.0:3:4:5:6:7:8') 1316 1317 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1318 'test needs socket.inet_ntop()') 1319 def testStringToIPv4(self): 1320 from socket import inet_ntoa as f, inet_ntop, AF_INET 1321 g = lambda a: inet_ntop(AF_INET, a) 1322 assertInvalid = lambda func,a: self.assertRaises( 1323 (OSError, ValueError), func, a 1324 ) 1325 1326 self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) 1327 self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55')) 1328 self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff')) 1329 self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04')) 1330 assertInvalid(f, b'\x00' * 3) 1331 assertInvalid(f, b'\x00' * 5) 1332 assertInvalid(f, b'\x00' * 16) 1333 self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55'))) 1334 1335 self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00')) 1336 self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55')) 1337 self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff')) 1338 assertInvalid(g, b'\x00' * 3) 1339 assertInvalid(g, b'\x00' * 5) 1340 assertInvalid(g, b'\x00' * 16) 1341 self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55'))) 1342 1343 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1344 'test needs socket.inet_ntop()') 1345 def testStringToIPv6(self): 1346 try: 1347 from socket import inet_ntop, AF_INET6, has_ipv6 1348 if not has_ipv6: 1349 self.skipTest('IPv6 not available') 1350 except ImportError: 1351 self.skipTest('could not import needed symbols from socket') 1352 1353 if sys.platform == "win32": 1354 try: 1355 inet_ntop(AF_INET6, b'\x00' * 16) 1356 except OSError as e: 1357 if e.winerror == 10022: 1358 self.skipTest('IPv6 might not be supported') 1359 1360 f = lambda a: inet_ntop(AF_INET6, a) 1361 assertInvalid = lambda a: self.assertRaises( 1362 (OSError, ValueError), f, a 1363 ) 1364 1365 self.assertEqual('::', f(b'\x00' * 16)) 1366 self.assertEqual('::1', f(b'\x00' * 15 + b'\x01')) 1367 self.assertEqual( 1368 'aef:b01:506:1001:ffff:9997:55:170', 1369 f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') 1370 ) 1371 self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01'))) 1372 1373 assertInvalid(b'\x12' * 15) 1374 assertInvalid(b'\x12' * 17) 1375 assertInvalid(b'\x12' * 4) 1376 1377 # XXX The following don't test module-level functionality... 1378 1379 def testSockName(self): 1380 # Testing getsockname() 1381 port = socket_helper.find_unused_port() 1382 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1383 self.addCleanup(sock.close) 1384 sock.bind(("0.0.0.0", port)) 1385 name = sock.getsockname() 1386 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate 1387 # it reasonable to get the host's addr in addition to 0.0.0.0. 1388 # At least for eCos. This is required for the S/390 to pass. 1389 try: 1390 my_ip_addr = socket.gethostbyname(socket.gethostname()) 1391 except OSError: 1392 # Probably name lookup wasn't set up right; skip this test 1393 self.skipTest('name lookup failure') 1394 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) 1395 self.assertEqual(name[1], port) 1396 1397 def testGetSockOpt(self): 1398 # Testing getsockopt() 1399 # We know a socket should start without reuse==0 1400 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1401 self.addCleanup(sock.close) 1402 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1403 self.assertFalse(reuse != 0, "initial mode is reuse") 1404 1405 def testSetSockOpt(self): 1406 # Testing setsockopt() 1407 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1408 self.addCleanup(sock.close) 1409 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1410 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1411 self.assertFalse(reuse == 0, "failed to set reuse mode") 1412 1413 def testSendAfterClose(self): 1414 # testing send() after close() with timeout 1415 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1416 sock.settimeout(1) 1417 self.assertRaises(OSError, sock.send, b"spam") 1418 1419 def testCloseException(self): 1420 sock = socket.socket() 1421 sock.bind((socket._LOCALHOST, 0)) 1422 socket.socket(fileno=sock.fileno()).close() 1423 try: 1424 sock.close() 1425 except OSError as err: 1426 # Winsock apparently raises ENOTSOCK 1427 self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK)) 1428 else: 1429 self.fail("close() should raise EBADF/ENOTSOCK") 1430 1431 def testNewAttributes(self): 1432 # testing .family, .type and .protocol 1433 1434 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1435 self.assertEqual(sock.family, socket.AF_INET) 1436 if hasattr(socket, 'SOCK_CLOEXEC'): 1437 self.assertIn(sock.type, 1438 (socket.SOCK_STREAM | socket.SOCK_CLOEXEC, 1439 socket.SOCK_STREAM)) 1440 else: 1441 self.assertEqual(sock.type, socket.SOCK_STREAM) 1442 self.assertEqual(sock.proto, 0) 1443 1444 def test_getsockaddrarg(self): 1445 sock = socket.socket() 1446 self.addCleanup(sock.close) 1447 port = socket_helper.find_unused_port() 1448 big_port = port + 65536 1449 neg_port = port - 65536 1450 self.assertRaises(OverflowError, sock.bind, (HOST, big_port)) 1451 self.assertRaises(OverflowError, sock.bind, (HOST, neg_port)) 1452 # Since find_unused_port() is inherently subject to race conditions, we 1453 # call it a couple times if necessary. 1454 for i in itertools.count(): 1455 port = socket_helper.find_unused_port() 1456 try: 1457 sock.bind((HOST, port)) 1458 except OSError as e: 1459 if e.errno != errno.EADDRINUSE or i == 5: 1460 raise 1461 else: 1462 break 1463 1464 @unittest.skipUnless(os.name == "nt", "Windows specific") 1465 def test_sock_ioctl(self): 1466 self.assertTrue(hasattr(socket.socket, 'ioctl')) 1467 self.assertTrue(hasattr(socket, 'SIO_RCVALL')) 1468 self.assertTrue(hasattr(socket, 'RCVALL_ON')) 1469 self.assertTrue(hasattr(socket, 'RCVALL_OFF')) 1470 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) 1471 s = socket.socket() 1472 self.addCleanup(s.close) 1473 self.assertRaises(ValueError, s.ioctl, -1, None) 1474 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) 1475 1476 @unittest.skipUnless(os.name == "nt", "Windows specific") 1477 @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'), 1478 'Loopback fast path support required for this test') 1479 def test_sio_loopback_fast_path(self): 1480 s = socket.socket() 1481 self.addCleanup(s.close) 1482 try: 1483 s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True) 1484 except OSError as exc: 1485 WSAEOPNOTSUPP = 10045 1486 if exc.winerror == WSAEOPNOTSUPP: 1487 self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but " 1488 "doesn't implemented in this Windows version") 1489 raise 1490 self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None) 1491 1492 def testGetaddrinfo(self): 1493 try: 1494 socket.getaddrinfo('localhost', 80) 1495 except socket.gaierror as err: 1496 if err.errno == socket.EAI_SERVICE: 1497 # see http://bugs.python.org/issue1282647 1498 self.skipTest("buggy libc version") 1499 raise 1500 # len of every sequence is supposed to be == 5 1501 for info in socket.getaddrinfo(HOST, None): 1502 self.assertEqual(len(info), 5) 1503 # host can be a domain name, a string representation of an 1504 # IPv4/v6 address or None 1505 socket.getaddrinfo('localhost', 80) 1506 socket.getaddrinfo('127.0.0.1', 80) 1507 socket.getaddrinfo(None, 80) 1508 if socket_helper.IPV6_ENABLED: 1509 socket.getaddrinfo('::1', 80) 1510 # port can be a string service name such as "http", a numeric 1511 # port number or None 1512 # Issue #26936: Android getaddrinfo() was broken before API level 23. 1513 if (not hasattr(sys, 'getandroidapilevel') or 1514 sys.getandroidapilevel() >= 23): 1515 socket.getaddrinfo(HOST, "http") 1516 socket.getaddrinfo(HOST, 80) 1517 socket.getaddrinfo(HOST, None) 1518 # test family and socktype filters 1519 infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM) 1520 for family, type, _, _, _ in infos: 1521 self.assertEqual(family, socket.AF_INET) 1522 self.assertEqual(str(family), 'AddressFamily.AF_INET') 1523 self.assertEqual(type, socket.SOCK_STREAM) 1524 self.assertEqual(str(type), 'SocketKind.SOCK_STREAM') 1525 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1526 for _, socktype, _, _, _ in infos: 1527 self.assertEqual(socktype, socket.SOCK_STREAM) 1528 # test proto and flags arguments 1529 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1530 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1531 # a server willing to support both IPv4 and IPv6 will 1532 # usually do this 1533 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1534 socket.AI_PASSIVE) 1535 # test keyword arguments 1536 a = socket.getaddrinfo(HOST, None) 1537 b = socket.getaddrinfo(host=HOST, port=None) 1538 self.assertEqual(a, b) 1539 a = socket.getaddrinfo(HOST, None, socket.AF_INET) 1540 b = socket.getaddrinfo(HOST, None, family=socket.AF_INET) 1541 self.assertEqual(a, b) 1542 a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1543 b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM) 1544 self.assertEqual(a, b) 1545 a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1546 b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP) 1547 self.assertEqual(a, b) 1548 a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1549 b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE) 1550 self.assertEqual(a, b) 1551 a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1552 socket.AI_PASSIVE) 1553 b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC, 1554 type=socket.SOCK_STREAM, proto=0, 1555 flags=socket.AI_PASSIVE) 1556 self.assertEqual(a, b) 1557 # Issue #6697. 1558 self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800') 1559 1560 # Issue 17269: test workaround for OS X platform bug segfault 1561 if hasattr(socket, 'AI_NUMERICSERV'): 1562 try: 1563 # The arguments here are undefined and the call may succeed 1564 # or fail. All we care here is that it doesn't segfault. 1565 socket.getaddrinfo("localhost", None, 0, 0, 0, 1566 socket.AI_NUMERICSERV) 1567 except socket.gaierror: 1568 pass 1569 1570 def test_getnameinfo(self): 1571 # only IP addresses are allowed 1572 self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0) 1573 1574 @unittest.skipUnless(support.is_resource_enabled('network'), 1575 'network is not enabled') 1576 def test_idna(self): 1577 # Check for internet access before running test 1578 # (issue #12804, issue #25138). 1579 with socket_helper.transient_internet('python.org'): 1580 socket.gethostbyname('python.org') 1581 1582 # these should all be successful 1583 domain = 'испытание.pythontest.net' 1584 socket.gethostbyname(domain) 1585 socket.gethostbyname_ex(domain) 1586 socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM) 1587 # this may not work if the forward lookup chooses the IPv6 address, as that doesn't 1588 # have a reverse entry yet 1589 # socket.gethostbyaddr('испытание.python.org') 1590 1591 def check_sendall_interrupted(self, with_timeout): 1592 # socketpair() is not strictly required, but it makes things easier. 1593 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): 1594 self.skipTest("signal.alarm and socket.socketpair required for this test") 1595 # Our signal handlers clobber the C errno by calling a math function 1596 # with an invalid domain value. 1597 def ok_handler(*args): 1598 self.assertRaises(ValueError, math.acosh, 0) 1599 def raising_handler(*args): 1600 self.assertRaises(ValueError, math.acosh, 0) 1601 1 // 0 1602 c, s = socket.socketpair() 1603 old_alarm = signal.signal(signal.SIGALRM, raising_handler) 1604 try: 1605 if with_timeout: 1606 # Just above the one second minimum for signal.alarm 1607 c.settimeout(1.5) 1608 with self.assertRaises(ZeroDivisionError): 1609 signal.alarm(1) 1610 c.sendall(b"x" * support.SOCK_MAX_SIZE) 1611 if with_timeout: 1612 signal.signal(signal.SIGALRM, ok_handler) 1613 signal.alarm(1) 1614 self.assertRaises(socket.timeout, c.sendall, 1615 b"x" * support.SOCK_MAX_SIZE) 1616 finally: 1617 signal.alarm(0) 1618 signal.signal(signal.SIGALRM, old_alarm) 1619 c.close() 1620 s.close() 1621 1622 def test_sendall_interrupted(self): 1623 self.check_sendall_interrupted(False) 1624 1625 def test_sendall_interrupted_with_timeout(self): 1626 self.check_sendall_interrupted(True) 1627 1628 def test_dealloc_warn(self): 1629 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1630 r = repr(sock) 1631 with self.assertWarns(ResourceWarning) as cm: 1632 sock = None 1633 support.gc_collect() 1634 self.assertIn(r, str(cm.warning.args[0])) 1635 # An open socket file object gets dereferenced after the socket 1636 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1637 f = sock.makefile('rb') 1638 r = repr(sock) 1639 sock = None 1640 support.gc_collect() 1641 with self.assertWarns(ResourceWarning): 1642 f = None 1643 support.gc_collect() 1644 1645 def test_name_closed_socketio(self): 1646 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1647 fp = sock.makefile("rb") 1648 fp.close() 1649 self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>") 1650 1651 def test_unusable_closed_socketio(self): 1652 with socket.socket() as sock: 1653 fp = sock.makefile("rb", buffering=0) 1654 self.assertTrue(fp.readable()) 1655 self.assertFalse(fp.writable()) 1656 self.assertFalse(fp.seekable()) 1657 fp.close() 1658 self.assertRaises(ValueError, fp.readable) 1659 self.assertRaises(ValueError, fp.writable) 1660 self.assertRaises(ValueError, fp.seekable) 1661 1662 def test_socket_close(self): 1663 sock = socket.socket() 1664 try: 1665 sock.bind((HOST, 0)) 1666 socket.close(sock.fileno()) 1667 with self.assertRaises(OSError): 1668 sock.listen(1) 1669 finally: 1670 with self.assertRaises(OSError): 1671 # sock.close() fails with EBADF 1672 sock.close() 1673 with self.assertRaises(TypeError): 1674 socket.close(None) 1675 with self.assertRaises(OSError): 1676 socket.close(-1) 1677 1678 def test_makefile_mode(self): 1679 for mode in 'r', 'rb', 'rw', 'w', 'wb': 1680 with self.subTest(mode=mode): 1681 with socket.socket() as sock: 1682 with sock.makefile(mode) as fp: 1683 self.assertEqual(fp.mode, mode) 1684 1685 def test_makefile_invalid_mode(self): 1686 for mode in 'rt', 'x', '+', 'a': 1687 with self.subTest(mode=mode): 1688 with socket.socket() as sock: 1689 with self.assertRaisesRegex(ValueError, 'invalid mode'): 1690 sock.makefile(mode) 1691 1692 def test_pickle(self): 1693 sock = socket.socket() 1694 with sock: 1695 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1696 self.assertRaises(TypeError, pickle.dumps, sock, protocol) 1697 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1698 family = pickle.loads(pickle.dumps(socket.AF_INET, protocol)) 1699 self.assertEqual(family, socket.AF_INET) 1700 type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol)) 1701 self.assertEqual(type, socket.SOCK_STREAM) 1702 1703 def test_listen_backlog(self): 1704 for backlog in 0, -1: 1705 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1706 srv.bind((HOST, 0)) 1707 srv.listen(backlog) 1708 1709 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1710 srv.bind((HOST, 0)) 1711 srv.listen() 1712 1713 @support.cpython_only 1714 def test_listen_backlog_overflow(self): 1715 # Issue 15989 1716 import _testcapi 1717 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1718 srv.bind((HOST, 0)) 1719 self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) 1720 1721 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1722 def test_flowinfo(self): 1723 self.assertRaises(OverflowError, socket.getnameinfo, 1724 (socket_helper.HOSTv6, 0, 0xffffffff), 0) 1725 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: 1726 self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10)) 1727 1728 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1729 def test_getaddrinfo_ipv6_basic(self): 1730 ((*_, sockaddr),) = socket.getaddrinfo( 1731 'ff02::1de:c0:face:8D', # Note capital letter `D`. 1732 1234, socket.AF_INET6, 1733 socket.SOCK_DGRAM, 1734 socket.IPPROTO_UDP 1735 ) 1736 self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0)) 1737 1738 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1739 @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') 1740 @unittest.skipIf(AIX, 'Symbolic scope id does not work') 1741 def test_getaddrinfo_ipv6_scopeid_symbolic(self): 1742 # Just pick up any network interface (Linux, Mac OS X) 1743 (ifindex, test_interface) = socket.if_nameindex()[0] 1744 ((*_, sockaddr),) = socket.getaddrinfo( 1745 'ff02::1de:c0:face:8D%' + test_interface, 1746 1234, socket.AF_INET6, 1747 socket.SOCK_DGRAM, 1748 socket.IPPROTO_UDP 1749 ) 1750 # Note missing interface name part in IPv6 address 1751 self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) 1752 1753 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1754 @unittest.skipUnless( 1755 sys.platform == 'win32', 1756 'Numeric scope id does not work or undocumented') 1757 def test_getaddrinfo_ipv6_scopeid_numeric(self): 1758 # Also works on Linux and Mac OS X, but is not documented (?) 1759 # Windows, Linux and Max OS X allow nonexistent interface numbers here. 1760 ifindex = 42 1761 ((*_, sockaddr),) = socket.getaddrinfo( 1762 'ff02::1de:c0:face:8D%' + str(ifindex), 1763 1234, socket.AF_INET6, 1764 socket.SOCK_DGRAM, 1765 socket.IPPROTO_UDP 1766 ) 1767 # Note missing interface name part in IPv6 address 1768 self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) 1769 1770 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1771 @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') 1772 @unittest.skipIf(AIX, 'Symbolic scope id does not work') 1773 def test_getnameinfo_ipv6_scopeid_symbolic(self): 1774 # Just pick up any network interface. 1775 (ifindex, test_interface) = socket.if_nameindex()[0] 1776 sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`. 1777 nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) 1778 self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234')) 1779 1780 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1781 @unittest.skipUnless( sys.platform == 'win32', 1782 'Numeric scope id does not work or undocumented') 1783 def test_getnameinfo_ipv6_scopeid_numeric(self): 1784 # Also works on Linux (undocumented), but does not work on Mac OS X 1785 # Windows and Linux allow nonexistent interface numbers here. 1786 ifindex = 42 1787 sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`. 1788 nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) 1789 self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234')) 1790 1791 def test_str_for_enums(self): 1792 # Make sure that the AF_* and SOCK_* constants have enum-like string 1793 # reprs. 1794 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 1795 self.assertEqual(str(s.family), 'AddressFamily.AF_INET') 1796 self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM') 1797 1798 def test_socket_consistent_sock_type(self): 1799 SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0) 1800 SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0) 1801 sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC 1802 1803 with socket.socket(socket.AF_INET, sock_type) as s: 1804 self.assertEqual(s.type, socket.SOCK_STREAM) 1805 s.settimeout(1) 1806 self.assertEqual(s.type, socket.SOCK_STREAM) 1807 s.settimeout(0) 1808 self.assertEqual(s.type, socket.SOCK_STREAM) 1809 s.setblocking(True) 1810 self.assertEqual(s.type, socket.SOCK_STREAM) 1811 s.setblocking(False) 1812 self.assertEqual(s.type, socket.SOCK_STREAM) 1813 1814 def test_unknown_socket_family_repr(self): 1815 # Test that when created with a family that's not one of the known 1816 # AF_*/SOCK_* constants, socket.family just returns the number. 1817 # 1818 # To do this we fool socket.socket into believing it already has an 1819 # open fd because on this path it doesn't actually verify the family and 1820 # type and populates the socket object. 1821 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1822 fd = sock.detach() 1823 unknown_family = max(socket.AddressFamily.__members__.values()) + 1 1824 1825 unknown_type = max( 1826 kind 1827 for name, kind in socket.SocketKind.__members__.items() 1828 if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'} 1829 ) + 1 1830 1831 with socket.socket( 1832 family=unknown_family, type=unknown_type, proto=23, 1833 fileno=fd) as s: 1834 self.assertEqual(s.family, unknown_family) 1835 self.assertEqual(s.type, unknown_type) 1836 # some OS like macOS ignore proto 1837 self.assertIn(s.proto, {0, 23}) 1838 1839 @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()') 1840 def test__sendfile_use_sendfile(self): 1841 class File: 1842 def __init__(self, fd): 1843 self.fd = fd 1844 1845 def fileno(self): 1846 return self.fd 1847 with socket.socket() as sock: 1848 fd = os.open(os.curdir, os.O_RDONLY) 1849 os.close(fd) 1850 with self.assertRaises(socket._GiveupOnSendfile): 1851 sock._sendfile_use_sendfile(File(fd)) 1852 with self.assertRaises(OverflowError): 1853 sock._sendfile_use_sendfile(File(2**1000)) 1854 with self.assertRaises(TypeError): 1855 sock._sendfile_use_sendfile(File(None)) 1856 1857 def _test_socket_fileno(self, s, family, stype): 1858 self.assertEqual(s.family, family) 1859 self.assertEqual(s.type, stype) 1860 1861 fd = s.fileno() 1862 s2 = socket.socket(fileno=fd) 1863 self.addCleanup(s2.close) 1864 # detach old fd to avoid double close 1865 s.detach() 1866 self.assertEqual(s2.family, family) 1867 self.assertEqual(s2.type, stype) 1868 self.assertEqual(s2.fileno(), fd) 1869 1870 def test_socket_fileno(self): 1871 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1872 self.addCleanup(s.close) 1873 s.bind((socket_helper.HOST, 0)) 1874 self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM) 1875 1876 if hasattr(socket, "SOCK_DGRAM"): 1877 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1878 self.addCleanup(s.close) 1879 s.bind((socket_helper.HOST, 0)) 1880 self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM) 1881 1882 if socket_helper.IPV6_ENABLED: 1883 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 1884 self.addCleanup(s.close) 1885 s.bind((socket_helper.HOSTv6, 0, 0, 0)) 1886 self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM) 1887 1888 if hasattr(socket, "AF_UNIX"): 1889 tmpdir = tempfile.mkdtemp() 1890 self.addCleanup(shutil.rmtree, tmpdir) 1891 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1892 self.addCleanup(s.close) 1893 try: 1894 s.bind(os.path.join(tmpdir, 'socket')) 1895 except PermissionError: 1896 pass 1897 else: 1898 self._test_socket_fileno(s, socket.AF_UNIX, 1899 socket.SOCK_STREAM) 1900 1901 def test_socket_fileno_rejects_float(self): 1902 with self.assertRaisesRegex(TypeError, "integer argument expected"): 1903 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5) 1904 1905 def test_socket_fileno_rejects_other_types(self): 1906 with self.assertRaisesRegex(TypeError, "integer is required"): 1907 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo") 1908 1909 def test_socket_fileno_rejects_invalid_socket(self): 1910 with self.assertRaisesRegex(ValueError, "negative file descriptor"): 1911 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1) 1912 1913 @unittest.skipIf(os.name == "nt", "Windows disallows -1 only") 1914 def test_socket_fileno_rejects_negative(self): 1915 with self.assertRaisesRegex(ValueError, "negative file descriptor"): 1916 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42) 1917 1918 def test_socket_fileno_requires_valid_fd(self): 1919 WSAENOTSOCK = 10038 1920 with self.assertRaises(OSError) as cm: 1921 socket.socket(fileno=support.make_bad_fd()) 1922 self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK)) 1923 1924 with self.assertRaises(OSError) as cm: 1925 socket.socket( 1926 socket.AF_INET, 1927 socket.SOCK_STREAM, 1928 fileno=support.make_bad_fd()) 1929 self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK)) 1930 1931 def test_socket_fileno_requires_socket_fd(self): 1932 with tempfile.NamedTemporaryFile() as afile: 1933 with self.assertRaises(OSError): 1934 socket.socket(fileno=afile.fileno()) 1935 1936 with self.assertRaises(OSError) as cm: 1937 socket.socket( 1938 socket.AF_INET, 1939 socket.SOCK_STREAM, 1940 fileno=afile.fileno()) 1941 self.assertEqual(cm.exception.errno, errno.ENOTSOCK) 1942 1943 1944@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 1945class BasicCANTest(unittest.TestCase): 1946 1947 def testCrucialConstants(self): 1948 socket.AF_CAN 1949 socket.PF_CAN 1950 socket.CAN_RAW 1951 1952 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1953 'socket.CAN_BCM required for this test.') 1954 def testBCMConstants(self): 1955 socket.CAN_BCM 1956 1957 # opcodes 1958 socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task 1959 socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task 1960 socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task 1961 socket.CAN_BCM_TX_SEND # send one CAN frame 1962 socket.CAN_BCM_RX_SETUP # create RX content filter subscription 1963 socket.CAN_BCM_RX_DELETE # remove RX content filter subscription 1964 socket.CAN_BCM_RX_READ # read properties of RX content filter subscription 1965 socket.CAN_BCM_TX_STATUS # reply to TX_READ request 1966 socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) 1967 socket.CAN_BCM_RX_STATUS # reply to RX_READ request 1968 socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent 1969 socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) 1970 1971 # flags 1972 socket.CAN_BCM_SETTIMER 1973 socket.CAN_BCM_STARTTIMER 1974 socket.CAN_BCM_TX_COUNTEVT 1975 socket.CAN_BCM_TX_ANNOUNCE 1976 socket.CAN_BCM_TX_CP_CAN_ID 1977 socket.CAN_BCM_RX_FILTER_ID 1978 socket.CAN_BCM_RX_CHECK_DLC 1979 socket.CAN_BCM_RX_NO_AUTOTIMER 1980 socket.CAN_BCM_RX_ANNOUNCE_RESUME 1981 socket.CAN_BCM_TX_RESET_MULTI_IDX 1982 socket.CAN_BCM_RX_RTR_FRAME 1983 1984 def testCreateSocket(self): 1985 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1986 pass 1987 1988 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1989 'socket.CAN_BCM required for this test.') 1990 def testCreateBCMSocket(self): 1991 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: 1992 pass 1993 1994 def testBindAny(self): 1995 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1996 address = ('', ) 1997 s.bind(address) 1998 self.assertEqual(s.getsockname(), address) 1999 2000 def testTooLongInterfaceName(self): 2001 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 2002 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2003 self.assertRaisesRegex(OSError, 'interface name too long', 2004 s.bind, ('x' * 1024,)) 2005 2006 @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), 2007 'socket.CAN_RAW_LOOPBACK required for this test.') 2008 def testLoopback(self): 2009 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2010 for loopback in (0, 1): 2011 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, 2012 loopback) 2013 self.assertEqual(loopback, 2014 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK)) 2015 2016 @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"), 2017 'socket.CAN_RAW_FILTER required for this test.') 2018 def testFilter(self): 2019 can_id, can_mask = 0x200, 0x700 2020 can_filter = struct.pack("=II", can_id, can_mask) 2021 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2022 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) 2023 self.assertEqual(can_filter, 2024 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)) 2025 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter)) 2026 2027 2028@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 2029class CANTest(ThreadedCANSocketTest): 2030 2031 def __init__(self, methodName='runTest'): 2032 ThreadedCANSocketTest.__init__(self, methodName=methodName) 2033 2034 @classmethod 2035 def build_can_frame(cls, can_id, data): 2036 """Build a CAN frame.""" 2037 can_dlc = len(data) 2038 data = data.ljust(8, b'\x00') 2039 return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data) 2040 2041 @classmethod 2042 def dissect_can_frame(cls, frame): 2043 """Dissect a CAN frame.""" 2044 can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame) 2045 return (can_id, can_dlc, data[:can_dlc]) 2046 2047 def testSendFrame(self): 2048 cf, addr = self.s.recvfrom(self.bufsize) 2049 self.assertEqual(self.cf, cf) 2050 self.assertEqual(addr[0], self.interface) 2051 self.assertEqual(addr[1], socket.AF_CAN) 2052 2053 def _testSendFrame(self): 2054 self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05') 2055 self.cli.send(self.cf) 2056 2057 def testSendMaxFrame(self): 2058 cf, addr = self.s.recvfrom(self.bufsize) 2059 self.assertEqual(self.cf, cf) 2060 2061 def _testSendMaxFrame(self): 2062 self.cf = self.build_can_frame(0x00, b'\x07' * 8) 2063 self.cli.send(self.cf) 2064 2065 def testSendMultiFrames(self): 2066 cf, addr = self.s.recvfrom(self.bufsize) 2067 self.assertEqual(self.cf1, cf) 2068 2069 cf, addr = self.s.recvfrom(self.bufsize) 2070 self.assertEqual(self.cf2, cf) 2071 2072 def _testSendMultiFrames(self): 2073 self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11') 2074 self.cli.send(self.cf1) 2075 2076 self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') 2077 self.cli.send(self.cf2) 2078 2079 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2080 'socket.CAN_BCM required for this test.') 2081 def _testBCM(self): 2082 cf, addr = self.cli.recvfrom(self.bufsize) 2083 self.assertEqual(self.cf, cf) 2084 can_id, can_dlc, data = self.dissect_can_frame(cf) 2085 self.assertEqual(self.can_id, can_id) 2086 self.assertEqual(self.data, data) 2087 2088 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2089 'socket.CAN_BCM required for this test.') 2090 def testBCM(self): 2091 bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) 2092 self.addCleanup(bcm.close) 2093 bcm.connect((self.interface,)) 2094 self.can_id = 0x123 2095 self.data = bytes([0xc0, 0xff, 0xee]) 2096 self.cf = self.build_can_frame(self.can_id, self.data) 2097 opcode = socket.CAN_BCM_TX_SEND 2098 flags = 0 2099 count = 0 2100 ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 2101 bcm_can_id = 0x0222 2102 nframes = 1 2103 assert len(self.cf) == 16 2104 header = struct.pack(self.bcm_cmd_msg_fmt, 2105 opcode, 2106 flags, 2107 count, 2108 ival1_seconds, 2109 ival1_usec, 2110 ival2_seconds, 2111 ival2_usec, 2112 bcm_can_id, 2113 nframes, 2114 ) 2115 header_plus_frame = header + self.cf 2116 bytes_sent = bcm.send(header_plus_frame) 2117 self.assertEqual(bytes_sent, len(header_plus_frame)) 2118 2119 2120@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.') 2121class ISOTPTest(unittest.TestCase): 2122 2123 def __init__(self, *args, **kwargs): 2124 super().__init__(*args, **kwargs) 2125 self.interface = "vcan0" 2126 2127 def testCrucialConstants(self): 2128 socket.AF_CAN 2129 socket.PF_CAN 2130 socket.CAN_ISOTP 2131 socket.SOCK_DGRAM 2132 2133 def testCreateSocket(self): 2134 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2135 pass 2136 2137 @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"), 2138 'socket.CAN_ISOTP required for this test.') 2139 def testCreateISOTPSocket(self): 2140 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2141 pass 2142 2143 def testTooLongInterfaceName(self): 2144 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 2145 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2146 with self.assertRaisesRegex(OSError, 'interface name too long'): 2147 s.bind(('x' * 1024, 1, 2)) 2148 2149 def testBind(self): 2150 try: 2151 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2152 addr = self.interface, 0x123, 0x456 2153 s.bind(addr) 2154 self.assertEqual(s.getsockname(), addr) 2155 except OSError as e: 2156 if e.errno == errno.ENODEV: 2157 self.skipTest('network interface `%s` does not exist' % 2158 self.interface) 2159 else: 2160 raise 2161 2162 2163@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.') 2164class J1939Test(unittest.TestCase): 2165 2166 def __init__(self, *args, **kwargs): 2167 super().__init__(*args, **kwargs) 2168 self.interface = "vcan0" 2169 2170 @unittest.skipUnless(hasattr(socket, "CAN_J1939"), 2171 'socket.CAN_J1939 required for this test.') 2172 def testJ1939Constants(self): 2173 socket.CAN_J1939 2174 2175 socket.J1939_MAX_UNICAST_ADDR 2176 socket.J1939_IDLE_ADDR 2177 socket.J1939_NO_ADDR 2178 socket.J1939_NO_NAME 2179 socket.J1939_PGN_REQUEST 2180 socket.J1939_PGN_ADDRESS_CLAIMED 2181 socket.J1939_PGN_ADDRESS_COMMANDED 2182 socket.J1939_PGN_PDU1_MAX 2183 socket.J1939_PGN_MAX 2184 socket.J1939_NO_PGN 2185 2186 # J1939 socket options 2187 socket.SO_J1939_FILTER 2188 socket.SO_J1939_PROMISC 2189 socket.SO_J1939_SEND_PRIO 2190 socket.SO_J1939_ERRQUEUE 2191 2192 socket.SCM_J1939_DEST_ADDR 2193 socket.SCM_J1939_DEST_NAME 2194 socket.SCM_J1939_PRIO 2195 socket.SCM_J1939_ERRQUEUE 2196 2197 socket.J1939_NLA_PAD 2198 socket.J1939_NLA_BYTES_ACKED 2199 2200 socket.J1939_EE_INFO_NONE 2201 socket.J1939_EE_INFO_TX_ABORT 2202 2203 socket.J1939_FILTER_MAX 2204 2205 @unittest.skipUnless(hasattr(socket, "CAN_J1939"), 2206 'socket.CAN_J1939 required for this test.') 2207 def testCreateJ1939Socket(self): 2208 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: 2209 pass 2210 2211 def testBind(self): 2212 try: 2213 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: 2214 addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR 2215 s.bind(addr) 2216 self.assertEqual(s.getsockname(), addr) 2217 except OSError as e: 2218 if e.errno == errno.ENODEV: 2219 self.skipTest('network interface `%s` does not exist' % 2220 self.interface) 2221 else: 2222 raise 2223 2224 2225@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 2226class BasicRDSTest(unittest.TestCase): 2227 2228 def testCrucialConstants(self): 2229 socket.AF_RDS 2230 socket.PF_RDS 2231 2232 def testCreateSocket(self): 2233 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2234 pass 2235 2236 def testSocketBufferSize(self): 2237 bufsize = 16384 2238 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2239 s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize) 2240 s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize) 2241 2242 2243@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 2244class RDSTest(ThreadedRDSSocketTest): 2245 2246 def __init__(self, methodName='runTest'): 2247 ThreadedRDSSocketTest.__init__(self, methodName=methodName) 2248 2249 def setUp(self): 2250 super().setUp() 2251 self.evt = threading.Event() 2252 2253 def testSendAndRecv(self): 2254 data, addr = self.serv.recvfrom(self.bufsize) 2255 self.assertEqual(self.data, data) 2256 self.assertEqual(self.cli_addr, addr) 2257 2258 def _testSendAndRecv(self): 2259 self.data = b'spam' 2260 self.cli.sendto(self.data, 0, (HOST, self.port)) 2261 2262 def testPeek(self): 2263 data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK) 2264 self.assertEqual(self.data, data) 2265 data, addr = self.serv.recvfrom(self.bufsize) 2266 self.assertEqual(self.data, data) 2267 2268 def _testPeek(self): 2269 self.data = b'spam' 2270 self.cli.sendto(self.data, 0, (HOST, self.port)) 2271 2272 @requireAttrs(socket.socket, 'recvmsg') 2273 def testSendAndRecvMsg(self): 2274 data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize) 2275 self.assertEqual(self.data, data) 2276 2277 @requireAttrs(socket.socket, 'sendmsg') 2278 def _testSendAndRecvMsg(self): 2279 self.data = b'hello ' * 10 2280 self.cli.sendmsg([self.data], (), 0, (HOST, self.port)) 2281 2282 def testSendAndRecvMulti(self): 2283 data, addr = self.serv.recvfrom(self.bufsize) 2284 self.assertEqual(self.data1, data) 2285 2286 data, addr = self.serv.recvfrom(self.bufsize) 2287 self.assertEqual(self.data2, data) 2288 2289 def _testSendAndRecvMulti(self): 2290 self.data1 = b'bacon' 2291 self.cli.sendto(self.data1, 0, (HOST, self.port)) 2292 2293 self.data2 = b'egg' 2294 self.cli.sendto(self.data2, 0, (HOST, self.port)) 2295 2296 def testSelect(self): 2297 r, w, x = select.select([self.serv], [], [], 3.0) 2298 self.assertIn(self.serv, r) 2299 data, addr = self.serv.recvfrom(self.bufsize) 2300 self.assertEqual(self.data, data) 2301 2302 def _testSelect(self): 2303 self.data = b'select' 2304 self.cli.sendto(self.data, 0, (HOST, self.port)) 2305 2306@unittest.skipUnless(HAVE_SOCKET_QIPCRTR, 2307 'QIPCRTR sockets required for this test.') 2308class BasicQIPCRTRTest(unittest.TestCase): 2309 2310 def testCrucialConstants(self): 2311 socket.AF_QIPCRTR 2312 2313 def testCreateSocket(self): 2314 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2315 pass 2316 2317 def testUnbound(self): 2318 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2319 self.assertEqual(s.getsockname()[1], 0) 2320 2321 def testBindSock(self): 2322 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2323 socket_helper.bind_port(s, host=s.getsockname()[0]) 2324 self.assertNotEqual(s.getsockname()[1], 0) 2325 2326 def testInvalidBindSock(self): 2327 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2328 self.assertRaises(OSError, socket_helper.bind_port, s, host=-2) 2329 2330 def testAutoBindSock(self): 2331 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2332 s.connect((123, 123)) 2333 self.assertNotEqual(s.getsockname()[1], 0) 2334 2335@unittest.skipIf(fcntl is None, "need fcntl") 2336@unittest.skipUnless(HAVE_SOCKET_VSOCK, 2337 'VSOCK sockets required for this test.') 2338class BasicVSOCKTest(unittest.TestCase): 2339 2340 def testCrucialConstants(self): 2341 socket.AF_VSOCK 2342 2343 def testVSOCKConstants(self): 2344 socket.SO_VM_SOCKETS_BUFFER_SIZE 2345 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE 2346 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE 2347 socket.VMADDR_CID_ANY 2348 socket.VMADDR_PORT_ANY 2349 socket.VMADDR_CID_HOST 2350 socket.VM_SOCKETS_INVALID_VERSION 2351 socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID 2352 2353 def testCreateSocket(self): 2354 with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: 2355 pass 2356 2357 def testSocketBufferSize(self): 2358 with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: 2359 orig_max = s.getsockopt(socket.AF_VSOCK, 2360 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE) 2361 orig = s.getsockopt(socket.AF_VSOCK, 2362 socket.SO_VM_SOCKETS_BUFFER_SIZE) 2363 orig_min = s.getsockopt(socket.AF_VSOCK, 2364 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE) 2365 2366 s.setsockopt(socket.AF_VSOCK, 2367 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2) 2368 s.setsockopt(socket.AF_VSOCK, 2369 socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2) 2370 s.setsockopt(socket.AF_VSOCK, 2371 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2) 2372 2373 self.assertEqual(orig_max * 2, 2374 s.getsockopt(socket.AF_VSOCK, 2375 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)) 2376 self.assertEqual(orig * 2, 2377 s.getsockopt(socket.AF_VSOCK, 2378 socket.SO_VM_SOCKETS_BUFFER_SIZE)) 2379 self.assertEqual(orig_min * 2, 2380 s.getsockopt(socket.AF_VSOCK, 2381 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)) 2382 2383 2384@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH, 2385 'Bluetooth sockets required for this test.') 2386class BasicBluetoothTest(unittest.TestCase): 2387 2388 def testBluetoothConstants(self): 2389 socket.BDADDR_ANY 2390 socket.BDADDR_LOCAL 2391 socket.AF_BLUETOOTH 2392 socket.BTPROTO_RFCOMM 2393 2394 if sys.platform != "win32": 2395 socket.BTPROTO_HCI 2396 socket.SOL_HCI 2397 socket.BTPROTO_L2CAP 2398 2399 if not sys.platform.startswith("freebsd"): 2400 socket.BTPROTO_SCO 2401 2402 def testCreateRfcommSocket(self): 2403 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: 2404 pass 2405 2406 @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets") 2407 def testCreateL2capSocket(self): 2408 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as s: 2409 pass 2410 2411 @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets") 2412 def testCreateHciSocket(self): 2413 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s: 2414 pass 2415 2416 @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"), 2417 "windows and freebsd do not support SCO sockets") 2418 def testCreateScoSocket(self): 2419 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s: 2420 pass 2421 2422 2423class BasicTCPTest(SocketConnectedTest): 2424 2425 def __init__(self, methodName='runTest'): 2426 SocketConnectedTest.__init__(self, methodName=methodName) 2427 2428 def testRecv(self): 2429 # Testing large receive over TCP 2430 msg = self.cli_conn.recv(1024) 2431 self.assertEqual(msg, MSG) 2432 2433 def _testRecv(self): 2434 self.serv_conn.send(MSG) 2435 2436 def testOverFlowRecv(self): 2437 # Testing receive in chunks over TCP 2438 seg1 = self.cli_conn.recv(len(MSG) - 3) 2439 seg2 = self.cli_conn.recv(1024) 2440 msg = seg1 + seg2 2441 self.assertEqual(msg, MSG) 2442 2443 def _testOverFlowRecv(self): 2444 self.serv_conn.send(MSG) 2445 2446 def testRecvFrom(self): 2447 # Testing large recvfrom() over TCP 2448 msg, addr = self.cli_conn.recvfrom(1024) 2449 self.assertEqual(msg, MSG) 2450 2451 def _testRecvFrom(self): 2452 self.serv_conn.send(MSG) 2453 2454 def testOverFlowRecvFrom(self): 2455 # Testing recvfrom() in chunks over TCP 2456 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) 2457 seg2, addr = self.cli_conn.recvfrom(1024) 2458 msg = seg1 + seg2 2459 self.assertEqual(msg, MSG) 2460 2461 def _testOverFlowRecvFrom(self): 2462 self.serv_conn.send(MSG) 2463 2464 def testSendAll(self): 2465 # Testing sendall() with a 2048 byte string over TCP 2466 msg = b'' 2467 while 1: 2468 read = self.cli_conn.recv(1024) 2469 if not read: 2470 break 2471 msg += read 2472 self.assertEqual(msg, b'f' * 2048) 2473 2474 def _testSendAll(self): 2475 big_chunk = b'f' * 2048 2476 self.serv_conn.sendall(big_chunk) 2477 2478 def testFromFd(self): 2479 # Testing fromfd() 2480 fd = self.cli_conn.fileno() 2481 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 2482 self.addCleanup(sock.close) 2483 self.assertIsInstance(sock, socket.socket) 2484 msg = sock.recv(1024) 2485 self.assertEqual(msg, MSG) 2486 2487 def _testFromFd(self): 2488 self.serv_conn.send(MSG) 2489 2490 def testDup(self): 2491 # Testing dup() 2492 sock = self.cli_conn.dup() 2493 self.addCleanup(sock.close) 2494 msg = sock.recv(1024) 2495 self.assertEqual(msg, MSG) 2496 2497 def _testDup(self): 2498 self.serv_conn.send(MSG) 2499 2500 def testShutdown(self): 2501 # Testing shutdown() 2502 msg = self.cli_conn.recv(1024) 2503 self.assertEqual(msg, MSG) 2504 # wait for _testShutdown to finish: on OS X, when the server 2505 # closes the connection the client also becomes disconnected, 2506 # and the client's shutdown call will fail. (Issue #4397.) 2507 self.done.wait() 2508 2509 def _testShutdown(self): 2510 self.serv_conn.send(MSG) 2511 self.serv_conn.shutdown(2) 2512 2513 testShutdown_overflow = support.cpython_only(testShutdown) 2514 2515 @support.cpython_only 2516 def _testShutdown_overflow(self): 2517 import _testcapi 2518 self.serv_conn.send(MSG) 2519 # Issue 15989 2520 self.assertRaises(OverflowError, self.serv_conn.shutdown, 2521 _testcapi.INT_MAX + 1) 2522 self.assertRaises(OverflowError, self.serv_conn.shutdown, 2523 2 + (_testcapi.UINT_MAX + 1)) 2524 self.serv_conn.shutdown(2) 2525 2526 def testDetach(self): 2527 # Testing detach() 2528 fileno = self.cli_conn.fileno() 2529 f = self.cli_conn.detach() 2530 self.assertEqual(f, fileno) 2531 # cli_conn cannot be used anymore... 2532 self.assertTrue(self.cli_conn._closed) 2533 self.assertRaises(OSError, self.cli_conn.recv, 1024) 2534 self.cli_conn.close() 2535 # ...but we can create another socket using the (still open) 2536 # file descriptor 2537 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f) 2538 self.addCleanup(sock.close) 2539 msg = sock.recv(1024) 2540 self.assertEqual(msg, MSG) 2541 2542 def _testDetach(self): 2543 self.serv_conn.send(MSG) 2544 2545 2546class BasicUDPTest(ThreadedUDPSocketTest): 2547 2548 def __init__(self, methodName='runTest'): 2549 ThreadedUDPSocketTest.__init__(self, methodName=methodName) 2550 2551 def testSendtoAndRecv(self): 2552 # Testing sendto() and Recv() over UDP 2553 msg = self.serv.recv(len(MSG)) 2554 self.assertEqual(msg, MSG) 2555 2556 def _testSendtoAndRecv(self): 2557 self.cli.sendto(MSG, 0, (HOST, self.port)) 2558 2559 def testRecvFrom(self): 2560 # Testing recvfrom() over UDP 2561 msg, addr = self.serv.recvfrom(len(MSG)) 2562 self.assertEqual(msg, MSG) 2563 2564 def _testRecvFrom(self): 2565 self.cli.sendto(MSG, 0, (HOST, self.port)) 2566 2567 def testRecvFromNegative(self): 2568 # Negative lengths passed to recvfrom should give ValueError. 2569 self.assertRaises(ValueError, self.serv.recvfrom, -1) 2570 2571 def _testRecvFromNegative(self): 2572 self.cli.sendto(MSG, 0, (HOST, self.port)) 2573 2574 2575@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 2576 'UDPLITE sockets required for this test.') 2577class BasicUDPLITETest(ThreadedUDPLITESocketTest): 2578 2579 def __init__(self, methodName='runTest'): 2580 ThreadedUDPLITESocketTest.__init__(self, methodName=methodName) 2581 2582 def testSendtoAndRecv(self): 2583 # Testing sendto() and Recv() over UDPLITE 2584 msg = self.serv.recv(len(MSG)) 2585 self.assertEqual(msg, MSG) 2586 2587 def _testSendtoAndRecv(self): 2588 self.cli.sendto(MSG, 0, (HOST, self.port)) 2589 2590 def testRecvFrom(self): 2591 # Testing recvfrom() over UDPLITE 2592 msg, addr = self.serv.recvfrom(len(MSG)) 2593 self.assertEqual(msg, MSG) 2594 2595 def _testRecvFrom(self): 2596 self.cli.sendto(MSG, 0, (HOST, self.port)) 2597 2598 def testRecvFromNegative(self): 2599 # Negative lengths passed to recvfrom should give ValueError. 2600 self.assertRaises(ValueError, self.serv.recvfrom, -1) 2601 2602 def _testRecvFromNegative(self): 2603 self.cli.sendto(MSG, 0, (HOST, self.port)) 2604 2605# Tests for the sendmsg()/recvmsg() interface. Where possible, the 2606# same test code is used with different families and types of socket 2607# (e.g. stream, datagram), and tests using recvmsg() are repeated 2608# using recvmsg_into(). 2609# 2610# The generic test classes such as SendmsgTests and 2611# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be 2612# supplied with sockets cli_sock and serv_sock representing the 2613# client's and the server's end of the connection respectively, and 2614# attributes cli_addr and serv_addr holding their (numeric where 2615# appropriate) addresses. 2616# 2617# The final concrete test classes combine these with subclasses of 2618# SocketTestBase which set up client and server sockets of a specific 2619# type, and with subclasses of SendrecvmsgBase such as 2620# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these 2621# sockets to cli_sock and serv_sock and override the methods and 2622# attributes of SendrecvmsgBase to fill in destination addresses if 2623# needed when sending, check for specific flags in msg_flags, etc. 2624# 2625# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using 2626# recvmsg_into(). 2627 2628# XXX: like the other datagram (UDP) tests in this module, the code 2629# here assumes that datagram delivery on the local machine will be 2630# reliable. 2631 2632class SendrecvmsgBase(ThreadSafeCleanupTestCase): 2633 # Base class for sendmsg()/recvmsg() tests. 2634 2635 # Time in seconds to wait before considering a test failed, or 2636 # None for no timeout. Not all tests actually set a timeout. 2637 fail_timeout = support.LOOPBACK_TIMEOUT 2638 2639 def setUp(self): 2640 self.misc_event = threading.Event() 2641 super().setUp() 2642 2643 def sendToServer(self, msg): 2644 # Send msg to the server. 2645 return self.cli_sock.send(msg) 2646 2647 # Tuple of alternative default arguments for sendmsg() when called 2648 # via sendmsgToServer() (e.g. to include a destination address). 2649 sendmsg_to_server_defaults = () 2650 2651 def sendmsgToServer(self, *args): 2652 # Call sendmsg() on self.cli_sock with the given arguments, 2653 # filling in any arguments which are not supplied with the 2654 # corresponding items of self.sendmsg_to_server_defaults, if 2655 # any. 2656 return self.cli_sock.sendmsg( 2657 *(args + self.sendmsg_to_server_defaults[len(args):])) 2658 2659 def doRecvmsg(self, sock, bufsize, *args): 2660 # Call recvmsg() on sock with given arguments and return its 2661 # result. Should be used for tests which can use either 2662 # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides 2663 # this method with one which emulates it using recvmsg_into(), 2664 # thus allowing the same test to be used for both methods. 2665 result = sock.recvmsg(bufsize, *args) 2666 self.registerRecvmsgResult(result) 2667 return result 2668 2669 def registerRecvmsgResult(self, result): 2670 # Called by doRecvmsg() with the return value of recvmsg() or 2671 # recvmsg_into(). Can be overridden to arrange cleanup based 2672 # on the returned ancillary data, for instance. 2673 pass 2674 2675 def checkRecvmsgAddress(self, addr1, addr2): 2676 # Called to compare the received address with the address of 2677 # the peer. 2678 self.assertEqual(addr1, addr2) 2679 2680 # Flags that are normally unset in msg_flags 2681 msg_flags_common_unset = 0 2682 for name in ("MSG_CTRUNC", "MSG_OOB"): 2683 msg_flags_common_unset |= getattr(socket, name, 0) 2684 2685 # Flags that are normally set 2686 msg_flags_common_set = 0 2687 2688 # Flags set when a complete record has been received (e.g. MSG_EOR 2689 # for SCTP) 2690 msg_flags_eor_indicator = 0 2691 2692 # Flags set when a complete record has not been received 2693 # (e.g. MSG_TRUNC for datagram sockets) 2694 msg_flags_non_eor_indicator = 0 2695 2696 def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0): 2697 # Method to check the value of msg_flags returned by recvmsg[_into](). 2698 # 2699 # Checks that all bits in msg_flags_common_set attribute are 2700 # set in "flags" and all bits in msg_flags_common_unset are 2701 # unset. 2702 # 2703 # The "eor" argument specifies whether the flags should 2704 # indicate that a full record (or datagram) has been received. 2705 # If "eor" is None, no checks are done; otherwise, checks 2706 # that: 2707 # 2708 # * if "eor" is true, all bits in msg_flags_eor_indicator are 2709 # set and all bits in msg_flags_non_eor_indicator are unset 2710 # 2711 # * if "eor" is false, all bits in msg_flags_non_eor_indicator 2712 # are set and all bits in msg_flags_eor_indicator are unset 2713 # 2714 # If "checkset" and/or "checkunset" are supplied, they require 2715 # the given bits to be set or unset respectively, overriding 2716 # what the attributes require for those bits. 2717 # 2718 # If any bits are set in "ignore", they will not be checked, 2719 # regardless of the other inputs. 2720 # 2721 # Will raise Exception if the inputs require a bit to be both 2722 # set and unset, and it is not ignored. 2723 2724 defaultset = self.msg_flags_common_set 2725 defaultunset = self.msg_flags_common_unset 2726 2727 if eor: 2728 defaultset |= self.msg_flags_eor_indicator 2729 defaultunset |= self.msg_flags_non_eor_indicator 2730 elif eor is not None: 2731 defaultset |= self.msg_flags_non_eor_indicator 2732 defaultunset |= self.msg_flags_eor_indicator 2733 2734 # Function arguments override defaults 2735 defaultset &= ~checkunset 2736 defaultunset &= ~checkset 2737 2738 # Merge arguments with remaining defaults, and check for conflicts 2739 checkset |= defaultset 2740 checkunset |= defaultunset 2741 inboth = checkset & checkunset & ~ignore 2742 if inboth: 2743 raise Exception("contradictory set, unset requirements for flags " 2744 "{0:#x}".format(inboth)) 2745 2746 # Compare with given msg_flags value 2747 mask = (checkset | checkunset) & ~ignore 2748 self.assertEqual(flags & mask, checkset & mask) 2749 2750 2751class RecvmsgIntoMixin(SendrecvmsgBase): 2752 # Mixin to implement doRecvmsg() using recvmsg_into(). 2753 2754 def doRecvmsg(self, sock, bufsize, *args): 2755 buf = bytearray(bufsize) 2756 result = sock.recvmsg_into([buf], *args) 2757 self.registerRecvmsgResult(result) 2758 self.assertGreaterEqual(result[0], 0) 2759 self.assertLessEqual(result[0], bufsize) 2760 return (bytes(buf[:result[0]]),) + result[1:] 2761 2762 2763class SendrecvmsgDgramFlagsBase(SendrecvmsgBase): 2764 # Defines flags to be checked in msg_flags for datagram sockets. 2765 2766 @property 2767 def msg_flags_non_eor_indicator(self): 2768 return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC 2769 2770 2771class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase): 2772 # Defines flags to be checked in msg_flags for SCTP sockets. 2773 2774 @property 2775 def msg_flags_eor_indicator(self): 2776 return super().msg_flags_eor_indicator | socket.MSG_EOR 2777 2778 2779class SendrecvmsgConnectionlessBase(SendrecvmsgBase): 2780 # Base class for tests on connectionless-mode sockets. Users must 2781 # supply sockets on attributes cli and serv to be mapped to 2782 # cli_sock and serv_sock respectively. 2783 2784 @property 2785 def serv_sock(self): 2786 return self.serv 2787 2788 @property 2789 def cli_sock(self): 2790 return self.cli 2791 2792 @property 2793 def sendmsg_to_server_defaults(self): 2794 return ([], [], 0, self.serv_addr) 2795 2796 def sendToServer(self, msg): 2797 return self.cli_sock.sendto(msg, self.serv_addr) 2798 2799 2800class SendrecvmsgConnectedBase(SendrecvmsgBase): 2801 # Base class for tests on connected sockets. Users must supply 2802 # sockets on attributes serv_conn and cli_conn (representing the 2803 # connections *to* the server and the client), to be mapped to 2804 # cli_sock and serv_sock respectively. 2805 2806 @property 2807 def serv_sock(self): 2808 return self.cli_conn 2809 2810 @property 2811 def cli_sock(self): 2812 return self.serv_conn 2813 2814 def checkRecvmsgAddress(self, addr1, addr2): 2815 # Address is currently "unspecified" for a connected socket, 2816 # so we don't examine it 2817 pass 2818 2819 2820class SendrecvmsgServerTimeoutBase(SendrecvmsgBase): 2821 # Base class to set a timeout on server's socket. 2822 2823 def setUp(self): 2824 super().setUp() 2825 self.serv_sock.settimeout(self.fail_timeout) 2826 2827 2828class SendmsgTests(SendrecvmsgServerTimeoutBase): 2829 # Tests for sendmsg() which can use any socket type and do not 2830 # involve recvmsg() or recvmsg_into(). 2831 2832 def testSendmsg(self): 2833 # Send a simple message with sendmsg(). 2834 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2835 2836 def _testSendmsg(self): 2837 self.assertEqual(self.sendmsgToServer([MSG]), len(MSG)) 2838 2839 def testSendmsgDataGenerator(self): 2840 # Send from buffer obtained from a generator (not a sequence). 2841 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2842 2843 def _testSendmsgDataGenerator(self): 2844 self.assertEqual(self.sendmsgToServer((o for o in [MSG])), 2845 len(MSG)) 2846 2847 def testSendmsgAncillaryGenerator(self): 2848 # Gather (empty) ancillary data from a generator. 2849 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2850 2851 def _testSendmsgAncillaryGenerator(self): 2852 self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])), 2853 len(MSG)) 2854 2855 def testSendmsgArray(self): 2856 # Send data from an array instead of the usual bytes object. 2857 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2858 2859 def _testSendmsgArray(self): 2860 self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]), 2861 len(MSG)) 2862 2863 def testSendmsgGather(self): 2864 # Send message data from more than one buffer (gather write). 2865 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2866 2867 def _testSendmsgGather(self): 2868 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 2869 2870 def testSendmsgBadArgs(self): 2871 # Check that sendmsg() rejects invalid arguments. 2872 self.assertEqual(self.serv_sock.recv(1000), b"done") 2873 2874 def _testSendmsgBadArgs(self): 2875 self.assertRaises(TypeError, self.cli_sock.sendmsg) 2876 self.assertRaises(TypeError, self.sendmsgToServer, 2877 b"not in an iterable") 2878 self.assertRaises(TypeError, self.sendmsgToServer, 2879 object()) 2880 self.assertRaises(TypeError, self.sendmsgToServer, 2881 [object()]) 2882 self.assertRaises(TypeError, self.sendmsgToServer, 2883 [MSG, object()]) 2884 self.assertRaises(TypeError, self.sendmsgToServer, 2885 [MSG], object()) 2886 self.assertRaises(TypeError, self.sendmsgToServer, 2887 [MSG], [], object()) 2888 self.assertRaises(TypeError, self.sendmsgToServer, 2889 [MSG], [], 0, object()) 2890 self.sendToServer(b"done") 2891 2892 def testSendmsgBadCmsg(self): 2893 # Check that invalid ancillary data items are rejected. 2894 self.assertEqual(self.serv_sock.recv(1000), b"done") 2895 2896 def _testSendmsgBadCmsg(self): 2897 self.assertRaises(TypeError, self.sendmsgToServer, 2898 [MSG], [object()]) 2899 self.assertRaises(TypeError, self.sendmsgToServer, 2900 [MSG], [(object(), 0, b"data")]) 2901 self.assertRaises(TypeError, self.sendmsgToServer, 2902 [MSG], [(0, object(), b"data")]) 2903 self.assertRaises(TypeError, self.sendmsgToServer, 2904 [MSG], [(0, 0, object())]) 2905 self.assertRaises(TypeError, self.sendmsgToServer, 2906 [MSG], [(0, 0)]) 2907 self.assertRaises(TypeError, self.sendmsgToServer, 2908 [MSG], [(0, 0, b"data", 42)]) 2909 self.sendToServer(b"done") 2910 2911 @requireAttrs(socket, "CMSG_SPACE") 2912 def testSendmsgBadMultiCmsg(self): 2913 # Check that invalid ancillary data items are rejected when 2914 # more than one item is present. 2915 self.assertEqual(self.serv_sock.recv(1000), b"done") 2916 2917 @testSendmsgBadMultiCmsg.client_skip 2918 def _testSendmsgBadMultiCmsg(self): 2919 self.assertRaises(TypeError, self.sendmsgToServer, 2920 [MSG], [0, 0, b""]) 2921 self.assertRaises(TypeError, self.sendmsgToServer, 2922 [MSG], [(0, 0, b""), object()]) 2923 self.sendToServer(b"done") 2924 2925 def testSendmsgExcessCmsgReject(self): 2926 # Check that sendmsg() rejects excess ancillary data items 2927 # when the number that can be sent is limited. 2928 self.assertEqual(self.serv_sock.recv(1000), b"done") 2929 2930 def _testSendmsgExcessCmsgReject(self): 2931 if not hasattr(socket, "CMSG_SPACE"): 2932 # Can only send one item 2933 with self.assertRaises(OSError) as cm: 2934 self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) 2935 self.assertIsNone(cm.exception.errno) 2936 self.sendToServer(b"done") 2937 2938 def testSendmsgAfterClose(self): 2939 # Check that sendmsg() fails on a closed socket. 2940 pass 2941 2942 def _testSendmsgAfterClose(self): 2943 self.cli_sock.close() 2944 self.assertRaises(OSError, self.sendmsgToServer, [MSG]) 2945 2946 2947class SendmsgStreamTests(SendmsgTests): 2948 # Tests for sendmsg() which require a stream socket and do not 2949 # involve recvmsg() or recvmsg_into(). 2950 2951 def testSendmsgExplicitNoneAddr(self): 2952 # Check that peer address can be specified as None. 2953 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2954 2955 def _testSendmsgExplicitNoneAddr(self): 2956 self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG)) 2957 2958 def testSendmsgTimeout(self): 2959 # Check that timeout works with sendmsg(). 2960 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2961 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2962 2963 def _testSendmsgTimeout(self): 2964 try: 2965 self.cli_sock.settimeout(0.03) 2966 try: 2967 while True: 2968 self.sendmsgToServer([b"a"*512]) 2969 except socket.timeout: 2970 pass 2971 except OSError as exc: 2972 if exc.errno != errno.ENOMEM: 2973 raise 2974 # bpo-33937 the test randomly fails on Travis CI with 2975 # "OSError: [Errno 12] Cannot allocate memory" 2976 else: 2977 self.fail("socket.timeout not raised") 2978 finally: 2979 self.misc_event.set() 2980 2981 # XXX: would be nice to have more tests for sendmsg flags argument. 2982 2983 # Linux supports MSG_DONTWAIT when sending, but in general, it 2984 # only works when receiving. Could add other platforms if they 2985 # support it too. 2986 @skipWithClientIf(sys.platform not in {"linux"}, 2987 "MSG_DONTWAIT not known to work on this platform when " 2988 "sending") 2989 def testSendmsgDontWait(self): 2990 # Check that MSG_DONTWAIT in flags causes non-blocking behaviour. 2991 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2992 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2993 2994 @testSendmsgDontWait.client_skip 2995 def _testSendmsgDontWait(self): 2996 try: 2997 with self.assertRaises(OSError) as cm: 2998 while True: 2999 self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT) 3000 # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI 3001 # with "OSError: [Errno 12] Cannot allocate memory" 3002 self.assertIn(cm.exception.errno, 3003 (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM)) 3004 finally: 3005 self.misc_event.set() 3006 3007 3008class SendmsgConnectionlessTests(SendmsgTests): 3009 # Tests for sendmsg() which require a connectionless-mode 3010 # (e.g. datagram) socket, and do not involve recvmsg() or 3011 # recvmsg_into(). 3012 3013 def testSendmsgNoDestAddr(self): 3014 # Check that sendmsg() fails when no destination address is 3015 # given for unconnected socket. 3016 pass 3017 3018 def _testSendmsgNoDestAddr(self): 3019 self.assertRaises(OSError, self.cli_sock.sendmsg, 3020 [MSG]) 3021 self.assertRaises(OSError, self.cli_sock.sendmsg, 3022 [MSG], [], 0, None) 3023 3024 3025class RecvmsgGenericTests(SendrecvmsgBase): 3026 # Tests for recvmsg() which can also be emulated using 3027 # recvmsg_into(), and can use any socket type. 3028 3029 def testRecvmsg(self): 3030 # Receive a simple message with recvmsg[_into](). 3031 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 3032 self.assertEqual(msg, MSG) 3033 self.checkRecvmsgAddress(addr, self.cli_addr) 3034 self.assertEqual(ancdata, []) 3035 self.checkFlags(flags, eor=True) 3036 3037 def _testRecvmsg(self): 3038 self.sendToServer(MSG) 3039 3040 def testRecvmsgExplicitDefaults(self): 3041 # Test recvmsg[_into]() with default arguments provided explicitly. 3042 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3043 len(MSG), 0, 0) 3044 self.assertEqual(msg, MSG) 3045 self.checkRecvmsgAddress(addr, self.cli_addr) 3046 self.assertEqual(ancdata, []) 3047 self.checkFlags(flags, eor=True) 3048 3049 def _testRecvmsgExplicitDefaults(self): 3050 self.sendToServer(MSG) 3051 3052 def testRecvmsgShorter(self): 3053 # Receive a message smaller than buffer. 3054 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3055 len(MSG) + 42) 3056 self.assertEqual(msg, MSG) 3057 self.checkRecvmsgAddress(addr, self.cli_addr) 3058 self.assertEqual(ancdata, []) 3059 self.checkFlags(flags, eor=True) 3060 3061 def _testRecvmsgShorter(self): 3062 self.sendToServer(MSG) 3063 3064 def testRecvmsgTrunc(self): 3065 # Receive part of message, check for truncation indicators. 3066 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3067 len(MSG) - 3) 3068 self.assertEqual(msg, MSG[:-3]) 3069 self.checkRecvmsgAddress(addr, self.cli_addr) 3070 self.assertEqual(ancdata, []) 3071 self.checkFlags(flags, eor=False) 3072 3073 def _testRecvmsgTrunc(self): 3074 self.sendToServer(MSG) 3075 3076 def testRecvmsgShortAncillaryBuf(self): 3077 # Test ancillary data buffer too small to hold any ancillary data. 3078 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3079 len(MSG), 1) 3080 self.assertEqual(msg, MSG) 3081 self.checkRecvmsgAddress(addr, self.cli_addr) 3082 self.assertEqual(ancdata, []) 3083 self.checkFlags(flags, eor=True) 3084 3085 def _testRecvmsgShortAncillaryBuf(self): 3086 self.sendToServer(MSG) 3087 3088 def testRecvmsgLongAncillaryBuf(self): 3089 # Test large ancillary data buffer. 3090 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3091 len(MSG), 10240) 3092 self.assertEqual(msg, MSG) 3093 self.checkRecvmsgAddress(addr, self.cli_addr) 3094 self.assertEqual(ancdata, []) 3095 self.checkFlags(flags, eor=True) 3096 3097 def _testRecvmsgLongAncillaryBuf(self): 3098 self.sendToServer(MSG) 3099 3100 def testRecvmsgAfterClose(self): 3101 # Check that recvmsg[_into]() fails on a closed socket. 3102 self.serv_sock.close() 3103 self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024) 3104 3105 def _testRecvmsgAfterClose(self): 3106 pass 3107 3108 def testRecvmsgTimeout(self): 3109 # Check that timeout works. 3110 try: 3111 self.serv_sock.settimeout(0.03) 3112 self.assertRaises(socket.timeout, 3113 self.doRecvmsg, self.serv_sock, len(MSG)) 3114 finally: 3115 self.misc_event.set() 3116 3117 def _testRecvmsgTimeout(self): 3118 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3119 3120 @requireAttrs(socket, "MSG_PEEK") 3121 def testRecvmsgPeek(self): 3122 # Check that MSG_PEEK in flags enables examination of pending 3123 # data without consuming it. 3124 3125 # Receive part of data with MSG_PEEK. 3126 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3127 len(MSG) - 3, 0, 3128 socket.MSG_PEEK) 3129 self.assertEqual(msg, MSG[:-3]) 3130 self.checkRecvmsgAddress(addr, self.cli_addr) 3131 self.assertEqual(ancdata, []) 3132 # Ignoring MSG_TRUNC here (so this test is the same for stream 3133 # and datagram sockets). Some wording in POSIX seems to 3134 # suggest that it needn't be set when peeking, but that may 3135 # just be a slip. 3136 self.checkFlags(flags, eor=False, 3137 ignore=getattr(socket, "MSG_TRUNC", 0)) 3138 3139 # Receive all data with MSG_PEEK. 3140 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3141 len(MSG), 0, 3142 socket.MSG_PEEK) 3143 self.assertEqual(msg, MSG) 3144 self.checkRecvmsgAddress(addr, self.cli_addr) 3145 self.assertEqual(ancdata, []) 3146 self.checkFlags(flags, eor=True) 3147 3148 # Check that the same data can still be received normally. 3149 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 3150 self.assertEqual(msg, MSG) 3151 self.checkRecvmsgAddress(addr, self.cli_addr) 3152 self.assertEqual(ancdata, []) 3153 self.checkFlags(flags, eor=True) 3154 3155 @testRecvmsgPeek.client_skip 3156 def _testRecvmsgPeek(self): 3157 self.sendToServer(MSG) 3158 3159 @requireAttrs(socket.socket, "sendmsg") 3160 def testRecvmsgFromSendmsg(self): 3161 # Test receiving with recvmsg[_into]() when message is sent 3162 # using sendmsg(). 3163 self.serv_sock.settimeout(self.fail_timeout) 3164 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 3165 self.assertEqual(msg, MSG) 3166 self.checkRecvmsgAddress(addr, self.cli_addr) 3167 self.assertEqual(ancdata, []) 3168 self.checkFlags(flags, eor=True) 3169 3170 @testRecvmsgFromSendmsg.client_skip 3171 def _testRecvmsgFromSendmsg(self): 3172 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 3173 3174 3175class RecvmsgGenericStreamTests(RecvmsgGenericTests): 3176 # Tests which require a stream socket and can use either recvmsg() 3177 # or recvmsg_into(). 3178 3179 def testRecvmsgEOF(self): 3180 # Receive end-of-stream indicator (b"", peer socket closed). 3181 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 3182 self.assertEqual(msg, b"") 3183 self.checkRecvmsgAddress(addr, self.cli_addr) 3184 self.assertEqual(ancdata, []) 3185 self.checkFlags(flags, eor=None) # Might not have end-of-record marker 3186 3187 def _testRecvmsgEOF(self): 3188 self.cli_sock.close() 3189 3190 def testRecvmsgOverflow(self): 3191 # Receive a message in more than one chunk. 3192 seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3193 len(MSG) - 3) 3194 self.checkRecvmsgAddress(addr, self.cli_addr) 3195 self.assertEqual(ancdata, []) 3196 self.checkFlags(flags, eor=False) 3197 3198 seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 3199 self.checkRecvmsgAddress(addr, self.cli_addr) 3200 self.assertEqual(ancdata, []) 3201 self.checkFlags(flags, eor=True) 3202 3203 msg = seg1 + seg2 3204 self.assertEqual(msg, MSG) 3205 3206 def _testRecvmsgOverflow(self): 3207 self.sendToServer(MSG) 3208 3209 3210class RecvmsgTests(RecvmsgGenericTests): 3211 # Tests for recvmsg() which can use any socket type. 3212 3213 def testRecvmsgBadArgs(self): 3214 # Check that recvmsg() rejects invalid arguments. 3215 self.assertRaises(TypeError, self.serv_sock.recvmsg) 3216 self.assertRaises(ValueError, self.serv_sock.recvmsg, 3217 -1, 0, 0) 3218 self.assertRaises(ValueError, self.serv_sock.recvmsg, 3219 len(MSG), -1, 0) 3220 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3221 [bytearray(10)], 0, 0) 3222 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3223 object(), 0, 0) 3224 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3225 len(MSG), object(), 0) 3226 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3227 len(MSG), 0, object()) 3228 3229 msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0) 3230 self.assertEqual(msg, MSG) 3231 self.checkRecvmsgAddress(addr, self.cli_addr) 3232 self.assertEqual(ancdata, []) 3233 self.checkFlags(flags, eor=True) 3234 3235 def _testRecvmsgBadArgs(self): 3236 self.sendToServer(MSG) 3237 3238 3239class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests): 3240 # Tests for recvmsg_into() which can use any socket type. 3241 3242 def testRecvmsgIntoBadArgs(self): 3243 # Check that recvmsg_into() rejects invalid arguments. 3244 buf = bytearray(len(MSG)) 3245 self.assertRaises(TypeError, self.serv_sock.recvmsg_into) 3246 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3247 len(MSG), 0, 0) 3248 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3249 buf, 0, 0) 3250 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3251 [object()], 0, 0) 3252 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3253 [b"I'm not writable"], 0, 0) 3254 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3255 [buf, object()], 0, 0) 3256 self.assertRaises(ValueError, self.serv_sock.recvmsg_into, 3257 [buf], -1, 0) 3258 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3259 [buf], object(), 0) 3260 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3261 [buf], 0, object()) 3262 3263 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0) 3264 self.assertEqual(nbytes, len(MSG)) 3265 self.assertEqual(buf, bytearray(MSG)) 3266 self.checkRecvmsgAddress(addr, self.cli_addr) 3267 self.assertEqual(ancdata, []) 3268 self.checkFlags(flags, eor=True) 3269 3270 def _testRecvmsgIntoBadArgs(self): 3271 self.sendToServer(MSG) 3272 3273 def testRecvmsgIntoGenerator(self): 3274 # Receive into buffer obtained from a generator (not a sequence). 3275 buf = bytearray(len(MSG)) 3276 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 3277 (o for o in [buf])) 3278 self.assertEqual(nbytes, len(MSG)) 3279 self.assertEqual(buf, bytearray(MSG)) 3280 self.checkRecvmsgAddress(addr, self.cli_addr) 3281 self.assertEqual(ancdata, []) 3282 self.checkFlags(flags, eor=True) 3283 3284 def _testRecvmsgIntoGenerator(self): 3285 self.sendToServer(MSG) 3286 3287 def testRecvmsgIntoArray(self): 3288 # Receive into an array rather than the usual bytearray. 3289 buf = array.array("B", [0] * len(MSG)) 3290 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf]) 3291 self.assertEqual(nbytes, len(MSG)) 3292 self.assertEqual(buf.tobytes(), MSG) 3293 self.checkRecvmsgAddress(addr, self.cli_addr) 3294 self.assertEqual(ancdata, []) 3295 self.checkFlags(flags, eor=True) 3296 3297 def _testRecvmsgIntoArray(self): 3298 self.sendToServer(MSG) 3299 3300 def testRecvmsgIntoScatter(self): 3301 # Receive into multiple buffers (scatter write). 3302 b1 = bytearray(b"----") 3303 b2 = bytearray(b"0123456789") 3304 b3 = bytearray(b"--------------") 3305 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 3306 [b1, memoryview(b2)[2:9], b3]) 3307 self.assertEqual(nbytes, len(b"Mary had a little lamb")) 3308 self.assertEqual(b1, bytearray(b"Mary")) 3309 self.assertEqual(b2, bytearray(b"01 had a 9")) 3310 self.assertEqual(b3, bytearray(b"little lamb---")) 3311 self.checkRecvmsgAddress(addr, self.cli_addr) 3312 self.assertEqual(ancdata, []) 3313 self.checkFlags(flags, eor=True) 3314 3315 def _testRecvmsgIntoScatter(self): 3316 self.sendToServer(b"Mary had a little lamb") 3317 3318 3319class CmsgMacroTests(unittest.TestCase): 3320 # Test the functions CMSG_LEN() and CMSG_SPACE(). Tests 3321 # assumptions used by sendmsg() and recvmsg[_into](), which share 3322 # code with these functions. 3323 3324 # Match the definition in socketmodule.c 3325 try: 3326 import _testcapi 3327 except ImportError: 3328 socklen_t_limit = 0x7fffffff 3329 else: 3330 socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX) 3331 3332 @requireAttrs(socket, "CMSG_LEN") 3333 def testCMSG_LEN(self): 3334 # Test CMSG_LEN() with various valid and invalid values, 3335 # checking the assumptions used by recvmsg() and sendmsg(). 3336 toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1 3337 values = list(range(257)) + list(range(toobig - 257, toobig)) 3338 3339 # struct cmsghdr has at least three members, two of which are ints 3340 self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2) 3341 for n in values: 3342 ret = socket.CMSG_LEN(n) 3343 # This is how recvmsg() calculates the data size 3344 self.assertEqual(ret - socket.CMSG_LEN(0), n) 3345 self.assertLessEqual(ret, self.socklen_t_limit) 3346 3347 self.assertRaises(OverflowError, socket.CMSG_LEN, -1) 3348 # sendmsg() shares code with these functions, and requires 3349 # that it reject values over the limit. 3350 self.assertRaises(OverflowError, socket.CMSG_LEN, toobig) 3351 self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize) 3352 3353 @requireAttrs(socket, "CMSG_SPACE") 3354 def testCMSG_SPACE(self): 3355 # Test CMSG_SPACE() with various valid and invalid values, 3356 # checking the assumptions used by sendmsg(). 3357 toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1 3358 values = list(range(257)) + list(range(toobig - 257, toobig)) 3359 3360 last = socket.CMSG_SPACE(0) 3361 # struct cmsghdr has at least three members, two of which are ints 3362 self.assertGreater(last, array.array("i").itemsize * 2) 3363 for n in values: 3364 ret = socket.CMSG_SPACE(n) 3365 self.assertGreaterEqual(ret, last) 3366 self.assertGreaterEqual(ret, socket.CMSG_LEN(n)) 3367 self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0)) 3368 self.assertLessEqual(ret, self.socklen_t_limit) 3369 last = ret 3370 3371 self.assertRaises(OverflowError, socket.CMSG_SPACE, -1) 3372 # sendmsg() shares code with these functions, and requires 3373 # that it reject values over the limit. 3374 self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig) 3375 self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize) 3376 3377 3378class SCMRightsTest(SendrecvmsgServerTimeoutBase): 3379 # Tests for file descriptor passing on Unix-domain sockets. 3380 3381 # Invalid file descriptor value that's unlikely to evaluate to a 3382 # real FD even if one of its bytes is replaced with a different 3383 # value (which shouldn't actually happen). 3384 badfd = -0x5555 3385 3386 def newFDs(self, n): 3387 # Return a list of n file descriptors for newly-created files 3388 # containing their list indices as ASCII numbers. 3389 fds = [] 3390 for i in range(n): 3391 fd, path = tempfile.mkstemp() 3392 self.addCleanup(os.unlink, path) 3393 self.addCleanup(os.close, fd) 3394 os.write(fd, str(i).encode()) 3395 fds.append(fd) 3396 return fds 3397 3398 def checkFDs(self, fds): 3399 # Check that the file descriptors in the given list contain 3400 # their correct list indices as ASCII numbers. 3401 for n, fd in enumerate(fds): 3402 os.lseek(fd, 0, os.SEEK_SET) 3403 self.assertEqual(os.read(fd, 1024), str(n).encode()) 3404 3405 def registerRecvmsgResult(self, result): 3406 self.addCleanup(self.closeRecvmsgFDs, result) 3407 3408 def closeRecvmsgFDs(self, recvmsg_result): 3409 # Close all file descriptors specified in the ancillary data 3410 # of the given return value from recvmsg() or recvmsg_into(). 3411 for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]: 3412 if (cmsg_level == socket.SOL_SOCKET and 3413 cmsg_type == socket.SCM_RIGHTS): 3414 fds = array.array("i") 3415 fds.frombytes(cmsg_data[: 3416 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3417 for fd in fds: 3418 os.close(fd) 3419 3420 def createAndSendFDs(self, n): 3421 # Send n new file descriptors created by newFDs() to the 3422 # server, with the constant MSG as the non-ancillary data. 3423 self.assertEqual( 3424 self.sendmsgToServer([MSG], 3425 [(socket.SOL_SOCKET, 3426 socket.SCM_RIGHTS, 3427 array.array("i", self.newFDs(n)))]), 3428 len(MSG)) 3429 3430 def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0): 3431 # Check that constant MSG was received with numfds file 3432 # descriptors in a maximum of maxcmsgs control messages (which 3433 # must contain only complete integers). By default, check 3434 # that MSG_CTRUNC is unset, but ignore any flags in 3435 # ignoreflags. 3436 msg, ancdata, flags, addr = result 3437 self.assertEqual(msg, MSG) 3438 self.checkRecvmsgAddress(addr, self.cli_addr) 3439 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3440 ignore=ignoreflags) 3441 3442 self.assertIsInstance(ancdata, list) 3443 self.assertLessEqual(len(ancdata), maxcmsgs) 3444 fds = array.array("i") 3445 for item in ancdata: 3446 self.assertIsInstance(item, tuple) 3447 cmsg_level, cmsg_type, cmsg_data = item 3448 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3449 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3450 self.assertIsInstance(cmsg_data, bytes) 3451 self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0) 3452 fds.frombytes(cmsg_data) 3453 3454 self.assertEqual(len(fds), numfds) 3455 self.checkFDs(fds) 3456 3457 def testFDPassSimple(self): 3458 # Pass a single FD (array read from bytes object). 3459 self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock, 3460 len(MSG), 10240)) 3461 3462 def _testFDPassSimple(self): 3463 self.assertEqual( 3464 self.sendmsgToServer( 3465 [MSG], 3466 [(socket.SOL_SOCKET, 3467 socket.SCM_RIGHTS, 3468 array.array("i", self.newFDs(1)).tobytes())]), 3469 len(MSG)) 3470 3471 def testMultipleFDPass(self): 3472 # Pass multiple FDs in a single array. 3473 self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock, 3474 len(MSG), 10240)) 3475 3476 def _testMultipleFDPass(self): 3477 self.createAndSendFDs(4) 3478 3479 @requireAttrs(socket, "CMSG_SPACE") 3480 def testFDPassCMSG_SPACE(self): 3481 # Test using CMSG_SPACE() to calculate ancillary buffer size. 3482 self.checkRecvmsgFDs( 3483 4, self.doRecvmsg(self.serv_sock, len(MSG), 3484 socket.CMSG_SPACE(4 * SIZEOF_INT))) 3485 3486 @testFDPassCMSG_SPACE.client_skip 3487 def _testFDPassCMSG_SPACE(self): 3488 self.createAndSendFDs(4) 3489 3490 def testFDPassCMSG_LEN(self): 3491 # Test using CMSG_LEN() to calculate ancillary buffer size. 3492 self.checkRecvmsgFDs(1, 3493 self.doRecvmsg(self.serv_sock, len(MSG), 3494 socket.CMSG_LEN(4 * SIZEOF_INT)), 3495 # RFC 3542 says implementations may set 3496 # MSG_CTRUNC if there isn't enough space 3497 # for trailing padding. 3498 ignoreflags=socket.MSG_CTRUNC) 3499 3500 def _testFDPassCMSG_LEN(self): 3501 self.createAndSendFDs(1) 3502 3503 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3504 @unittest.skipIf(AIX, "skipping, see issue #22397") 3505 @requireAttrs(socket, "CMSG_SPACE") 3506 def testFDPassSeparate(self): 3507 # Pass two FDs in two separate arrays. Arrays may be combined 3508 # into a single control message by the OS. 3509 self.checkRecvmsgFDs(2, 3510 self.doRecvmsg(self.serv_sock, len(MSG), 10240), 3511 maxcmsgs=2) 3512 3513 @testFDPassSeparate.client_skip 3514 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3515 @unittest.skipIf(AIX, "skipping, see issue #22397") 3516 def _testFDPassSeparate(self): 3517 fd0, fd1 = self.newFDs(2) 3518 self.assertEqual( 3519 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 3520 socket.SCM_RIGHTS, 3521 array.array("i", [fd0])), 3522 (socket.SOL_SOCKET, 3523 socket.SCM_RIGHTS, 3524 array.array("i", [fd1]))]), 3525 len(MSG)) 3526 3527 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3528 @unittest.skipIf(AIX, "skipping, see issue #22397") 3529 @requireAttrs(socket, "CMSG_SPACE") 3530 def testFDPassSeparateMinSpace(self): 3531 # Pass two FDs in two separate arrays, receiving them into the 3532 # minimum space for two arrays. 3533 num_fds = 2 3534 self.checkRecvmsgFDs(num_fds, 3535 self.doRecvmsg(self.serv_sock, len(MSG), 3536 socket.CMSG_SPACE(SIZEOF_INT) + 3537 socket.CMSG_LEN(SIZEOF_INT * num_fds)), 3538 maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC) 3539 3540 @testFDPassSeparateMinSpace.client_skip 3541 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3542 @unittest.skipIf(AIX, "skipping, see issue #22397") 3543 def _testFDPassSeparateMinSpace(self): 3544 fd0, fd1 = self.newFDs(2) 3545 self.assertEqual( 3546 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 3547 socket.SCM_RIGHTS, 3548 array.array("i", [fd0])), 3549 (socket.SOL_SOCKET, 3550 socket.SCM_RIGHTS, 3551 array.array("i", [fd1]))]), 3552 len(MSG)) 3553 3554 def sendAncillaryIfPossible(self, msg, ancdata): 3555 # Try to send msg and ancdata to server, but if the system 3556 # call fails, just send msg with no ancillary data. 3557 try: 3558 nbytes = self.sendmsgToServer([msg], ancdata) 3559 except OSError as e: 3560 # Check that it was the system call that failed 3561 self.assertIsInstance(e.errno, int) 3562 nbytes = self.sendmsgToServer([msg]) 3563 self.assertEqual(nbytes, len(msg)) 3564 3565 @unittest.skipIf(sys.platform == "darwin", "see issue #24725") 3566 def testFDPassEmpty(self): 3567 # Try to pass an empty FD array. Can receive either no array 3568 # or an empty array. 3569 self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock, 3570 len(MSG), 10240), 3571 ignoreflags=socket.MSG_CTRUNC) 3572 3573 def _testFDPassEmpty(self): 3574 self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET, 3575 socket.SCM_RIGHTS, 3576 b"")]) 3577 3578 def testFDPassPartialInt(self): 3579 # Try to pass a truncated FD array. 3580 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3581 len(MSG), 10240) 3582 self.assertEqual(msg, MSG) 3583 self.checkRecvmsgAddress(addr, self.cli_addr) 3584 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 3585 self.assertLessEqual(len(ancdata), 1) 3586 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3587 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3588 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3589 self.assertLess(len(cmsg_data), SIZEOF_INT) 3590 3591 def _testFDPassPartialInt(self): 3592 self.sendAncillaryIfPossible( 3593 MSG, 3594 [(socket.SOL_SOCKET, 3595 socket.SCM_RIGHTS, 3596 array.array("i", [self.badfd]).tobytes()[:-1])]) 3597 3598 @requireAttrs(socket, "CMSG_SPACE") 3599 def testFDPassPartialIntInMiddle(self): 3600 # Try to pass two FD arrays, the first of which is truncated. 3601 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3602 len(MSG), 10240) 3603 self.assertEqual(msg, MSG) 3604 self.checkRecvmsgAddress(addr, self.cli_addr) 3605 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 3606 self.assertLessEqual(len(ancdata), 2) 3607 fds = array.array("i") 3608 # Arrays may have been combined in a single control message 3609 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3610 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3611 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3612 fds.frombytes(cmsg_data[: 3613 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3614 self.assertLessEqual(len(fds), 2) 3615 self.checkFDs(fds) 3616 3617 @testFDPassPartialIntInMiddle.client_skip 3618 def _testFDPassPartialIntInMiddle(self): 3619 fd0, fd1 = self.newFDs(2) 3620 self.sendAncillaryIfPossible( 3621 MSG, 3622 [(socket.SOL_SOCKET, 3623 socket.SCM_RIGHTS, 3624 array.array("i", [fd0, self.badfd]).tobytes()[:-1]), 3625 (socket.SOL_SOCKET, 3626 socket.SCM_RIGHTS, 3627 array.array("i", [fd1]))]) 3628 3629 def checkTruncatedHeader(self, result, ignoreflags=0): 3630 # Check that no ancillary data items are returned when data is 3631 # truncated inside the cmsghdr structure. 3632 msg, ancdata, flags, addr = result 3633 self.assertEqual(msg, MSG) 3634 self.checkRecvmsgAddress(addr, self.cli_addr) 3635 self.assertEqual(ancdata, []) 3636 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 3637 ignore=ignoreflags) 3638 3639 def testCmsgTruncNoBufSize(self): 3640 # Check that no ancillary data is received when no buffer size 3641 # is specified. 3642 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)), 3643 # BSD seems to set MSG_CTRUNC only 3644 # if an item has been partially 3645 # received. 3646 ignoreflags=socket.MSG_CTRUNC) 3647 3648 def _testCmsgTruncNoBufSize(self): 3649 self.createAndSendFDs(1) 3650 3651 def testCmsgTrunc0(self): 3652 # Check that no ancillary data is received when buffer size is 0. 3653 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0), 3654 ignoreflags=socket.MSG_CTRUNC) 3655 3656 def _testCmsgTrunc0(self): 3657 self.createAndSendFDs(1) 3658 3659 # Check that no ancillary data is returned for various non-zero 3660 # (but still too small) buffer sizes. 3661 3662 def testCmsgTrunc1(self): 3663 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1)) 3664 3665 def _testCmsgTrunc1(self): 3666 self.createAndSendFDs(1) 3667 3668 def testCmsgTrunc2Int(self): 3669 # The cmsghdr structure has at least three members, two of 3670 # which are ints, so we still shouldn't see any ancillary 3671 # data. 3672 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3673 SIZEOF_INT * 2)) 3674 3675 def _testCmsgTrunc2Int(self): 3676 self.createAndSendFDs(1) 3677 3678 def testCmsgTruncLen0Minus1(self): 3679 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3680 socket.CMSG_LEN(0) - 1)) 3681 3682 def _testCmsgTruncLen0Minus1(self): 3683 self.createAndSendFDs(1) 3684 3685 # The following tests try to truncate the control message in the 3686 # middle of the FD array. 3687 3688 def checkTruncatedArray(self, ancbuf, maxdata, mindata=0): 3689 # Check that file descriptor data is truncated to between 3690 # mindata and maxdata bytes when received with buffer size 3691 # ancbuf, and that any complete file descriptor numbers are 3692 # valid. 3693 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3694 len(MSG), ancbuf) 3695 self.assertEqual(msg, MSG) 3696 self.checkRecvmsgAddress(addr, self.cli_addr) 3697 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3698 3699 if mindata == 0 and ancdata == []: 3700 return 3701 self.assertEqual(len(ancdata), 1) 3702 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3703 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3704 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3705 self.assertGreaterEqual(len(cmsg_data), mindata) 3706 self.assertLessEqual(len(cmsg_data), maxdata) 3707 fds = array.array("i") 3708 fds.frombytes(cmsg_data[: 3709 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3710 self.checkFDs(fds) 3711 3712 def testCmsgTruncLen0(self): 3713 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0) 3714 3715 def _testCmsgTruncLen0(self): 3716 self.createAndSendFDs(1) 3717 3718 def testCmsgTruncLen0Plus1(self): 3719 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1) 3720 3721 def _testCmsgTruncLen0Plus1(self): 3722 self.createAndSendFDs(2) 3723 3724 def testCmsgTruncLen1(self): 3725 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT), 3726 maxdata=SIZEOF_INT) 3727 3728 def _testCmsgTruncLen1(self): 3729 self.createAndSendFDs(2) 3730 3731 def testCmsgTruncLen2Minus1(self): 3732 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1, 3733 maxdata=(2 * SIZEOF_INT) - 1) 3734 3735 def _testCmsgTruncLen2Minus1(self): 3736 self.createAndSendFDs(2) 3737 3738 3739class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase): 3740 # Test sendmsg() and recvmsg[_into]() using the ancillary data 3741 # features of the RFC 3542 Advanced Sockets API for IPv6. 3742 # Currently we can only handle certain data items (e.g. traffic 3743 # class, hop limit, MTU discovery and fragmentation settings) 3744 # without resorting to unportable means such as the struct module, 3745 # but the tests here are aimed at testing the ancillary data 3746 # handling in sendmsg() and recvmsg() rather than the IPv6 API 3747 # itself. 3748 3749 # Test value to use when setting hop limit of packet 3750 hop_limit = 2 3751 3752 # Test value to use when setting traffic class of packet. 3753 # -1 means "use kernel default". 3754 traffic_class = -1 3755 3756 def ancillaryMapping(self, ancdata): 3757 # Given ancillary data list ancdata, return a mapping from 3758 # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data. 3759 # Check that no (level, type) pair appears more than once. 3760 d = {} 3761 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3762 self.assertNotIn((cmsg_level, cmsg_type), d) 3763 d[(cmsg_level, cmsg_type)] = cmsg_data 3764 return d 3765 3766 def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0): 3767 # Receive hop limit into ancbufsize bytes of ancillary data 3768 # space. Check that data is MSG, ancillary data is not 3769 # truncated (but ignore any flags in ignoreflags), and hop 3770 # limit is between 0 and maxhop inclusive. 3771 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3772 socket.IPV6_RECVHOPLIMIT, 1) 3773 self.misc_event.set() 3774 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3775 len(MSG), ancbufsize) 3776 3777 self.assertEqual(msg, MSG) 3778 self.checkRecvmsgAddress(addr, self.cli_addr) 3779 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3780 ignore=ignoreflags) 3781 3782 self.assertEqual(len(ancdata), 1) 3783 self.assertIsInstance(ancdata[0], tuple) 3784 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3785 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3786 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 3787 self.assertIsInstance(cmsg_data, bytes) 3788 self.assertEqual(len(cmsg_data), SIZEOF_INT) 3789 a = array.array("i") 3790 a.frombytes(cmsg_data) 3791 self.assertGreaterEqual(a[0], 0) 3792 self.assertLessEqual(a[0], maxhop) 3793 3794 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3795 def testRecvHopLimit(self): 3796 # Test receiving the packet hop limit as ancillary data. 3797 self.checkHopLimit(ancbufsize=10240) 3798 3799 @testRecvHopLimit.client_skip 3800 def _testRecvHopLimit(self): 3801 # Need to wait until server has asked to receive ancillary 3802 # data, as implementations are not required to buffer it 3803 # otherwise. 3804 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3805 self.sendToServer(MSG) 3806 3807 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3808 def testRecvHopLimitCMSG_SPACE(self): 3809 # Test receiving hop limit, using CMSG_SPACE to calculate buffer size. 3810 self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT)) 3811 3812 @testRecvHopLimitCMSG_SPACE.client_skip 3813 def _testRecvHopLimitCMSG_SPACE(self): 3814 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3815 self.sendToServer(MSG) 3816 3817 # Could test receiving into buffer sized using CMSG_LEN, but RFC 3818 # 3542 says portable applications must provide space for trailing 3819 # padding. Implementations may set MSG_CTRUNC if there isn't 3820 # enough space for the padding. 3821 3822 @requireAttrs(socket.socket, "sendmsg") 3823 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3824 def testSetHopLimit(self): 3825 # Test setting hop limit on outgoing packet and receiving it 3826 # at the other end. 3827 self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit) 3828 3829 @testSetHopLimit.client_skip 3830 def _testSetHopLimit(self): 3831 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3832 self.assertEqual( 3833 self.sendmsgToServer([MSG], 3834 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3835 array.array("i", [self.hop_limit]))]), 3836 len(MSG)) 3837 3838 def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255, 3839 ignoreflags=0): 3840 # Receive traffic class and hop limit into ancbufsize bytes of 3841 # ancillary data space. Check that data is MSG, ancillary 3842 # data is not truncated (but ignore any flags in ignoreflags), 3843 # and traffic class and hop limit are in range (hop limit no 3844 # more than maxhop). 3845 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3846 socket.IPV6_RECVHOPLIMIT, 1) 3847 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3848 socket.IPV6_RECVTCLASS, 1) 3849 self.misc_event.set() 3850 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3851 len(MSG), ancbufsize) 3852 3853 self.assertEqual(msg, MSG) 3854 self.checkRecvmsgAddress(addr, self.cli_addr) 3855 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3856 ignore=ignoreflags) 3857 self.assertEqual(len(ancdata), 2) 3858 ancmap = self.ancillaryMapping(ancdata) 3859 3860 tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)] 3861 self.assertEqual(len(tcdata), SIZEOF_INT) 3862 a = array.array("i") 3863 a.frombytes(tcdata) 3864 self.assertGreaterEqual(a[0], 0) 3865 self.assertLessEqual(a[0], 255) 3866 3867 hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)] 3868 self.assertEqual(len(hldata), SIZEOF_INT) 3869 a = array.array("i") 3870 a.frombytes(hldata) 3871 self.assertGreaterEqual(a[0], 0) 3872 self.assertLessEqual(a[0], maxhop) 3873 3874 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3875 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3876 def testRecvTrafficClassAndHopLimit(self): 3877 # Test receiving traffic class and hop limit as ancillary data. 3878 self.checkTrafficClassAndHopLimit(ancbufsize=10240) 3879 3880 @testRecvTrafficClassAndHopLimit.client_skip 3881 def _testRecvTrafficClassAndHopLimit(self): 3882 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3883 self.sendToServer(MSG) 3884 3885 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3886 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3887 def testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3888 # Test receiving traffic class and hop limit, using 3889 # CMSG_SPACE() to calculate buffer size. 3890 self.checkTrafficClassAndHopLimit( 3891 ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2) 3892 3893 @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip 3894 def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3895 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3896 self.sendToServer(MSG) 3897 3898 @requireAttrs(socket.socket, "sendmsg") 3899 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3900 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3901 def testSetTrafficClassAndHopLimit(self): 3902 # Test setting traffic class and hop limit on outgoing packet, 3903 # and receiving them at the other end. 3904 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 3905 maxhop=self.hop_limit) 3906 3907 @testSetTrafficClassAndHopLimit.client_skip 3908 def _testSetTrafficClassAndHopLimit(self): 3909 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3910 self.assertEqual( 3911 self.sendmsgToServer([MSG], 3912 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3913 array.array("i", [self.traffic_class])), 3914 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3915 array.array("i", [self.hop_limit]))]), 3916 len(MSG)) 3917 3918 @requireAttrs(socket.socket, "sendmsg") 3919 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3920 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3921 def testOddCmsgSize(self): 3922 # Try to send ancillary data with first item one byte too 3923 # long. Fall back to sending with correct size if this fails, 3924 # and check that second item was handled correctly. 3925 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 3926 maxhop=self.hop_limit) 3927 3928 @testOddCmsgSize.client_skip 3929 def _testOddCmsgSize(self): 3930 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3931 try: 3932 nbytes = self.sendmsgToServer( 3933 [MSG], 3934 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3935 array.array("i", [self.traffic_class]).tobytes() + b"\x00"), 3936 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3937 array.array("i", [self.hop_limit]))]) 3938 except OSError as e: 3939 self.assertIsInstance(e.errno, int) 3940 nbytes = self.sendmsgToServer( 3941 [MSG], 3942 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3943 array.array("i", [self.traffic_class])), 3944 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3945 array.array("i", [self.hop_limit]))]) 3946 self.assertEqual(nbytes, len(MSG)) 3947 3948 # Tests for proper handling of truncated ancillary data 3949 3950 def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0): 3951 # Receive hop limit into ancbufsize bytes of ancillary data 3952 # space, which should be too small to contain the ancillary 3953 # data header (if ancbufsize is None, pass no second argument 3954 # to recvmsg()). Check that data is MSG, MSG_CTRUNC is set 3955 # (unless included in ignoreflags), and no ancillary data is 3956 # returned. 3957 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3958 socket.IPV6_RECVHOPLIMIT, 1) 3959 self.misc_event.set() 3960 args = () if ancbufsize is None else (ancbufsize,) 3961 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3962 len(MSG), *args) 3963 3964 self.assertEqual(msg, MSG) 3965 self.checkRecvmsgAddress(addr, self.cli_addr) 3966 self.assertEqual(ancdata, []) 3967 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 3968 ignore=ignoreflags) 3969 3970 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3971 def testCmsgTruncNoBufSize(self): 3972 # Check that no ancillary data is received when no ancillary 3973 # buffer size is provided. 3974 self.checkHopLimitTruncatedHeader(ancbufsize=None, 3975 # BSD seems to set 3976 # MSG_CTRUNC only if an item 3977 # has been partially 3978 # received. 3979 ignoreflags=socket.MSG_CTRUNC) 3980 3981 @testCmsgTruncNoBufSize.client_skip 3982 def _testCmsgTruncNoBufSize(self): 3983 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3984 self.sendToServer(MSG) 3985 3986 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3987 def testSingleCmsgTrunc0(self): 3988 # Check that no ancillary data is received when ancillary 3989 # buffer size is zero. 3990 self.checkHopLimitTruncatedHeader(ancbufsize=0, 3991 ignoreflags=socket.MSG_CTRUNC) 3992 3993 @testSingleCmsgTrunc0.client_skip 3994 def _testSingleCmsgTrunc0(self): 3995 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3996 self.sendToServer(MSG) 3997 3998 # Check that no ancillary data is returned for various non-zero 3999 # (but still too small) buffer sizes. 4000 4001 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4002 def testSingleCmsgTrunc1(self): 4003 self.checkHopLimitTruncatedHeader(ancbufsize=1) 4004 4005 @testSingleCmsgTrunc1.client_skip 4006 def _testSingleCmsgTrunc1(self): 4007 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4008 self.sendToServer(MSG) 4009 4010 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4011 def testSingleCmsgTrunc2Int(self): 4012 self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT) 4013 4014 @testSingleCmsgTrunc2Int.client_skip 4015 def _testSingleCmsgTrunc2Int(self): 4016 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4017 self.sendToServer(MSG) 4018 4019 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4020 def testSingleCmsgTruncLen0Minus1(self): 4021 self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1) 4022 4023 @testSingleCmsgTruncLen0Minus1.client_skip 4024 def _testSingleCmsgTruncLen0Minus1(self): 4025 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4026 self.sendToServer(MSG) 4027 4028 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4029 def testSingleCmsgTruncInData(self): 4030 # Test truncation of a control message inside its associated 4031 # data. The message may be returned with its data truncated, 4032 # or not returned at all. 4033 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4034 socket.IPV6_RECVHOPLIMIT, 1) 4035 self.misc_event.set() 4036 msg, ancdata, flags, addr = self.doRecvmsg( 4037 self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1) 4038 4039 self.assertEqual(msg, MSG) 4040 self.checkRecvmsgAddress(addr, self.cli_addr) 4041 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 4042 4043 self.assertLessEqual(len(ancdata), 1) 4044 if ancdata: 4045 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 4046 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4047 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 4048 self.assertLess(len(cmsg_data), SIZEOF_INT) 4049 4050 @testSingleCmsgTruncInData.client_skip 4051 def _testSingleCmsgTruncInData(self): 4052 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4053 self.sendToServer(MSG) 4054 4055 def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0): 4056 # Receive traffic class and hop limit into ancbufsize bytes of 4057 # ancillary data space, which should be large enough to 4058 # contain the first item, but too small to contain the header 4059 # of the second. Check that data is MSG, MSG_CTRUNC is set 4060 # (unless included in ignoreflags), and only one ancillary 4061 # data item is returned. 4062 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4063 socket.IPV6_RECVHOPLIMIT, 1) 4064 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4065 socket.IPV6_RECVTCLASS, 1) 4066 self.misc_event.set() 4067 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 4068 len(MSG), ancbufsize) 4069 4070 self.assertEqual(msg, MSG) 4071 self.checkRecvmsgAddress(addr, self.cli_addr) 4072 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 4073 ignore=ignoreflags) 4074 4075 self.assertEqual(len(ancdata), 1) 4076 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 4077 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4078 self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}) 4079 self.assertEqual(len(cmsg_data), SIZEOF_INT) 4080 a = array.array("i") 4081 a.frombytes(cmsg_data) 4082 self.assertGreaterEqual(a[0], 0) 4083 self.assertLessEqual(a[0], 255) 4084 4085 # Try the above test with various buffer sizes. 4086 4087 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4088 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4089 def testSecondCmsgTrunc0(self): 4090 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT), 4091 ignoreflags=socket.MSG_CTRUNC) 4092 4093 @testSecondCmsgTrunc0.client_skip 4094 def _testSecondCmsgTrunc0(self): 4095 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4096 self.sendToServer(MSG) 4097 4098 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4099 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4100 def testSecondCmsgTrunc1(self): 4101 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1) 4102 4103 @testSecondCmsgTrunc1.client_skip 4104 def _testSecondCmsgTrunc1(self): 4105 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4106 self.sendToServer(MSG) 4107 4108 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4109 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4110 def testSecondCmsgTrunc2Int(self): 4111 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 4112 2 * SIZEOF_INT) 4113 4114 @testSecondCmsgTrunc2Int.client_skip 4115 def _testSecondCmsgTrunc2Int(self): 4116 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4117 self.sendToServer(MSG) 4118 4119 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4120 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4121 def testSecondCmsgTruncLen0Minus1(self): 4122 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 4123 socket.CMSG_LEN(0) - 1) 4124 4125 @testSecondCmsgTruncLen0Minus1.client_skip 4126 def _testSecondCmsgTruncLen0Minus1(self): 4127 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4128 self.sendToServer(MSG) 4129 4130 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4131 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4132 def testSecomdCmsgTruncInData(self): 4133 # Test truncation of the second of two control messages inside 4134 # its associated data. 4135 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4136 socket.IPV6_RECVHOPLIMIT, 1) 4137 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4138 socket.IPV6_RECVTCLASS, 1) 4139 self.misc_event.set() 4140 msg, ancdata, flags, addr = self.doRecvmsg( 4141 self.serv_sock, len(MSG), 4142 socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1) 4143 4144 self.assertEqual(msg, MSG) 4145 self.checkRecvmsgAddress(addr, self.cli_addr) 4146 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 4147 4148 cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT} 4149 4150 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 4151 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4152 cmsg_types.remove(cmsg_type) 4153 self.assertEqual(len(cmsg_data), SIZEOF_INT) 4154 a = array.array("i") 4155 a.frombytes(cmsg_data) 4156 self.assertGreaterEqual(a[0], 0) 4157 self.assertLessEqual(a[0], 255) 4158 4159 if ancdata: 4160 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 4161 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4162 cmsg_types.remove(cmsg_type) 4163 self.assertLess(len(cmsg_data), SIZEOF_INT) 4164 4165 self.assertEqual(ancdata, []) 4166 4167 @testSecomdCmsgTruncInData.client_skip 4168 def _testSecomdCmsgTruncInData(self): 4169 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4170 self.sendToServer(MSG) 4171 4172 4173# Derive concrete test classes for different socket types. 4174 4175class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase, 4176 SendrecvmsgConnectionlessBase, 4177 ThreadedSocketTestMixin, UDPTestBase): 4178 pass 4179 4180@requireAttrs(socket.socket, "sendmsg") 4181class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase): 4182 pass 4183 4184@requireAttrs(socket.socket, "recvmsg") 4185class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase): 4186 pass 4187 4188@requireAttrs(socket.socket, "recvmsg_into") 4189class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase): 4190 pass 4191 4192 4193class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase, 4194 SendrecvmsgConnectionlessBase, 4195 ThreadedSocketTestMixin, UDP6TestBase): 4196 4197 def checkRecvmsgAddress(self, addr1, addr2): 4198 # Called to compare the received address with the address of 4199 # the peer, ignoring scope ID 4200 self.assertEqual(addr1[:-1], addr2[:-1]) 4201 4202@requireAttrs(socket.socket, "sendmsg") 4203@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4204@requireSocket("AF_INET6", "SOCK_DGRAM") 4205class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase): 4206 pass 4207 4208@requireAttrs(socket.socket, "recvmsg") 4209@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4210@requireSocket("AF_INET6", "SOCK_DGRAM") 4211class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase): 4212 pass 4213 4214@requireAttrs(socket.socket, "recvmsg_into") 4215@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4216@requireSocket("AF_INET6", "SOCK_DGRAM") 4217class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase): 4218 pass 4219 4220@requireAttrs(socket.socket, "recvmsg") 4221@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4222@requireAttrs(socket, "IPPROTO_IPV6") 4223@requireSocket("AF_INET6", "SOCK_DGRAM") 4224class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, 4225 SendrecvmsgUDP6TestBase): 4226 pass 4227 4228@requireAttrs(socket.socket, "recvmsg_into") 4229@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4230@requireAttrs(socket, "IPPROTO_IPV6") 4231@requireSocket("AF_INET6", "SOCK_DGRAM") 4232class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin, 4233 RFC3542AncillaryTest, 4234 SendrecvmsgUDP6TestBase): 4235 pass 4236 4237 4238@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4239 'UDPLITE sockets required for this test.') 4240class SendrecvmsgUDPLITETestBase(SendrecvmsgDgramFlagsBase, 4241 SendrecvmsgConnectionlessBase, 4242 ThreadedSocketTestMixin, UDPLITETestBase): 4243 pass 4244 4245@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4246 'UDPLITE sockets required for this test.') 4247@requireAttrs(socket.socket, "sendmsg") 4248class SendmsgUDPLITETest(SendmsgConnectionlessTests, SendrecvmsgUDPLITETestBase): 4249 pass 4250 4251@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4252 'UDPLITE sockets required for this test.') 4253@requireAttrs(socket.socket, "recvmsg") 4254class RecvmsgUDPLITETest(RecvmsgTests, SendrecvmsgUDPLITETestBase): 4255 pass 4256 4257@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4258 'UDPLITE sockets required for this test.') 4259@requireAttrs(socket.socket, "recvmsg_into") 4260class RecvmsgIntoUDPLITETest(RecvmsgIntoTests, SendrecvmsgUDPLITETestBase): 4261 pass 4262 4263 4264@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4265 'UDPLITE sockets required for this test.') 4266class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase, 4267 SendrecvmsgConnectionlessBase, 4268 ThreadedSocketTestMixin, UDPLITE6TestBase): 4269 4270 def checkRecvmsgAddress(self, addr1, addr2): 4271 # Called to compare the received address with the address of 4272 # the peer, ignoring scope ID 4273 self.assertEqual(addr1[:-1], addr2[:-1]) 4274 4275@requireAttrs(socket.socket, "sendmsg") 4276@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4277@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4278 'UDPLITE sockets required for this test.') 4279@requireSocket("AF_INET6", "SOCK_DGRAM") 4280class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBase): 4281 pass 4282 4283@requireAttrs(socket.socket, "recvmsg") 4284@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4285@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4286 'UDPLITE sockets required for this test.') 4287@requireSocket("AF_INET6", "SOCK_DGRAM") 4288class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase): 4289 pass 4290 4291@requireAttrs(socket.socket, "recvmsg_into") 4292@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4293@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4294 'UDPLITE sockets required for this test.') 4295@requireSocket("AF_INET6", "SOCK_DGRAM") 4296class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase): 4297 pass 4298 4299@requireAttrs(socket.socket, "recvmsg") 4300@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4301@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4302 'UDPLITE sockets required for this test.') 4303@requireAttrs(socket, "IPPROTO_IPV6") 4304@requireSocket("AF_INET6", "SOCK_DGRAM") 4305class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest, 4306 SendrecvmsgUDPLITE6TestBase): 4307 pass 4308 4309@requireAttrs(socket.socket, "recvmsg_into") 4310@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4311@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4312 'UDPLITE sockets required for this test.') 4313@requireAttrs(socket, "IPPROTO_IPV6") 4314@requireSocket("AF_INET6", "SOCK_DGRAM") 4315class RecvmsgIntoRFC3542AncillaryUDPLITE6Test(RecvmsgIntoMixin, 4316 RFC3542AncillaryTest, 4317 SendrecvmsgUDPLITE6TestBase): 4318 pass 4319 4320 4321class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase, 4322 ConnectedStreamTestMixin, TCPTestBase): 4323 pass 4324 4325@requireAttrs(socket.socket, "sendmsg") 4326class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase): 4327 pass 4328 4329@requireAttrs(socket.socket, "recvmsg") 4330class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests, 4331 SendrecvmsgTCPTestBase): 4332 pass 4333 4334@requireAttrs(socket.socket, "recvmsg_into") 4335class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4336 SendrecvmsgTCPTestBase): 4337 pass 4338 4339 4340class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase, 4341 SendrecvmsgConnectedBase, 4342 ConnectedStreamTestMixin, SCTPStreamBase): 4343 pass 4344 4345@requireAttrs(socket.socket, "sendmsg") 4346@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4347@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4348class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase): 4349 pass 4350 4351@requireAttrs(socket.socket, "recvmsg") 4352@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4353@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4354class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 4355 SendrecvmsgSCTPStreamTestBase): 4356 4357 def testRecvmsgEOF(self): 4358 try: 4359 super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF() 4360 except OSError as e: 4361 if e.errno != errno.ENOTCONN: 4362 raise 4363 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 4364 4365@requireAttrs(socket.socket, "recvmsg_into") 4366@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4367@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4368class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4369 SendrecvmsgSCTPStreamTestBase): 4370 4371 def testRecvmsgEOF(self): 4372 try: 4373 super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF() 4374 except OSError as e: 4375 if e.errno != errno.ENOTCONN: 4376 raise 4377 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 4378 4379 4380class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase, 4381 ConnectedStreamTestMixin, UnixStreamBase): 4382 pass 4383 4384@requireAttrs(socket.socket, "sendmsg") 4385@requireAttrs(socket, "AF_UNIX") 4386class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase): 4387 pass 4388 4389@requireAttrs(socket.socket, "recvmsg") 4390@requireAttrs(socket, "AF_UNIX") 4391class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 4392 SendrecvmsgUnixStreamTestBase): 4393 pass 4394 4395@requireAttrs(socket.socket, "recvmsg_into") 4396@requireAttrs(socket, "AF_UNIX") 4397class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4398 SendrecvmsgUnixStreamTestBase): 4399 pass 4400 4401@requireAttrs(socket.socket, "sendmsg", "recvmsg") 4402@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 4403class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase): 4404 pass 4405 4406@requireAttrs(socket.socket, "sendmsg", "recvmsg_into") 4407@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 4408class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest, 4409 SendrecvmsgUnixStreamTestBase): 4410 pass 4411 4412 4413# Test interrupting the interruptible send/receive methods with a 4414# signal when a timeout is set. These tests avoid having multiple 4415# threads alive during the test so that the OS cannot deliver the 4416# signal to the wrong one. 4417 4418class InterruptedTimeoutBase(unittest.TestCase): 4419 # Base class for interrupted send/receive tests. Installs an 4420 # empty handler for SIGALRM and removes it on teardown, along with 4421 # any scheduled alarms. 4422 4423 def setUp(self): 4424 super().setUp() 4425 orig_alrm_handler = signal.signal(signal.SIGALRM, 4426 lambda signum, frame: 1 / 0) 4427 self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler) 4428 4429 # Timeout for socket operations 4430 timeout = support.LOOPBACK_TIMEOUT 4431 4432 # Provide setAlarm() method to schedule delivery of SIGALRM after 4433 # given number of seconds, or cancel it if zero, and an 4434 # appropriate time value to use. Use setitimer() if available. 4435 if hasattr(signal, "setitimer"): 4436 alarm_time = 0.05 4437 4438 def setAlarm(self, seconds): 4439 signal.setitimer(signal.ITIMER_REAL, seconds) 4440 else: 4441 # Old systems may deliver the alarm up to one second early 4442 alarm_time = 2 4443 4444 def setAlarm(self, seconds): 4445 signal.alarm(seconds) 4446 4447 4448# Require siginterrupt() in order to ensure that system calls are 4449# interrupted by default. 4450@requireAttrs(signal, "siginterrupt") 4451@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), 4452 "Don't have signal.alarm or signal.setitimer") 4453class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase): 4454 # Test interrupting the recv*() methods with signals when a 4455 # timeout is set. 4456 4457 def setUp(self): 4458 super().setUp() 4459 self.serv.settimeout(self.timeout) 4460 4461 def checkInterruptedRecv(self, func, *args, **kwargs): 4462 # Check that func(*args, **kwargs) raises 4463 # errno of EINTR when interrupted by a signal. 4464 try: 4465 self.setAlarm(self.alarm_time) 4466 with self.assertRaises(ZeroDivisionError) as cm: 4467 func(*args, **kwargs) 4468 finally: 4469 self.setAlarm(0) 4470 4471 def testInterruptedRecvTimeout(self): 4472 self.checkInterruptedRecv(self.serv.recv, 1024) 4473 4474 def testInterruptedRecvIntoTimeout(self): 4475 self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024)) 4476 4477 def testInterruptedRecvfromTimeout(self): 4478 self.checkInterruptedRecv(self.serv.recvfrom, 1024) 4479 4480 def testInterruptedRecvfromIntoTimeout(self): 4481 self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024)) 4482 4483 @requireAttrs(socket.socket, "recvmsg") 4484 def testInterruptedRecvmsgTimeout(self): 4485 self.checkInterruptedRecv(self.serv.recvmsg, 1024) 4486 4487 @requireAttrs(socket.socket, "recvmsg_into") 4488 def testInterruptedRecvmsgIntoTimeout(self): 4489 self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)]) 4490 4491 4492# Require siginterrupt() in order to ensure that system calls are 4493# interrupted by default. 4494@requireAttrs(signal, "siginterrupt") 4495@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), 4496 "Don't have signal.alarm or signal.setitimer") 4497class InterruptedSendTimeoutTest(InterruptedTimeoutBase, 4498 ThreadSafeCleanupTestCase, 4499 SocketListeningTestMixin, TCPTestBase): 4500 # Test interrupting the interruptible send*() methods with signals 4501 # when a timeout is set. 4502 4503 def setUp(self): 4504 super().setUp() 4505 self.serv_conn = self.newSocket() 4506 self.addCleanup(self.serv_conn.close) 4507 # Use a thread to complete the connection, but wait for it to 4508 # terminate before running the test, so that there is only one 4509 # thread to accept the signal. 4510 cli_thread = threading.Thread(target=self.doConnect) 4511 cli_thread.start() 4512 self.cli_conn, addr = self.serv.accept() 4513 self.addCleanup(self.cli_conn.close) 4514 cli_thread.join() 4515 self.serv_conn.settimeout(self.timeout) 4516 4517 def doConnect(self): 4518 self.serv_conn.connect(self.serv_addr) 4519 4520 def checkInterruptedSend(self, func, *args, **kwargs): 4521 # Check that func(*args, **kwargs), run in a loop, raises 4522 # OSError with an errno of EINTR when interrupted by a 4523 # signal. 4524 try: 4525 with self.assertRaises(ZeroDivisionError) as cm: 4526 while True: 4527 self.setAlarm(self.alarm_time) 4528 func(*args, **kwargs) 4529 finally: 4530 self.setAlarm(0) 4531 4532 # Issue #12958: The following tests have problems on OS X prior to 10.7 4533 @support.requires_mac_ver(10, 7) 4534 def testInterruptedSendTimeout(self): 4535 self.checkInterruptedSend(self.serv_conn.send, b"a"*512) 4536 4537 @support.requires_mac_ver(10, 7) 4538 def testInterruptedSendtoTimeout(self): 4539 # Passing an actual address here as Python's wrapper for 4540 # sendto() doesn't allow passing a zero-length one; POSIX 4541 # requires that the address is ignored since the socket is 4542 # connection-mode, however. 4543 self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512, 4544 self.serv_addr) 4545 4546 @support.requires_mac_ver(10, 7) 4547 @requireAttrs(socket.socket, "sendmsg") 4548 def testInterruptedSendmsgTimeout(self): 4549 self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512]) 4550 4551 4552class TCPCloserTest(ThreadedTCPSocketTest): 4553 4554 def testClose(self): 4555 conn, addr = self.serv.accept() 4556 conn.close() 4557 4558 sd = self.cli 4559 read, write, err = select.select([sd], [], [], 1.0) 4560 self.assertEqual(read, [sd]) 4561 self.assertEqual(sd.recv(1), b'') 4562 4563 # Calling close() many times should be safe. 4564 conn.close() 4565 conn.close() 4566 4567 def _testClose(self): 4568 self.cli.connect((HOST, self.port)) 4569 time.sleep(1.0) 4570 4571 4572class BasicSocketPairTest(SocketPairTest): 4573 4574 def __init__(self, methodName='runTest'): 4575 SocketPairTest.__init__(self, methodName=methodName) 4576 4577 def _check_defaults(self, sock): 4578 self.assertIsInstance(sock, socket.socket) 4579 if hasattr(socket, 'AF_UNIX'): 4580 self.assertEqual(sock.family, socket.AF_UNIX) 4581 else: 4582 self.assertEqual(sock.family, socket.AF_INET) 4583 self.assertEqual(sock.type, socket.SOCK_STREAM) 4584 self.assertEqual(sock.proto, 0) 4585 4586 def _testDefaults(self): 4587 self._check_defaults(self.cli) 4588 4589 def testDefaults(self): 4590 self._check_defaults(self.serv) 4591 4592 def testRecv(self): 4593 msg = self.serv.recv(1024) 4594 self.assertEqual(msg, MSG) 4595 4596 def _testRecv(self): 4597 self.cli.send(MSG) 4598 4599 def testSend(self): 4600 self.serv.send(MSG) 4601 4602 def _testSend(self): 4603 msg = self.cli.recv(1024) 4604 self.assertEqual(msg, MSG) 4605 4606 4607class NonBlockingTCPTests(ThreadedTCPSocketTest): 4608 4609 def __init__(self, methodName='runTest'): 4610 self.event = threading.Event() 4611 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 4612 4613 def assert_sock_timeout(self, sock, timeout): 4614 self.assertEqual(self.serv.gettimeout(), timeout) 4615 4616 blocking = (timeout != 0.0) 4617 self.assertEqual(sock.getblocking(), blocking) 4618 4619 if fcntl is not None: 4620 # When a Python socket has a non-zero timeout, it's switched 4621 # internally to a non-blocking mode. Later, sock.sendall(), 4622 # sock.recv(), and other socket operations use a select() call and 4623 # handle EWOULDBLOCK/EGAIN on all socket operations. That's how 4624 # timeouts are enforced. 4625 fd_blocking = (timeout is None) 4626 4627 flag = fcntl.fcntl(sock, fcntl.F_GETFL, os.O_NONBLOCK) 4628 self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking) 4629 4630 def testSetBlocking(self): 4631 # Test setblocking() and settimeout() methods 4632 self.serv.setblocking(True) 4633 self.assert_sock_timeout(self.serv, None) 4634 4635 self.serv.setblocking(False) 4636 self.assert_sock_timeout(self.serv, 0.0) 4637 4638 self.serv.settimeout(None) 4639 self.assert_sock_timeout(self.serv, None) 4640 4641 self.serv.settimeout(0) 4642 self.assert_sock_timeout(self.serv, 0) 4643 4644 self.serv.settimeout(10) 4645 self.assert_sock_timeout(self.serv, 10) 4646 4647 self.serv.settimeout(0) 4648 self.assert_sock_timeout(self.serv, 0) 4649 4650 def _testSetBlocking(self): 4651 pass 4652 4653 @support.cpython_only 4654 def testSetBlocking_overflow(self): 4655 # Issue 15989 4656 import _testcapi 4657 if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX: 4658 self.skipTest('needs UINT_MAX < ULONG_MAX') 4659 4660 self.serv.setblocking(False) 4661 self.assertEqual(self.serv.gettimeout(), 0.0) 4662 4663 self.serv.setblocking(_testcapi.UINT_MAX + 1) 4664 self.assertIsNone(self.serv.gettimeout()) 4665 4666 _testSetBlocking_overflow = support.cpython_only(_testSetBlocking) 4667 4668 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 4669 'test needs socket.SOCK_NONBLOCK') 4670 @support.requires_linux_version(2, 6, 28) 4671 def testInitNonBlocking(self): 4672 # create a socket with SOCK_NONBLOCK 4673 self.serv.close() 4674 self.serv = socket.socket(socket.AF_INET, 4675 socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 4676 self.assert_sock_timeout(self.serv, 0) 4677 4678 def _testInitNonBlocking(self): 4679 pass 4680 4681 def testInheritFlagsBlocking(self): 4682 # bpo-7995: accept() on a listening socket with a timeout and the 4683 # default timeout is None, the resulting socket must be blocking. 4684 with socket_setdefaulttimeout(None): 4685 self.serv.settimeout(10) 4686 conn, addr = self.serv.accept() 4687 self.addCleanup(conn.close) 4688 self.assertIsNone(conn.gettimeout()) 4689 4690 def _testInheritFlagsBlocking(self): 4691 self.cli.connect((HOST, self.port)) 4692 4693 def testInheritFlagsTimeout(self): 4694 # bpo-7995: accept() on a listening socket with a timeout and the 4695 # default timeout is None, the resulting socket must inherit 4696 # the default timeout. 4697 default_timeout = 20.0 4698 with socket_setdefaulttimeout(default_timeout): 4699 self.serv.settimeout(10) 4700 conn, addr = self.serv.accept() 4701 self.addCleanup(conn.close) 4702 self.assertEqual(conn.gettimeout(), default_timeout) 4703 4704 def _testInheritFlagsTimeout(self): 4705 self.cli.connect((HOST, self.port)) 4706 4707 def testAccept(self): 4708 # Testing non-blocking accept 4709 self.serv.setblocking(False) 4710 4711 # connect() didn't start: non-blocking accept() fails 4712 start_time = time.monotonic() 4713 with self.assertRaises(BlockingIOError): 4714 conn, addr = self.serv.accept() 4715 dt = time.monotonic() - start_time 4716 self.assertLess(dt, 1.0) 4717 4718 self.event.set() 4719 4720 read, write, err = select.select([self.serv], [], [], support.LONG_TIMEOUT) 4721 if self.serv not in read: 4722 self.fail("Error trying to do accept after select.") 4723 4724 # connect() completed: non-blocking accept() doesn't block 4725 conn, addr = self.serv.accept() 4726 self.addCleanup(conn.close) 4727 self.assertIsNone(conn.gettimeout()) 4728 4729 def _testAccept(self): 4730 # don't connect before event is set to check 4731 # that non-blocking accept() raises BlockingIOError 4732 self.event.wait() 4733 4734 self.cli.connect((HOST, self.port)) 4735 4736 def testRecv(self): 4737 # Testing non-blocking recv 4738 conn, addr = self.serv.accept() 4739 self.addCleanup(conn.close) 4740 conn.setblocking(False) 4741 4742 # the server didn't send data yet: non-blocking recv() fails 4743 with self.assertRaises(BlockingIOError): 4744 msg = conn.recv(len(MSG)) 4745 4746 self.event.set() 4747 4748 read, write, err = select.select([conn], [], [], support.LONG_TIMEOUT) 4749 if conn not in read: 4750 self.fail("Error during select call to non-blocking socket.") 4751 4752 # the server sent data yet: non-blocking recv() doesn't block 4753 msg = conn.recv(len(MSG)) 4754 self.assertEqual(msg, MSG) 4755 4756 def _testRecv(self): 4757 self.cli.connect((HOST, self.port)) 4758 4759 # don't send anything before event is set to check 4760 # that non-blocking recv() raises BlockingIOError 4761 self.event.wait() 4762 4763 # send data: recv() will no longer block 4764 self.cli.sendall(MSG) 4765 4766 4767class FileObjectClassTestCase(SocketConnectedTest): 4768 """Unit tests for the object returned by socket.makefile() 4769 4770 self.read_file is the io object returned by makefile() on 4771 the client connection. You can read from this file to 4772 get output from the server. 4773 4774 self.write_file is the io object returned by makefile() on the 4775 server connection. You can write to this file to send output 4776 to the client. 4777 """ 4778 4779 bufsize = -1 # Use default buffer size 4780 encoding = 'utf-8' 4781 errors = 'strict' 4782 newline = None 4783 4784 read_mode = 'rb' 4785 read_msg = MSG 4786 write_mode = 'wb' 4787 write_msg = MSG 4788 4789 def __init__(self, methodName='runTest'): 4790 SocketConnectedTest.__init__(self, methodName=methodName) 4791 4792 def setUp(self): 4793 self.evt1, self.evt2, self.serv_finished, self.cli_finished = [ 4794 threading.Event() for i in range(4)] 4795 SocketConnectedTest.setUp(self) 4796 self.read_file = self.cli_conn.makefile( 4797 self.read_mode, self.bufsize, 4798 encoding = self.encoding, 4799 errors = self.errors, 4800 newline = self.newline) 4801 4802 def tearDown(self): 4803 self.serv_finished.set() 4804 self.read_file.close() 4805 self.assertTrue(self.read_file.closed) 4806 self.read_file = None 4807 SocketConnectedTest.tearDown(self) 4808 4809 def clientSetUp(self): 4810 SocketConnectedTest.clientSetUp(self) 4811 self.write_file = self.serv_conn.makefile( 4812 self.write_mode, self.bufsize, 4813 encoding = self.encoding, 4814 errors = self.errors, 4815 newline = self.newline) 4816 4817 def clientTearDown(self): 4818 self.cli_finished.set() 4819 self.write_file.close() 4820 self.assertTrue(self.write_file.closed) 4821 self.write_file = None 4822 SocketConnectedTest.clientTearDown(self) 4823 4824 def testReadAfterTimeout(self): 4825 # Issue #7322: A file object must disallow further reads 4826 # after a timeout has occurred. 4827 self.cli_conn.settimeout(1) 4828 self.read_file.read(3) 4829 # First read raises a timeout 4830 self.assertRaises(socket.timeout, self.read_file.read, 1) 4831 # Second read is disallowed 4832 with self.assertRaises(OSError) as ctx: 4833 self.read_file.read(1) 4834 self.assertIn("cannot read from timed out object", str(ctx.exception)) 4835 4836 def _testReadAfterTimeout(self): 4837 self.write_file.write(self.write_msg[0:3]) 4838 self.write_file.flush() 4839 self.serv_finished.wait() 4840 4841 def testSmallRead(self): 4842 # Performing small file read test 4843 first_seg = self.read_file.read(len(self.read_msg)-3) 4844 second_seg = self.read_file.read(3) 4845 msg = first_seg + second_seg 4846 self.assertEqual(msg, self.read_msg) 4847 4848 def _testSmallRead(self): 4849 self.write_file.write(self.write_msg) 4850 self.write_file.flush() 4851 4852 def testFullRead(self): 4853 # read until EOF 4854 msg = self.read_file.read() 4855 self.assertEqual(msg, self.read_msg) 4856 4857 def _testFullRead(self): 4858 self.write_file.write(self.write_msg) 4859 self.write_file.close() 4860 4861 def testUnbufferedRead(self): 4862 # Performing unbuffered file read test 4863 buf = type(self.read_msg)() 4864 while 1: 4865 char = self.read_file.read(1) 4866 if not char: 4867 break 4868 buf += char 4869 self.assertEqual(buf, self.read_msg) 4870 4871 def _testUnbufferedRead(self): 4872 self.write_file.write(self.write_msg) 4873 self.write_file.flush() 4874 4875 def testReadline(self): 4876 # Performing file readline test 4877 line = self.read_file.readline() 4878 self.assertEqual(line, self.read_msg) 4879 4880 def _testReadline(self): 4881 self.write_file.write(self.write_msg) 4882 self.write_file.flush() 4883 4884 def testCloseAfterMakefile(self): 4885 # The file returned by makefile should keep the socket open. 4886 self.cli_conn.close() 4887 # read until EOF 4888 msg = self.read_file.read() 4889 self.assertEqual(msg, self.read_msg) 4890 4891 def _testCloseAfterMakefile(self): 4892 self.write_file.write(self.write_msg) 4893 self.write_file.flush() 4894 4895 def testMakefileAfterMakefileClose(self): 4896 self.read_file.close() 4897 msg = self.cli_conn.recv(len(MSG)) 4898 if isinstance(self.read_msg, str): 4899 msg = msg.decode() 4900 self.assertEqual(msg, self.read_msg) 4901 4902 def _testMakefileAfterMakefileClose(self): 4903 self.write_file.write(self.write_msg) 4904 self.write_file.flush() 4905 4906 def testClosedAttr(self): 4907 self.assertTrue(not self.read_file.closed) 4908 4909 def _testClosedAttr(self): 4910 self.assertTrue(not self.write_file.closed) 4911 4912 def testAttributes(self): 4913 self.assertEqual(self.read_file.mode, self.read_mode) 4914 self.assertEqual(self.read_file.name, self.cli_conn.fileno()) 4915 4916 def _testAttributes(self): 4917 self.assertEqual(self.write_file.mode, self.write_mode) 4918 self.assertEqual(self.write_file.name, self.serv_conn.fileno()) 4919 4920 def testRealClose(self): 4921 self.read_file.close() 4922 self.assertRaises(ValueError, self.read_file.fileno) 4923 self.cli_conn.close() 4924 self.assertRaises(OSError, self.cli_conn.getsockname) 4925 4926 def _testRealClose(self): 4927 pass 4928 4929 4930class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): 4931 4932 """Repeat the tests from FileObjectClassTestCase with bufsize==0. 4933 4934 In this case (and in this case only), it should be possible to 4935 create a file object, read a line from it, create another file 4936 object, read another line from it, without loss of data in the 4937 first file object's buffer. Note that http.client relies on this 4938 when reading multiple requests from the same socket.""" 4939 4940 bufsize = 0 # Use unbuffered mode 4941 4942 def testUnbufferedReadline(self): 4943 # Read a line, create a new file object, read another line with it 4944 line = self.read_file.readline() # first line 4945 self.assertEqual(line, b"A. " + self.write_msg) # first line 4946 self.read_file = self.cli_conn.makefile('rb', 0) 4947 line = self.read_file.readline() # second line 4948 self.assertEqual(line, b"B. " + self.write_msg) # second line 4949 4950 def _testUnbufferedReadline(self): 4951 self.write_file.write(b"A. " + self.write_msg) 4952 self.write_file.write(b"B. " + self.write_msg) 4953 self.write_file.flush() 4954 4955 def testMakefileClose(self): 4956 # The file returned by makefile should keep the socket open... 4957 self.cli_conn.close() 4958 msg = self.cli_conn.recv(1024) 4959 self.assertEqual(msg, self.read_msg) 4960 # ...until the file is itself closed 4961 self.read_file.close() 4962 self.assertRaises(OSError, self.cli_conn.recv, 1024) 4963 4964 def _testMakefileClose(self): 4965 self.write_file.write(self.write_msg) 4966 self.write_file.flush() 4967 4968 def testMakefileCloseSocketDestroy(self): 4969 refcount_before = sys.getrefcount(self.cli_conn) 4970 self.read_file.close() 4971 refcount_after = sys.getrefcount(self.cli_conn) 4972 self.assertEqual(refcount_before - 1, refcount_after) 4973 4974 def _testMakefileCloseSocketDestroy(self): 4975 pass 4976 4977 # Non-blocking ops 4978 # NOTE: to set `read_file` as non-blocking, we must call 4979 # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp). 4980 4981 def testSmallReadNonBlocking(self): 4982 self.cli_conn.setblocking(False) 4983 self.assertEqual(self.read_file.readinto(bytearray(10)), None) 4984 self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None) 4985 self.evt1.set() 4986 self.evt2.wait(1.0) 4987 first_seg = self.read_file.read(len(self.read_msg) - 3) 4988 if first_seg is None: 4989 # Data not arrived (can happen under Windows), wait a bit 4990 time.sleep(0.5) 4991 first_seg = self.read_file.read(len(self.read_msg) - 3) 4992 buf = bytearray(10) 4993 n = self.read_file.readinto(buf) 4994 self.assertEqual(n, 3) 4995 msg = first_seg + buf[:n] 4996 self.assertEqual(msg, self.read_msg) 4997 self.assertEqual(self.read_file.readinto(bytearray(16)), None) 4998 self.assertEqual(self.read_file.read(1), None) 4999 5000 def _testSmallReadNonBlocking(self): 5001 self.evt1.wait(1.0) 5002 self.write_file.write(self.write_msg) 5003 self.write_file.flush() 5004 self.evt2.set() 5005 # Avoid closing the socket before the server test has finished, 5006 # otherwise system recv() will return 0 instead of EWOULDBLOCK. 5007 self.serv_finished.wait(5.0) 5008 5009 def testWriteNonBlocking(self): 5010 self.cli_finished.wait(5.0) 5011 # The client thread can't skip directly - the SkipTest exception 5012 # would appear as a failure. 5013 if self.serv_skipped: 5014 self.skipTest(self.serv_skipped) 5015 5016 def _testWriteNonBlocking(self): 5017 self.serv_skipped = None 5018 self.serv_conn.setblocking(False) 5019 # Try to saturate the socket buffer pipe with repeated large writes. 5020 BIG = b"x" * support.SOCK_MAX_SIZE 5021 LIMIT = 10 5022 # The first write() succeeds since a chunk of data can be buffered 5023 n = self.write_file.write(BIG) 5024 self.assertGreater(n, 0) 5025 for i in range(LIMIT): 5026 n = self.write_file.write(BIG) 5027 if n is None: 5028 # Succeeded 5029 break 5030 self.assertGreater(n, 0) 5031 else: 5032 # Let us know that this test didn't manage to establish 5033 # the expected conditions. This is not a failure in itself but, 5034 # if it happens repeatedly, the test should be fixed. 5035 self.serv_skipped = "failed to saturate the socket buffer" 5036 5037 5038class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): 5039 5040 bufsize = 1 # Default-buffered for reading; line-buffered for writing 5041 5042 5043class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): 5044 5045 bufsize = 2 # Exercise the buffering code 5046 5047 5048class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase): 5049 """Tests for socket.makefile() in text mode (rather than binary)""" 5050 5051 read_mode = 'r' 5052 read_msg = MSG.decode('utf-8') 5053 write_mode = 'wb' 5054 write_msg = MSG 5055 newline = '' 5056 5057 5058class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase): 5059 """Tests for socket.makefile() in text mode (rather than binary)""" 5060 5061 read_mode = 'rb' 5062 read_msg = MSG 5063 write_mode = 'w' 5064 write_msg = MSG.decode('utf-8') 5065 newline = '' 5066 5067 5068class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase): 5069 """Tests for socket.makefile() in text mode (rather than binary)""" 5070 5071 read_mode = 'r' 5072 read_msg = MSG.decode('utf-8') 5073 write_mode = 'w' 5074 write_msg = MSG.decode('utf-8') 5075 newline = '' 5076 5077 5078class NetworkConnectionTest(object): 5079 """Prove network connection.""" 5080 5081 def clientSetUp(self): 5082 # We're inherited below by BasicTCPTest2, which also inherits 5083 # BasicTCPTest, which defines self.port referenced below. 5084 self.cli = socket.create_connection((HOST, self.port)) 5085 self.serv_conn = self.cli 5086 5087class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): 5088 """Tests that NetworkConnection does not break existing TCP functionality. 5089 """ 5090 5091class NetworkConnectionNoServer(unittest.TestCase): 5092 5093 class MockSocket(socket.socket): 5094 def connect(self, *args): 5095 raise socket.timeout('timed out') 5096 5097 @contextlib.contextmanager 5098 def mocked_socket_module(self): 5099 """Return a socket which times out on connect""" 5100 old_socket = socket.socket 5101 socket.socket = self.MockSocket 5102 try: 5103 yield 5104 finally: 5105 socket.socket = old_socket 5106 5107 def test_connect(self): 5108 port = socket_helper.find_unused_port() 5109 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 5110 self.addCleanup(cli.close) 5111 with self.assertRaises(OSError) as cm: 5112 cli.connect((HOST, port)) 5113 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) 5114 5115 def test_create_connection(self): 5116 # Issue #9792: errors raised by create_connection() should have 5117 # a proper errno attribute. 5118 port = socket_helper.find_unused_port() 5119 with self.assertRaises(OSError) as cm: 5120 socket.create_connection((HOST, port)) 5121 5122 # Issue #16257: create_connection() calls getaddrinfo() against 5123 # 'localhost'. This may result in an IPV6 addr being returned 5124 # as well as an IPV4 one: 5125 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) 5126 # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), 5127 # (26, 2, 0, '', ('::1', 41230, 0, 0))] 5128 # 5129 # create_connection() enumerates through all the addresses returned 5130 # and if it doesn't successfully bind to any of them, it propagates 5131 # the last exception it encountered. 5132 # 5133 # On Solaris, ENETUNREACH is returned in this circumstance instead 5134 # of ECONNREFUSED. So, if that errno exists, add it to our list of 5135 # expected errnos. 5136 expected_errnos = socket_helper.get_socket_conn_refused_errs() 5137 self.assertIn(cm.exception.errno, expected_errnos) 5138 5139 def test_create_connection_timeout(self): 5140 # Issue #9792: create_connection() should not recast timeout errors 5141 # as generic socket errors. 5142 with self.mocked_socket_module(): 5143 try: 5144 socket.create_connection((HOST, 1234)) 5145 except socket.timeout: 5146 pass 5147 except OSError as exc: 5148 if socket_helper.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT: 5149 raise 5150 else: 5151 self.fail('socket.timeout not raised') 5152 5153 5154class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): 5155 5156 def __init__(self, methodName='runTest'): 5157 SocketTCPTest.__init__(self, methodName=methodName) 5158 ThreadableTest.__init__(self) 5159 5160 def clientSetUp(self): 5161 self.source_port = socket_helper.find_unused_port() 5162 5163 def clientTearDown(self): 5164 self.cli.close() 5165 self.cli = None 5166 ThreadableTest.clientTearDown(self) 5167 5168 def _justAccept(self): 5169 conn, addr = self.serv.accept() 5170 conn.close() 5171 5172 testFamily = _justAccept 5173 def _testFamily(self): 5174 self.cli = socket.create_connection((HOST, self.port), 5175 timeout=support.LOOPBACK_TIMEOUT) 5176 self.addCleanup(self.cli.close) 5177 self.assertEqual(self.cli.family, 2) 5178 5179 testSourceAddress = _justAccept 5180 def _testSourceAddress(self): 5181 self.cli = socket.create_connection((HOST, self.port), 5182 timeout=support.LOOPBACK_TIMEOUT, 5183 source_address=('', self.source_port)) 5184 self.addCleanup(self.cli.close) 5185 self.assertEqual(self.cli.getsockname()[1], self.source_port) 5186 # The port number being used is sufficient to show that the bind() 5187 # call happened. 5188 5189 testTimeoutDefault = _justAccept 5190 def _testTimeoutDefault(self): 5191 # passing no explicit timeout uses socket's global default 5192 self.assertTrue(socket.getdefaulttimeout() is None) 5193 socket.setdefaulttimeout(42) 5194 try: 5195 self.cli = socket.create_connection((HOST, self.port)) 5196 self.addCleanup(self.cli.close) 5197 finally: 5198 socket.setdefaulttimeout(None) 5199 self.assertEqual(self.cli.gettimeout(), 42) 5200 5201 testTimeoutNone = _justAccept 5202 def _testTimeoutNone(self): 5203 # None timeout means the same as sock.settimeout(None) 5204 self.assertTrue(socket.getdefaulttimeout() is None) 5205 socket.setdefaulttimeout(30) 5206 try: 5207 self.cli = socket.create_connection((HOST, self.port), timeout=None) 5208 self.addCleanup(self.cli.close) 5209 finally: 5210 socket.setdefaulttimeout(None) 5211 self.assertEqual(self.cli.gettimeout(), None) 5212 5213 testTimeoutValueNamed = _justAccept 5214 def _testTimeoutValueNamed(self): 5215 self.cli = socket.create_connection((HOST, self.port), timeout=30) 5216 self.assertEqual(self.cli.gettimeout(), 30) 5217 5218 testTimeoutValueNonamed = _justAccept 5219 def _testTimeoutValueNonamed(self): 5220 self.cli = socket.create_connection((HOST, self.port), 30) 5221 self.addCleanup(self.cli.close) 5222 self.assertEqual(self.cli.gettimeout(), 30) 5223 5224 5225class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): 5226 5227 def __init__(self, methodName='runTest'): 5228 SocketTCPTest.__init__(self, methodName=methodName) 5229 ThreadableTest.__init__(self) 5230 5231 def clientSetUp(self): 5232 pass 5233 5234 def clientTearDown(self): 5235 self.cli.close() 5236 self.cli = None 5237 ThreadableTest.clientTearDown(self) 5238 5239 def testInsideTimeout(self): 5240 conn, addr = self.serv.accept() 5241 self.addCleanup(conn.close) 5242 time.sleep(3) 5243 conn.send(b"done!") 5244 testOutsideTimeout = testInsideTimeout 5245 5246 def _testInsideTimeout(self): 5247 self.cli = sock = socket.create_connection((HOST, self.port)) 5248 data = sock.recv(5) 5249 self.assertEqual(data, b"done!") 5250 5251 def _testOutsideTimeout(self): 5252 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) 5253 self.assertRaises(socket.timeout, lambda: sock.recv(5)) 5254 5255 5256class TCPTimeoutTest(SocketTCPTest): 5257 5258 def testTCPTimeout(self): 5259 def raise_timeout(*args, **kwargs): 5260 self.serv.settimeout(1.0) 5261 self.serv.accept() 5262 self.assertRaises(socket.timeout, raise_timeout, 5263 "Error generating a timeout exception (TCP)") 5264 5265 def testTimeoutZero(self): 5266 ok = False 5267 try: 5268 self.serv.settimeout(0.0) 5269 foo = self.serv.accept() 5270 except socket.timeout: 5271 self.fail("caught timeout instead of error (TCP)") 5272 except OSError: 5273 ok = True 5274 except: 5275 self.fail("caught unexpected exception (TCP)") 5276 if not ok: 5277 self.fail("accept() returned success when we did not expect it") 5278 5279 @unittest.skipUnless(hasattr(signal, 'alarm'), 5280 'test needs signal.alarm()') 5281 def testInterruptedTimeout(self): 5282 # XXX I don't know how to do this test on MSWindows or any other 5283 # platform that doesn't support signal.alarm() or os.kill(), though 5284 # the bug should have existed on all platforms. 5285 self.serv.settimeout(5.0) # must be longer than alarm 5286 class Alarm(Exception): 5287 pass 5288 def alarm_handler(signal, frame): 5289 raise Alarm 5290 old_alarm = signal.signal(signal.SIGALRM, alarm_handler) 5291 try: 5292 try: 5293 signal.alarm(2) # POSIX allows alarm to be up to 1 second early 5294 foo = self.serv.accept() 5295 except socket.timeout: 5296 self.fail("caught timeout instead of Alarm") 5297 except Alarm: 5298 pass 5299 except: 5300 self.fail("caught other exception instead of Alarm:" 5301 " %s(%s):\n%s" % 5302 (sys.exc_info()[:2] + (traceback.format_exc(),))) 5303 else: 5304 self.fail("nothing caught") 5305 finally: 5306 signal.alarm(0) # shut off alarm 5307 except Alarm: 5308 self.fail("got Alarm in wrong place") 5309 finally: 5310 # no alarm can be pending. Safe to restore old handler. 5311 signal.signal(signal.SIGALRM, old_alarm) 5312 5313class UDPTimeoutTest(SocketUDPTest): 5314 5315 def testUDPTimeout(self): 5316 def raise_timeout(*args, **kwargs): 5317 self.serv.settimeout(1.0) 5318 self.serv.recv(1024) 5319 self.assertRaises(socket.timeout, raise_timeout, 5320 "Error generating a timeout exception (UDP)") 5321 5322 def testTimeoutZero(self): 5323 ok = False 5324 try: 5325 self.serv.settimeout(0.0) 5326 foo = self.serv.recv(1024) 5327 except socket.timeout: 5328 self.fail("caught timeout instead of error (UDP)") 5329 except OSError: 5330 ok = True 5331 except: 5332 self.fail("caught unexpected exception (UDP)") 5333 if not ok: 5334 self.fail("recv() returned success when we did not expect it") 5335 5336@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 5337 'UDPLITE sockets required for this test.') 5338class UDPLITETimeoutTest(SocketUDPLITETest): 5339 5340 def testUDPLITETimeout(self): 5341 def raise_timeout(*args, **kwargs): 5342 self.serv.settimeout(1.0) 5343 self.serv.recv(1024) 5344 self.assertRaises(socket.timeout, raise_timeout, 5345 "Error generating a timeout exception (UDPLITE)") 5346 5347 def testTimeoutZero(self): 5348 ok = False 5349 try: 5350 self.serv.settimeout(0.0) 5351 foo = self.serv.recv(1024) 5352 except socket.timeout: 5353 self.fail("caught timeout instead of error (UDPLITE)") 5354 except OSError: 5355 ok = True 5356 except: 5357 self.fail("caught unexpected exception (UDPLITE)") 5358 if not ok: 5359 self.fail("recv() returned success when we did not expect it") 5360 5361class TestExceptions(unittest.TestCase): 5362 5363 def testExceptionTree(self): 5364 self.assertTrue(issubclass(OSError, Exception)) 5365 self.assertTrue(issubclass(socket.herror, OSError)) 5366 self.assertTrue(issubclass(socket.gaierror, OSError)) 5367 self.assertTrue(issubclass(socket.timeout, OSError)) 5368 5369 def test_setblocking_invalidfd(self): 5370 # Regression test for issue #28471 5371 5372 sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 5373 sock = socket.socket( 5374 socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno()) 5375 sock0.close() 5376 self.addCleanup(sock.detach) 5377 5378 with self.assertRaises(OSError): 5379 sock.setblocking(False) 5380 5381 5382@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') 5383class TestLinuxAbstractNamespace(unittest.TestCase): 5384 5385 UNIX_PATH_MAX = 108 5386 5387 def testLinuxAbstractNamespace(self): 5388 address = b"\x00python-test-hello\x00\xff" 5389 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: 5390 s1.bind(address) 5391 s1.listen() 5392 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2: 5393 s2.connect(s1.getsockname()) 5394 with s1.accept()[0] as s3: 5395 self.assertEqual(s1.getsockname(), address) 5396 self.assertEqual(s2.getpeername(), address) 5397 5398 def testMaxName(self): 5399 address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) 5400 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5401 s.bind(address) 5402 self.assertEqual(s.getsockname(), address) 5403 5404 def testNameOverflow(self): 5405 address = "\x00" + "h" * self.UNIX_PATH_MAX 5406 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5407 self.assertRaises(OSError, s.bind, address) 5408 5409 def testStrName(self): 5410 # Check that an abstract name can be passed as a string. 5411 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5412 try: 5413 s.bind("\x00python\x00test\x00") 5414 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5415 finally: 5416 s.close() 5417 5418 def testBytearrayName(self): 5419 # Check that an abstract name can be passed as a bytearray. 5420 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5421 s.bind(bytearray(b"\x00python\x00test\x00")) 5422 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5423 5424@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX') 5425class TestUnixDomain(unittest.TestCase): 5426 5427 def setUp(self): 5428 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5429 5430 def tearDown(self): 5431 self.sock.close() 5432 5433 def encoded(self, path): 5434 # Return the given path encoded in the file system encoding, 5435 # or skip the test if this is not possible. 5436 try: 5437 return os.fsencode(path) 5438 except UnicodeEncodeError: 5439 self.skipTest( 5440 "Pathname {0!a} cannot be represented in file " 5441 "system encoding {1!r}".format( 5442 path, sys.getfilesystemencoding())) 5443 5444 def bind(self, sock, path): 5445 # Bind the socket 5446 try: 5447 socket_helper.bind_unix_socket(sock, path) 5448 except OSError as e: 5449 if str(e) == "AF_UNIX path too long": 5450 self.skipTest( 5451 "Pathname {0!a} is too long to serve as an AF_UNIX path" 5452 .format(path)) 5453 else: 5454 raise 5455 5456 def testUnbound(self): 5457 # Issue #30205 (note getsockname() can return None on OS X) 5458 self.assertIn(self.sock.getsockname(), ('', None)) 5459 5460 def testStrAddr(self): 5461 # Test binding to and retrieving a normal string pathname. 5462 path = os.path.abspath(support.TESTFN) 5463 self.bind(self.sock, path) 5464 self.addCleanup(support.unlink, path) 5465 self.assertEqual(self.sock.getsockname(), path) 5466 5467 def testBytesAddr(self): 5468 # Test binding to a bytes pathname. 5469 path = os.path.abspath(support.TESTFN) 5470 self.bind(self.sock, self.encoded(path)) 5471 self.addCleanup(support.unlink, path) 5472 self.assertEqual(self.sock.getsockname(), path) 5473 5474 def testSurrogateescapeBind(self): 5475 # Test binding to a valid non-ASCII pathname, with the 5476 # non-ASCII bytes supplied using surrogateescape encoding. 5477 path = os.path.abspath(support.TESTFN_UNICODE) 5478 b = self.encoded(path) 5479 self.bind(self.sock, b.decode("ascii", "surrogateescape")) 5480 self.addCleanup(support.unlink, path) 5481 self.assertEqual(self.sock.getsockname(), path) 5482 5483 def testUnencodableAddr(self): 5484 # Test binding to a pathname that cannot be encoded in the 5485 # file system encoding. 5486 if support.TESTFN_UNENCODABLE is None: 5487 self.skipTest("No unencodable filename available") 5488 path = os.path.abspath(support.TESTFN_UNENCODABLE) 5489 self.bind(self.sock, path) 5490 self.addCleanup(support.unlink, path) 5491 self.assertEqual(self.sock.getsockname(), path) 5492 5493 5494class BufferIOTest(SocketConnectedTest): 5495 """ 5496 Test the buffer versions of socket.recv() and socket.send(). 5497 """ 5498 def __init__(self, methodName='runTest'): 5499 SocketConnectedTest.__init__(self, methodName=methodName) 5500 5501 def testRecvIntoArray(self): 5502 buf = array.array("B", [0] * len(MSG)) 5503 nbytes = self.cli_conn.recv_into(buf) 5504 self.assertEqual(nbytes, len(MSG)) 5505 buf = buf.tobytes() 5506 msg = buf[:len(MSG)] 5507 self.assertEqual(msg, MSG) 5508 5509 def _testRecvIntoArray(self): 5510 buf = bytes(MSG) 5511 self.serv_conn.send(buf) 5512 5513 def testRecvIntoBytearray(self): 5514 buf = bytearray(1024) 5515 nbytes = self.cli_conn.recv_into(buf) 5516 self.assertEqual(nbytes, len(MSG)) 5517 msg = buf[:len(MSG)] 5518 self.assertEqual(msg, MSG) 5519 5520 _testRecvIntoBytearray = _testRecvIntoArray 5521 5522 def testRecvIntoMemoryview(self): 5523 buf = bytearray(1024) 5524 nbytes = self.cli_conn.recv_into(memoryview(buf)) 5525 self.assertEqual(nbytes, len(MSG)) 5526 msg = buf[:len(MSG)] 5527 self.assertEqual(msg, MSG) 5528 5529 _testRecvIntoMemoryview = _testRecvIntoArray 5530 5531 def testRecvFromIntoArray(self): 5532 buf = array.array("B", [0] * len(MSG)) 5533 nbytes, addr = self.cli_conn.recvfrom_into(buf) 5534 self.assertEqual(nbytes, len(MSG)) 5535 buf = buf.tobytes() 5536 msg = buf[:len(MSG)] 5537 self.assertEqual(msg, MSG) 5538 5539 def _testRecvFromIntoArray(self): 5540 buf = bytes(MSG) 5541 self.serv_conn.send(buf) 5542 5543 def testRecvFromIntoBytearray(self): 5544 buf = bytearray(1024) 5545 nbytes, addr = self.cli_conn.recvfrom_into(buf) 5546 self.assertEqual(nbytes, len(MSG)) 5547 msg = buf[:len(MSG)] 5548 self.assertEqual(msg, MSG) 5549 5550 _testRecvFromIntoBytearray = _testRecvFromIntoArray 5551 5552 def testRecvFromIntoMemoryview(self): 5553 buf = bytearray(1024) 5554 nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf)) 5555 self.assertEqual(nbytes, len(MSG)) 5556 msg = buf[:len(MSG)] 5557 self.assertEqual(msg, MSG) 5558 5559 _testRecvFromIntoMemoryview = _testRecvFromIntoArray 5560 5561 def testRecvFromIntoSmallBuffer(self): 5562 # See issue #20246. 5563 buf = bytearray(8) 5564 self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024) 5565 5566 def _testRecvFromIntoSmallBuffer(self): 5567 self.serv_conn.send(MSG) 5568 5569 def testRecvFromIntoEmptyBuffer(self): 5570 buf = bytearray() 5571 self.cli_conn.recvfrom_into(buf) 5572 self.cli_conn.recvfrom_into(buf, 0) 5573 5574 _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray 5575 5576 5577TIPC_STYPE = 2000 5578TIPC_LOWER = 200 5579TIPC_UPPER = 210 5580 5581def isTipcAvailable(): 5582 """Check if the TIPC module is loaded 5583 5584 The TIPC module is not loaded automatically on Ubuntu and probably 5585 other Linux distros. 5586 """ 5587 if not hasattr(socket, "AF_TIPC"): 5588 return False 5589 try: 5590 f = open("/proc/modules") 5591 except (FileNotFoundError, IsADirectoryError, PermissionError): 5592 # It's ok if the file does not exist, is a directory or if we 5593 # have not the permission to read it. 5594 return False 5595 with f: 5596 for line in f: 5597 if line.startswith("tipc "): 5598 return True 5599 return False 5600 5601@unittest.skipUnless(isTipcAvailable(), 5602 "TIPC module is not loaded, please 'sudo modprobe tipc'") 5603class TIPCTest(unittest.TestCase): 5604 def testRDM(self): 5605 srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 5606 cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 5607 self.addCleanup(srv.close) 5608 self.addCleanup(cli.close) 5609 5610 srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 5611 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 5612 TIPC_LOWER, TIPC_UPPER) 5613 srv.bind(srvaddr) 5614 5615 sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 5616 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) 5617 cli.sendto(MSG, sendaddr) 5618 5619 msg, recvaddr = srv.recvfrom(1024) 5620 5621 self.assertEqual(cli.getsockname(), recvaddr) 5622 self.assertEqual(msg, MSG) 5623 5624 5625@unittest.skipUnless(isTipcAvailable(), 5626 "TIPC module is not loaded, please 'sudo modprobe tipc'") 5627class TIPCThreadableTest(unittest.TestCase, ThreadableTest): 5628 def __init__(self, methodName = 'runTest'): 5629 unittest.TestCase.__init__(self, methodName = methodName) 5630 ThreadableTest.__init__(self) 5631 5632 def setUp(self): 5633 self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 5634 self.addCleanup(self.srv.close) 5635 self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 5636 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 5637 TIPC_LOWER, TIPC_UPPER) 5638 self.srv.bind(srvaddr) 5639 self.srv.listen() 5640 self.serverExplicitReady() 5641 self.conn, self.connaddr = self.srv.accept() 5642 self.addCleanup(self.conn.close) 5643 5644 def clientSetUp(self): 5645 # There is a hittable race between serverExplicitReady() and the 5646 # accept() call; sleep a little while to avoid it, otherwise 5647 # we could get an exception 5648 time.sleep(0.1) 5649 self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 5650 self.addCleanup(self.cli.close) 5651 addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 5652 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) 5653 self.cli.connect(addr) 5654 self.cliaddr = self.cli.getsockname() 5655 5656 def testStream(self): 5657 msg = self.conn.recv(1024) 5658 self.assertEqual(msg, MSG) 5659 self.assertEqual(self.cliaddr, self.connaddr) 5660 5661 def _testStream(self): 5662 self.cli.send(MSG) 5663 self.cli.close() 5664 5665 5666class ContextManagersTest(ThreadedTCPSocketTest): 5667 5668 def _testSocketClass(self): 5669 # base test 5670 with socket.socket() as sock: 5671 self.assertFalse(sock._closed) 5672 self.assertTrue(sock._closed) 5673 # close inside with block 5674 with socket.socket() as sock: 5675 sock.close() 5676 self.assertTrue(sock._closed) 5677 # exception inside with block 5678 with socket.socket() as sock: 5679 self.assertRaises(OSError, sock.sendall, b'foo') 5680 self.assertTrue(sock._closed) 5681 5682 def testCreateConnectionBase(self): 5683 conn, addr = self.serv.accept() 5684 self.addCleanup(conn.close) 5685 data = conn.recv(1024) 5686 conn.sendall(data) 5687 5688 def _testCreateConnectionBase(self): 5689 address = self.serv.getsockname() 5690 with socket.create_connection(address) as sock: 5691 self.assertFalse(sock._closed) 5692 sock.sendall(b'foo') 5693 self.assertEqual(sock.recv(1024), b'foo') 5694 self.assertTrue(sock._closed) 5695 5696 def testCreateConnectionClose(self): 5697 conn, addr = self.serv.accept() 5698 self.addCleanup(conn.close) 5699 data = conn.recv(1024) 5700 conn.sendall(data) 5701 5702 def _testCreateConnectionClose(self): 5703 address = self.serv.getsockname() 5704 with socket.create_connection(address) as sock: 5705 sock.close() 5706 self.assertTrue(sock._closed) 5707 self.assertRaises(OSError, sock.sendall, b'foo') 5708 5709 5710class InheritanceTest(unittest.TestCase): 5711 @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"), 5712 "SOCK_CLOEXEC not defined") 5713 @support.requires_linux_version(2, 6, 28) 5714 def test_SOCK_CLOEXEC(self): 5715 with socket.socket(socket.AF_INET, 5716 socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s: 5717 self.assertEqual(s.type, socket.SOCK_STREAM) 5718 self.assertFalse(s.get_inheritable()) 5719 5720 def test_default_inheritable(self): 5721 sock = socket.socket() 5722 with sock: 5723 self.assertEqual(sock.get_inheritable(), False) 5724 5725 def test_dup(self): 5726 sock = socket.socket() 5727 with sock: 5728 newsock = sock.dup() 5729 sock.close() 5730 with newsock: 5731 self.assertEqual(newsock.get_inheritable(), False) 5732 5733 def test_set_inheritable(self): 5734 sock = socket.socket() 5735 with sock: 5736 sock.set_inheritable(True) 5737 self.assertEqual(sock.get_inheritable(), True) 5738 5739 sock.set_inheritable(False) 5740 self.assertEqual(sock.get_inheritable(), False) 5741 5742 @unittest.skipIf(fcntl is None, "need fcntl") 5743 def test_get_inheritable_cloexec(self): 5744 sock = socket.socket() 5745 with sock: 5746 fd = sock.fileno() 5747 self.assertEqual(sock.get_inheritable(), False) 5748 5749 # clear FD_CLOEXEC flag 5750 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 5751 flags &= ~fcntl.FD_CLOEXEC 5752 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 5753 5754 self.assertEqual(sock.get_inheritable(), True) 5755 5756 @unittest.skipIf(fcntl is None, "need fcntl") 5757 def test_set_inheritable_cloexec(self): 5758 sock = socket.socket() 5759 with sock: 5760 fd = sock.fileno() 5761 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 5762 fcntl.FD_CLOEXEC) 5763 5764 sock.set_inheritable(True) 5765 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 5766 0) 5767 5768 5769 def test_socketpair(self): 5770 s1, s2 = socket.socketpair() 5771 self.addCleanup(s1.close) 5772 self.addCleanup(s2.close) 5773 self.assertEqual(s1.get_inheritable(), False) 5774 self.assertEqual(s2.get_inheritable(), False) 5775 5776 5777@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"), 5778 "SOCK_NONBLOCK not defined") 5779class NonblockConstantTest(unittest.TestCase): 5780 def checkNonblock(self, s, nonblock=True, timeout=0.0): 5781 if nonblock: 5782 self.assertEqual(s.type, socket.SOCK_STREAM) 5783 self.assertEqual(s.gettimeout(), timeout) 5784 self.assertTrue( 5785 fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) 5786 if timeout == 0: 5787 # timeout == 0: means that getblocking() must be False. 5788 self.assertFalse(s.getblocking()) 5789 else: 5790 # If timeout > 0, the socket will be in a "blocking" mode 5791 # from the standpoint of the Python API. For Python socket 5792 # object, "blocking" means that operations like 'sock.recv()' 5793 # will block. Internally, file descriptors for 5794 # "blocking" Python sockets *with timeouts* are in a 5795 # *non-blocking* mode, and 'sock.recv()' uses 'select()' 5796 # and handles EWOULDBLOCK/EAGAIN to enforce the timeout. 5797 self.assertTrue(s.getblocking()) 5798 else: 5799 self.assertEqual(s.type, socket.SOCK_STREAM) 5800 self.assertEqual(s.gettimeout(), None) 5801 self.assertFalse( 5802 fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) 5803 self.assertTrue(s.getblocking()) 5804 5805 @support.requires_linux_version(2, 6, 28) 5806 def test_SOCK_NONBLOCK(self): 5807 # a lot of it seems silly and redundant, but I wanted to test that 5808 # changing back and forth worked ok 5809 with socket.socket(socket.AF_INET, 5810 socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s: 5811 self.checkNonblock(s) 5812 s.setblocking(True) 5813 self.checkNonblock(s, nonblock=False) 5814 s.setblocking(False) 5815 self.checkNonblock(s) 5816 s.settimeout(None) 5817 self.checkNonblock(s, nonblock=False) 5818 s.settimeout(2.0) 5819 self.checkNonblock(s, timeout=2.0) 5820 s.setblocking(True) 5821 self.checkNonblock(s, nonblock=False) 5822 # defaulttimeout 5823 t = socket.getdefaulttimeout() 5824 socket.setdefaulttimeout(0.0) 5825 with socket.socket() as s: 5826 self.checkNonblock(s) 5827 socket.setdefaulttimeout(None) 5828 with socket.socket() as s: 5829 self.checkNonblock(s, False) 5830 socket.setdefaulttimeout(2.0) 5831 with socket.socket() as s: 5832 self.checkNonblock(s, timeout=2.0) 5833 socket.setdefaulttimeout(None) 5834 with socket.socket() as s: 5835 self.checkNonblock(s, False) 5836 socket.setdefaulttimeout(t) 5837 5838 5839@unittest.skipUnless(os.name == "nt", "Windows specific") 5840@unittest.skipUnless(multiprocessing, "need multiprocessing") 5841class TestSocketSharing(SocketTCPTest): 5842 # This must be classmethod and not staticmethod or multiprocessing 5843 # won't be able to bootstrap it. 5844 @classmethod 5845 def remoteProcessServer(cls, q): 5846 # Recreate socket from shared data 5847 sdata = q.get() 5848 message = q.get() 5849 5850 s = socket.fromshare(sdata) 5851 s2, c = s.accept() 5852 5853 # Send the message 5854 s2.sendall(message) 5855 s2.close() 5856 s.close() 5857 5858 def testShare(self): 5859 # Transfer the listening server socket to another process 5860 # and service it from there. 5861 5862 # Create process: 5863 q = multiprocessing.Queue() 5864 p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,)) 5865 p.start() 5866 5867 # Get the shared socket data 5868 data = self.serv.share(p.pid) 5869 5870 # Pass the shared socket to the other process 5871 addr = self.serv.getsockname() 5872 self.serv.close() 5873 q.put(data) 5874 5875 # The data that the server will send us 5876 message = b"slapmahfro" 5877 q.put(message) 5878 5879 # Connect 5880 s = socket.create_connection(addr) 5881 # listen for the data 5882 m = [] 5883 while True: 5884 data = s.recv(100) 5885 if not data: 5886 break 5887 m.append(data) 5888 s.close() 5889 received = b"".join(m) 5890 self.assertEqual(received, message) 5891 p.join() 5892 5893 def testShareLength(self): 5894 data = self.serv.share(os.getpid()) 5895 self.assertRaises(ValueError, socket.fromshare, data[:-1]) 5896 self.assertRaises(ValueError, socket.fromshare, data+b"foo") 5897 5898 def compareSockets(self, org, other): 5899 # socket sharing is expected to work only for blocking socket 5900 # since the internal python timeout value isn't transferred. 5901 self.assertEqual(org.gettimeout(), None) 5902 self.assertEqual(org.gettimeout(), other.gettimeout()) 5903 5904 self.assertEqual(org.family, other.family) 5905 self.assertEqual(org.type, other.type) 5906 # If the user specified "0" for proto, then 5907 # internally windows will have picked the correct value. 5908 # Python introspection on the socket however will still return 5909 # 0. For the shared socket, the python value is recreated 5910 # from the actual value, so it may not compare correctly. 5911 if org.proto != 0: 5912 self.assertEqual(org.proto, other.proto) 5913 5914 def testShareLocal(self): 5915 data = self.serv.share(os.getpid()) 5916 s = socket.fromshare(data) 5917 try: 5918 self.compareSockets(self.serv, s) 5919 finally: 5920 s.close() 5921 5922 def testTypes(self): 5923 families = [socket.AF_INET, socket.AF_INET6] 5924 types = [socket.SOCK_STREAM, socket.SOCK_DGRAM] 5925 for f in families: 5926 for t in types: 5927 try: 5928 source = socket.socket(f, t) 5929 except OSError: 5930 continue # This combination is not supported 5931 try: 5932 data = source.share(os.getpid()) 5933 shared = socket.fromshare(data) 5934 try: 5935 self.compareSockets(source, shared) 5936 finally: 5937 shared.close() 5938 finally: 5939 source.close() 5940 5941 5942class SendfileUsingSendTest(ThreadedTCPSocketTest): 5943 """ 5944 Test the send() implementation of socket.sendfile(). 5945 """ 5946 5947 FILESIZE = (10 * 1024 * 1024) # 10 MiB 5948 BUFSIZE = 8192 5949 FILEDATA = b"" 5950 TIMEOUT = support.LOOPBACK_TIMEOUT 5951 5952 @classmethod 5953 def setUpClass(cls): 5954 def chunks(total, step): 5955 assert total >= step 5956 while total > step: 5957 yield step 5958 total -= step 5959 if total: 5960 yield total 5961 5962 chunk = b"".join([random.choice(string.ascii_letters).encode() 5963 for i in range(cls.BUFSIZE)]) 5964 with open(support.TESTFN, 'wb') as f: 5965 for csize in chunks(cls.FILESIZE, cls.BUFSIZE): 5966 f.write(chunk) 5967 with open(support.TESTFN, 'rb') as f: 5968 cls.FILEDATA = f.read() 5969 assert len(cls.FILEDATA) == cls.FILESIZE 5970 5971 @classmethod 5972 def tearDownClass(cls): 5973 support.unlink(support.TESTFN) 5974 5975 def accept_conn(self): 5976 self.serv.settimeout(support.LONG_TIMEOUT) 5977 conn, addr = self.serv.accept() 5978 conn.settimeout(self.TIMEOUT) 5979 self.addCleanup(conn.close) 5980 return conn 5981 5982 def recv_data(self, conn): 5983 received = [] 5984 while True: 5985 chunk = conn.recv(self.BUFSIZE) 5986 if not chunk: 5987 break 5988 received.append(chunk) 5989 return b''.join(received) 5990 5991 def meth_from_sock(self, sock): 5992 # Depending on the mixin class being run return either send() 5993 # or sendfile() method implementation. 5994 return getattr(sock, "_sendfile_use_send") 5995 5996 # regular file 5997 5998 def _testRegularFile(self): 5999 address = self.serv.getsockname() 6000 file = open(support.TESTFN, 'rb') 6001 with socket.create_connection(address) as sock, file as file: 6002 meth = self.meth_from_sock(sock) 6003 sent = meth(file) 6004 self.assertEqual(sent, self.FILESIZE) 6005 self.assertEqual(file.tell(), self.FILESIZE) 6006 6007 def testRegularFile(self): 6008 conn = self.accept_conn() 6009 data = self.recv_data(conn) 6010 self.assertEqual(len(data), self.FILESIZE) 6011 self.assertEqual(data, self.FILEDATA) 6012 6013 # non regular file 6014 6015 def _testNonRegularFile(self): 6016 address = self.serv.getsockname() 6017 file = io.BytesIO(self.FILEDATA) 6018 with socket.create_connection(address) as sock, file as file: 6019 sent = sock.sendfile(file) 6020 self.assertEqual(sent, self.FILESIZE) 6021 self.assertEqual(file.tell(), self.FILESIZE) 6022 self.assertRaises(socket._GiveupOnSendfile, 6023 sock._sendfile_use_sendfile, file) 6024 6025 def testNonRegularFile(self): 6026 conn = self.accept_conn() 6027 data = self.recv_data(conn) 6028 self.assertEqual(len(data), self.FILESIZE) 6029 self.assertEqual(data, self.FILEDATA) 6030 6031 # empty file 6032 6033 def _testEmptyFileSend(self): 6034 address = self.serv.getsockname() 6035 filename = support.TESTFN + "2" 6036 with open(filename, 'wb'): 6037 self.addCleanup(support.unlink, filename) 6038 file = open(filename, 'rb') 6039 with socket.create_connection(address) as sock, file as file: 6040 meth = self.meth_from_sock(sock) 6041 sent = meth(file) 6042 self.assertEqual(sent, 0) 6043 self.assertEqual(file.tell(), 0) 6044 6045 def testEmptyFileSend(self): 6046 conn = self.accept_conn() 6047 data = self.recv_data(conn) 6048 self.assertEqual(data, b"") 6049 6050 # offset 6051 6052 def _testOffset(self): 6053 address = self.serv.getsockname() 6054 file = open(support.TESTFN, 'rb') 6055 with socket.create_connection(address) as sock, file as file: 6056 meth = self.meth_from_sock(sock) 6057 sent = meth(file, offset=5000) 6058 self.assertEqual(sent, self.FILESIZE - 5000) 6059 self.assertEqual(file.tell(), self.FILESIZE) 6060 6061 def testOffset(self): 6062 conn = self.accept_conn() 6063 data = self.recv_data(conn) 6064 self.assertEqual(len(data), self.FILESIZE - 5000) 6065 self.assertEqual(data, self.FILEDATA[5000:]) 6066 6067 # count 6068 6069 def _testCount(self): 6070 address = self.serv.getsockname() 6071 file = open(support.TESTFN, 'rb') 6072 sock = socket.create_connection(address, 6073 timeout=support.LOOPBACK_TIMEOUT) 6074 with sock, file: 6075 count = 5000007 6076 meth = self.meth_from_sock(sock) 6077 sent = meth(file, count=count) 6078 self.assertEqual(sent, count) 6079 self.assertEqual(file.tell(), count) 6080 6081 def testCount(self): 6082 count = 5000007 6083 conn = self.accept_conn() 6084 data = self.recv_data(conn) 6085 self.assertEqual(len(data), count) 6086 self.assertEqual(data, self.FILEDATA[:count]) 6087 6088 # count small 6089 6090 def _testCountSmall(self): 6091 address = self.serv.getsockname() 6092 file = open(support.TESTFN, 'rb') 6093 sock = socket.create_connection(address, 6094 timeout=support.LOOPBACK_TIMEOUT) 6095 with sock, file: 6096 count = 1 6097 meth = self.meth_from_sock(sock) 6098 sent = meth(file, count=count) 6099 self.assertEqual(sent, count) 6100 self.assertEqual(file.tell(), count) 6101 6102 def testCountSmall(self): 6103 count = 1 6104 conn = self.accept_conn() 6105 data = self.recv_data(conn) 6106 self.assertEqual(len(data), count) 6107 self.assertEqual(data, self.FILEDATA[:count]) 6108 6109 # count + offset 6110 6111 def _testCountWithOffset(self): 6112 address = self.serv.getsockname() 6113 file = open(support.TESTFN, 'rb') 6114 with socket.create_connection(address, timeout=2) as sock, file as file: 6115 count = 100007 6116 meth = self.meth_from_sock(sock) 6117 sent = meth(file, offset=2007, count=count) 6118 self.assertEqual(sent, count) 6119 self.assertEqual(file.tell(), count + 2007) 6120 6121 def testCountWithOffset(self): 6122 count = 100007 6123 conn = self.accept_conn() 6124 data = self.recv_data(conn) 6125 self.assertEqual(len(data), count) 6126 self.assertEqual(data, self.FILEDATA[2007:count+2007]) 6127 6128 # non blocking sockets are not supposed to work 6129 6130 def _testNonBlocking(self): 6131 address = self.serv.getsockname() 6132 file = open(support.TESTFN, 'rb') 6133 with socket.create_connection(address) as sock, file as file: 6134 sock.setblocking(False) 6135 meth = self.meth_from_sock(sock) 6136 self.assertRaises(ValueError, meth, file) 6137 self.assertRaises(ValueError, sock.sendfile, file) 6138 6139 def testNonBlocking(self): 6140 conn = self.accept_conn() 6141 if conn.recv(8192): 6142 self.fail('was not supposed to receive any data') 6143 6144 # timeout (non-triggered) 6145 6146 def _testWithTimeout(self): 6147 address = self.serv.getsockname() 6148 file = open(support.TESTFN, 'rb') 6149 sock = socket.create_connection(address, 6150 timeout=support.LOOPBACK_TIMEOUT) 6151 with sock, file: 6152 meth = self.meth_from_sock(sock) 6153 sent = meth(file) 6154 self.assertEqual(sent, self.FILESIZE) 6155 6156 def testWithTimeout(self): 6157 conn = self.accept_conn() 6158 data = self.recv_data(conn) 6159 self.assertEqual(len(data), self.FILESIZE) 6160 self.assertEqual(data, self.FILEDATA) 6161 6162 # timeout (triggered) 6163 6164 def _testWithTimeoutTriggeredSend(self): 6165 address = self.serv.getsockname() 6166 with open(support.TESTFN, 'rb') as file: 6167 with socket.create_connection(address) as sock: 6168 sock.settimeout(0.01) 6169 meth = self.meth_from_sock(sock) 6170 self.assertRaises(socket.timeout, meth, file) 6171 6172 def testWithTimeoutTriggeredSend(self): 6173 conn = self.accept_conn() 6174 conn.recv(88192) 6175 6176 # errors 6177 6178 def _test_errors(self): 6179 pass 6180 6181 def test_errors(self): 6182 with open(support.TESTFN, 'rb') as file: 6183 with socket.socket(type=socket.SOCK_DGRAM) as s: 6184 meth = self.meth_from_sock(s) 6185 self.assertRaisesRegex( 6186 ValueError, "SOCK_STREAM", meth, file) 6187 with open(support.TESTFN, 'rt') as file: 6188 with socket.socket() as s: 6189 meth = self.meth_from_sock(s) 6190 self.assertRaisesRegex( 6191 ValueError, "binary mode", meth, file) 6192 with open(support.TESTFN, 'rb') as file: 6193 with socket.socket() as s: 6194 meth = self.meth_from_sock(s) 6195 self.assertRaisesRegex(TypeError, "positive integer", 6196 meth, file, count='2') 6197 self.assertRaisesRegex(TypeError, "positive integer", 6198 meth, file, count=0.1) 6199 self.assertRaisesRegex(ValueError, "positive integer", 6200 meth, file, count=0) 6201 self.assertRaisesRegex(ValueError, "positive integer", 6202 meth, file, count=-1) 6203 6204 6205@unittest.skipUnless(hasattr(os, "sendfile"), 6206 'os.sendfile() required for this test.') 6207class SendfileUsingSendfileTest(SendfileUsingSendTest): 6208 """ 6209 Test the sendfile() implementation of socket.sendfile(). 6210 """ 6211 def meth_from_sock(self, sock): 6212 return getattr(sock, "_sendfile_use_sendfile") 6213 6214 6215@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required') 6216class LinuxKernelCryptoAPI(unittest.TestCase): 6217 # tests for AF_ALG 6218 def create_alg(self, typ, name): 6219 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6220 try: 6221 sock.bind((typ, name)) 6222 except FileNotFoundError as e: 6223 # type / algorithm is not available 6224 sock.close() 6225 raise unittest.SkipTest(str(e), typ, name) 6226 else: 6227 return sock 6228 6229 # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY, 6230 # at least on ppc64le architecture 6231 @support.requires_linux_version(4, 5) 6232 def test_sha256(self): 6233 expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396" 6234 "177a9cb410ff61f20015ad") 6235 with self.create_alg('hash', 'sha256') as algo: 6236 op, _ = algo.accept() 6237 with op: 6238 op.sendall(b"abc") 6239 self.assertEqual(op.recv(512), expected) 6240 6241 op, _ = algo.accept() 6242 with op: 6243 op.send(b'a', socket.MSG_MORE) 6244 op.send(b'b', socket.MSG_MORE) 6245 op.send(b'c', socket.MSG_MORE) 6246 op.send(b'') 6247 self.assertEqual(op.recv(512), expected) 6248 6249 def test_hmac_sha1(self): 6250 expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79") 6251 with self.create_alg('hash', 'hmac(sha1)') as algo: 6252 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe") 6253 op, _ = algo.accept() 6254 with op: 6255 op.sendall(b"what do ya want for nothing?") 6256 self.assertEqual(op.recv(512), expected) 6257 6258 # Although it should work with 3.19 and newer the test blocks on 6259 # Ubuntu 15.10 with Kernel 4.2.0-19. 6260 @support.requires_linux_version(4, 3) 6261 def test_aes_cbc(self): 6262 key = bytes.fromhex('06a9214036b8a15b512e03d534120006') 6263 iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41') 6264 msg = b"Single block msg" 6265 ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a') 6266 msglen = len(msg) 6267 with self.create_alg('skcipher', 'cbc(aes)') as algo: 6268 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 6269 op, _ = algo.accept() 6270 with op: 6271 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 6272 flags=socket.MSG_MORE) 6273 op.sendall(msg) 6274 self.assertEqual(op.recv(msglen), ciphertext) 6275 6276 op, _ = algo.accept() 6277 with op: 6278 op.sendmsg_afalg([ciphertext], 6279 op=socket.ALG_OP_DECRYPT, iv=iv) 6280 self.assertEqual(op.recv(msglen), msg) 6281 6282 # long message 6283 multiplier = 1024 6284 longmsg = [msg] * multiplier 6285 op, _ = algo.accept() 6286 with op: 6287 op.sendmsg_afalg(longmsg, 6288 op=socket.ALG_OP_ENCRYPT, iv=iv) 6289 enc = op.recv(msglen * multiplier) 6290 self.assertEqual(len(enc), msglen * multiplier) 6291 self.assertEqual(enc[:msglen], ciphertext) 6292 6293 op, _ = algo.accept() 6294 with op: 6295 op.sendmsg_afalg([enc], 6296 op=socket.ALG_OP_DECRYPT, iv=iv) 6297 dec = op.recv(msglen * multiplier) 6298 self.assertEqual(len(dec), msglen * multiplier) 6299 self.assertEqual(dec, msg * multiplier) 6300 6301 @support.requires_linux_version(4, 9) # see issue29324 6302 def test_aead_aes_gcm(self): 6303 key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c') 6304 iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2') 6305 plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069') 6306 assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f') 6307 expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354') 6308 expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd') 6309 6310 taglen = len(expected_tag) 6311 assoclen = len(assoc) 6312 6313 with self.create_alg('aead', 'gcm(aes)') as algo: 6314 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 6315 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE, 6316 None, taglen) 6317 6318 # send assoc, plain and tag buffer in separate steps 6319 op, _ = algo.accept() 6320 with op: 6321 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 6322 assoclen=assoclen, flags=socket.MSG_MORE) 6323 op.sendall(assoc, socket.MSG_MORE) 6324 op.sendall(plain) 6325 res = op.recv(assoclen + len(plain) + taglen) 6326 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6327 self.assertEqual(expected_tag, res[-taglen:]) 6328 6329 # now with msg 6330 op, _ = algo.accept() 6331 with op: 6332 msg = assoc + plain 6333 op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv, 6334 assoclen=assoclen) 6335 res = op.recv(assoclen + len(plain) + taglen) 6336 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6337 self.assertEqual(expected_tag, res[-taglen:]) 6338 6339 # create anc data manually 6340 pack_uint32 = struct.Struct('I').pack 6341 op, _ = algo.accept() 6342 with op: 6343 msg = assoc + plain 6344 op.sendmsg( 6345 [msg], 6346 ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)], 6347 [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv], 6348 [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)], 6349 ) 6350 ) 6351 res = op.recv(len(msg) + taglen) 6352 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6353 self.assertEqual(expected_tag, res[-taglen:]) 6354 6355 # decrypt and verify 6356 op, _ = algo.accept() 6357 with op: 6358 msg = assoc + expected_ct + expected_tag 6359 op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv, 6360 assoclen=assoclen) 6361 res = op.recv(len(msg) - taglen) 6362 self.assertEqual(plain, res[assoclen:]) 6363 6364 @support.requires_linux_version(4, 3) # see test_aes_cbc 6365 def test_drbg_pr_sha256(self): 6366 # deterministic random bit generator, prediction resistance, sha256 6367 with self.create_alg('rng', 'drbg_pr_sha256') as algo: 6368 extra_seed = os.urandom(32) 6369 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed) 6370 op, _ = algo.accept() 6371 with op: 6372 rn = op.recv(32) 6373 self.assertEqual(len(rn), 32) 6374 6375 def test_sendmsg_afalg_args(self): 6376 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6377 with sock: 6378 with self.assertRaises(TypeError): 6379 sock.sendmsg_afalg() 6380 6381 with self.assertRaises(TypeError): 6382 sock.sendmsg_afalg(op=None) 6383 6384 with self.assertRaises(TypeError): 6385 sock.sendmsg_afalg(1) 6386 6387 with self.assertRaises(TypeError): 6388 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None) 6389 6390 with self.assertRaises(TypeError): 6391 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1) 6392 6393 def test_length_restriction(self): 6394 # bpo-35050, off-by-one error in length check 6395 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6396 self.addCleanup(sock.close) 6397 6398 # salg_type[14] 6399 with self.assertRaises(FileNotFoundError): 6400 sock.bind(("t" * 13, "name")) 6401 with self.assertRaisesRegex(ValueError, "type too long"): 6402 sock.bind(("t" * 14, "name")) 6403 6404 # salg_name[64] 6405 with self.assertRaises(FileNotFoundError): 6406 sock.bind(("type", "n" * 63)) 6407 with self.assertRaisesRegex(ValueError, "name too long"): 6408 sock.bind(("type", "n" * 64)) 6409 6410 6411@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") 6412class TestMSWindowsTCPFlags(unittest.TestCase): 6413 knownTCPFlags = { 6414 # available since long time ago 6415 'TCP_MAXSEG', 6416 'TCP_NODELAY', 6417 # available starting with Windows 10 1607 6418 'TCP_FASTOPEN', 6419 # available starting with Windows 10 1703 6420 'TCP_KEEPCNT', 6421 # available starting with Windows 10 1709 6422 'TCP_KEEPIDLE', 6423 'TCP_KEEPINTVL' 6424 } 6425 6426 def test_new_tcp_flags(self): 6427 provided = [s for s in dir(socket) if s.startswith('TCP')] 6428 unknown = [s for s in provided if s not in self.knownTCPFlags] 6429 6430 self.assertEqual([], unknown, 6431 "New TCP flags were discovered. See bpo-32394 for more information") 6432 6433 6434class CreateServerTest(unittest.TestCase): 6435 6436 def test_address(self): 6437 port = socket_helper.find_unused_port() 6438 with socket.create_server(("127.0.0.1", port)) as sock: 6439 self.assertEqual(sock.getsockname()[0], "127.0.0.1") 6440 self.assertEqual(sock.getsockname()[1], port) 6441 if socket_helper.IPV6_ENABLED: 6442 with socket.create_server(("::1", port), 6443 family=socket.AF_INET6) as sock: 6444 self.assertEqual(sock.getsockname()[0], "::1") 6445 self.assertEqual(sock.getsockname()[1], port) 6446 6447 def test_family_and_type(self): 6448 with socket.create_server(("127.0.0.1", 0)) as sock: 6449 self.assertEqual(sock.family, socket.AF_INET) 6450 self.assertEqual(sock.type, socket.SOCK_STREAM) 6451 if socket_helper.IPV6_ENABLED: 6452 with socket.create_server(("::1", 0), family=socket.AF_INET6) as s: 6453 self.assertEqual(s.family, socket.AF_INET6) 6454 self.assertEqual(sock.type, socket.SOCK_STREAM) 6455 6456 def test_reuse_port(self): 6457 if not hasattr(socket, "SO_REUSEPORT"): 6458 with self.assertRaises(ValueError): 6459 socket.create_server(("localhost", 0), reuse_port=True) 6460 else: 6461 with socket.create_server(("localhost", 0)) as sock: 6462 opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) 6463 self.assertEqual(opt, 0) 6464 with socket.create_server(("localhost", 0), reuse_port=True) as sock: 6465 opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) 6466 self.assertNotEqual(opt, 0) 6467 6468 @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or 6469 not hasattr(_socket, 'IPV6_V6ONLY'), 6470 "IPV6_V6ONLY option not supported") 6471 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6472 def test_ipv6_only_default(self): 6473 with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock: 6474 assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY) 6475 6476 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6477 "dualstack_ipv6 not supported") 6478 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6479 def test_dualstack_ipv6_family(self): 6480 with socket.create_server(("::1", 0), family=socket.AF_INET6, 6481 dualstack_ipv6=True) as sock: 6482 self.assertEqual(sock.family, socket.AF_INET6) 6483 6484 6485class CreateServerFunctionalTest(unittest.TestCase): 6486 timeout = support.LOOPBACK_TIMEOUT 6487 6488 def setUp(self): 6489 self.thread = None 6490 6491 def tearDown(self): 6492 if self.thread is not None: 6493 self.thread.join(self.timeout) 6494 6495 def echo_server(self, sock): 6496 def run(sock): 6497 with sock: 6498 conn, _ = sock.accept() 6499 with conn: 6500 event.wait(self.timeout) 6501 msg = conn.recv(1024) 6502 if not msg: 6503 return 6504 conn.sendall(msg) 6505 6506 event = threading.Event() 6507 sock.settimeout(self.timeout) 6508 self.thread = threading.Thread(target=run, args=(sock, )) 6509 self.thread.start() 6510 event.set() 6511 6512 def echo_client(self, addr, family): 6513 with socket.socket(family=family) as sock: 6514 sock.settimeout(self.timeout) 6515 sock.connect(addr) 6516 sock.sendall(b'foo') 6517 self.assertEqual(sock.recv(1024), b'foo') 6518 6519 def test_tcp4(self): 6520 port = socket_helper.find_unused_port() 6521 with socket.create_server(("", port)) as sock: 6522 self.echo_server(sock) 6523 self.echo_client(("127.0.0.1", port), socket.AF_INET) 6524 6525 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6526 def test_tcp6(self): 6527 port = socket_helper.find_unused_port() 6528 with socket.create_server(("", port), 6529 family=socket.AF_INET6) as sock: 6530 self.echo_server(sock) 6531 self.echo_client(("::1", port), socket.AF_INET6) 6532 6533 # --- dual stack tests 6534 6535 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6536 "dualstack_ipv6 not supported") 6537 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6538 def test_dual_stack_client_v4(self): 6539 port = socket_helper.find_unused_port() 6540 with socket.create_server(("", port), family=socket.AF_INET6, 6541 dualstack_ipv6=True) as sock: 6542 self.echo_server(sock) 6543 self.echo_client(("127.0.0.1", port), socket.AF_INET) 6544 6545 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6546 "dualstack_ipv6 not supported") 6547 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6548 def test_dual_stack_client_v6(self): 6549 port = socket_helper.find_unused_port() 6550 with socket.create_server(("", port), family=socket.AF_INET6, 6551 dualstack_ipv6=True) as sock: 6552 self.echo_server(sock) 6553 self.echo_client(("::1", port), socket.AF_INET6) 6554 6555@requireAttrs(socket, "send_fds") 6556@requireAttrs(socket, "recv_fds") 6557@requireAttrs(socket, "AF_UNIX") 6558class SendRecvFdsTests(unittest.TestCase): 6559 def testSendAndRecvFds(self): 6560 def close_pipes(pipes): 6561 for fd1, fd2 in pipes: 6562 os.close(fd1) 6563 os.close(fd2) 6564 6565 def close_fds(fds): 6566 for fd in fds: 6567 os.close(fd) 6568 6569 # send 10 file descriptors 6570 pipes = [os.pipe() for _ in range(10)] 6571 self.addCleanup(close_pipes, pipes) 6572 fds = [rfd for rfd, wfd in pipes] 6573 6574 # use a UNIX socket pair to exchange file descriptors locally 6575 sock1, sock2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) 6576 with sock1, sock2: 6577 socket.send_fds(sock1, [MSG], fds) 6578 # request more data and file descriptors than expected 6579 msg, fds2, flags, addr = socket.recv_fds(sock2, len(MSG) * 2, len(fds) * 2) 6580 self.addCleanup(close_fds, fds2) 6581 6582 self.assertEqual(msg, MSG) 6583 self.assertEqual(len(fds2), len(fds)) 6584 self.assertEqual(flags, 0) 6585 # don't test addr 6586 6587 # test that file descriptors are connected 6588 for index, fds in enumerate(pipes): 6589 rfd, wfd = fds 6590 os.write(wfd, str(index).encode()) 6591 6592 for index, rfd in enumerate(fds2): 6593 data = os.read(rfd, 100) 6594 self.assertEqual(data, str(index).encode()) 6595 6596 6597def test_main(): 6598 tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, 6599 TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, 6600 UDPTimeoutTest, CreateServerTest, CreateServerFunctionalTest, 6601 SendRecvFdsTests] 6602 6603 tests.extend([ 6604 NonBlockingTCPTests, 6605 FileObjectClassTestCase, 6606 UnbufferedFileObjectClassTestCase, 6607 LineBufferedFileObjectClassTestCase, 6608 SmallBufferedFileObjectClassTestCase, 6609 UnicodeReadFileObjectClassTestCase, 6610 UnicodeWriteFileObjectClassTestCase, 6611 UnicodeReadWriteFileObjectClassTestCase, 6612 NetworkConnectionNoServer, 6613 NetworkConnectionAttributesTest, 6614 NetworkConnectionBehaviourTest, 6615 ContextManagersTest, 6616 InheritanceTest, 6617 NonblockConstantTest 6618 ]) 6619 tests.append(BasicSocketPairTest) 6620 tests.append(TestUnixDomain) 6621 tests.append(TestLinuxAbstractNamespace) 6622 tests.extend([TIPCTest, TIPCThreadableTest]) 6623 tests.extend([BasicCANTest, CANTest]) 6624 tests.extend([BasicRDSTest, RDSTest]) 6625 tests.append(LinuxKernelCryptoAPI) 6626 tests.append(BasicQIPCRTRTest) 6627 tests.extend([ 6628 BasicVSOCKTest, 6629 ThreadedVSOCKSocketStreamTest, 6630 ]) 6631 tests.append(BasicBluetoothTest) 6632 tests.extend([ 6633 CmsgMacroTests, 6634 SendmsgUDPTest, 6635 RecvmsgUDPTest, 6636 RecvmsgIntoUDPTest, 6637 SendmsgUDP6Test, 6638 RecvmsgUDP6Test, 6639 RecvmsgRFC3542AncillaryUDP6Test, 6640 RecvmsgIntoRFC3542AncillaryUDP6Test, 6641 RecvmsgIntoUDP6Test, 6642 SendmsgUDPLITETest, 6643 RecvmsgUDPLITETest, 6644 RecvmsgIntoUDPLITETest, 6645 SendmsgUDPLITE6Test, 6646 RecvmsgUDPLITE6Test, 6647 RecvmsgRFC3542AncillaryUDPLITE6Test, 6648 RecvmsgIntoRFC3542AncillaryUDPLITE6Test, 6649 RecvmsgIntoUDPLITE6Test, 6650 SendmsgTCPTest, 6651 RecvmsgTCPTest, 6652 RecvmsgIntoTCPTest, 6653 SendmsgSCTPStreamTest, 6654 RecvmsgSCTPStreamTest, 6655 RecvmsgIntoSCTPStreamTest, 6656 SendmsgUnixStreamTest, 6657 RecvmsgUnixStreamTest, 6658 RecvmsgIntoUnixStreamTest, 6659 RecvmsgSCMRightsStreamTest, 6660 RecvmsgIntoSCMRightsStreamTest, 6661 # These are slow when setitimer() is not available 6662 InterruptedRecvTimeoutTest, 6663 InterruptedSendTimeoutTest, 6664 TestSocketSharing, 6665 SendfileUsingSendTest, 6666 SendfileUsingSendfileTest, 6667 ]) 6668 tests.append(TestMSWindowsTCPFlags) 6669 6670 thread_info = support.threading_setup() 6671 support.run_unittest(*tests) 6672 support.threading_cleanup(*thread_info) 6673 6674 6675if __name__ == "__main__": 6676 test_main() 6677