1import unittest 2from test import support 3from test.support import os_helper 4from test.support import socket_helper 5from test.support import threading_helper 6 7import errno 8import io 9import itertools 10import socket 11import select 12import tempfile 13import time 14import traceback 15import queue 16import sys 17import os 18import platform 19import array 20import contextlib 21from weakref import proxy 22import signal 23import math 24import pickle 25import struct 26import random 27import shutil 28import string 29import _thread as thread 30import threading 31try: 32 import multiprocessing 33except ImportError: 34 multiprocessing = False 35try: 36 import fcntl 37except ImportError: 38 fcntl = None 39 40HOST = socket_helper.HOST 41# test unicode string and carriage return 42MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') 43 44VSOCKPORT = 1234 45AIX = platform.system() == "AIX" 46 47try: 48 import _socket 49except ImportError: 50 _socket = None 51 52def get_cid(): 53 if fcntl is None: 54 return None 55 if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'): 56 return None 57 try: 58 with open("/dev/vsock", "rb") as f: 59 r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, " ") 60 except OSError: 61 return None 62 else: 63 return struct.unpack("I", r)[0] 64 65def _have_socket_can(): 66 """Check whether CAN sockets are supported on this host.""" 67 try: 68 s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 69 except (AttributeError, OSError): 70 return False 71 else: 72 s.close() 73 return True 74 75def _have_socket_can_isotp(): 76 """Check whether CAN ISOTP sockets are supported on this host.""" 77 try: 78 s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) 79 except (AttributeError, OSError): 80 return False 81 else: 82 s.close() 83 return True 84 85def _have_socket_can_j1939(): 86 """Check whether CAN J1939 sockets are supported on this host.""" 87 try: 88 s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, 
socket.CAN_J1939) 89 except (AttributeError, OSError): 90 return False 91 else: 92 s.close() 93 return True 94 95def _have_socket_rds(): 96 """Check whether RDS sockets are supported on this host.""" 97 try: 98 s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 99 except (AttributeError, OSError): 100 return False 101 else: 102 s.close() 103 return True 104 105def _have_socket_alg(): 106 """Check whether AF_ALG sockets are supported on this host.""" 107 try: 108 s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 109 except (AttributeError, OSError): 110 return False 111 else: 112 s.close() 113 return True 114 115def _have_socket_qipcrtr(): 116 """Check whether AF_QIPCRTR sockets are supported on this host.""" 117 try: 118 s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0) 119 except (AttributeError, OSError): 120 return False 121 else: 122 s.close() 123 return True 124 125def _have_socket_vsock(): 126 """Check whether AF_VSOCK sockets are supported on this host.""" 127 ret = get_cid() is not None 128 return ret 129 130 131def _have_socket_bluetooth(): 132 """Check whether AF_BLUETOOTH sockets are supported on this host.""" 133 try: 134 # RFCOMM is supported by all platforms with bluetooth support. Windows 135 # does not support omitting the protocol. 
136 s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) 137 except (AttributeError, OSError): 138 return False 139 else: 140 s.close() 141 return True 142 143 144@contextlib.contextmanager 145def socket_setdefaulttimeout(timeout): 146 old_timeout = socket.getdefaulttimeout() 147 try: 148 socket.setdefaulttimeout(timeout) 149 yield 150 finally: 151 socket.setdefaulttimeout(old_timeout) 152 153 154HAVE_SOCKET_CAN = _have_socket_can() 155 156HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp() 157 158HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939() 159 160HAVE_SOCKET_RDS = _have_socket_rds() 161 162HAVE_SOCKET_ALG = _have_socket_alg() 163 164HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr() 165 166HAVE_SOCKET_VSOCK = _have_socket_vsock() 167 168HAVE_SOCKET_UDPLITE = hasattr(socket, "IPPROTO_UDPLITE") 169 170HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth() 171 172# Size in bytes of the int type 173SIZEOF_INT = array.array("i").itemsize 174 175class SocketTCPTest(unittest.TestCase): 176 177 def setUp(self): 178 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 179 self.port = socket_helper.bind_port(self.serv) 180 self.serv.listen() 181 182 def tearDown(self): 183 self.serv.close() 184 self.serv = None 185 186class SocketUDPTest(unittest.TestCase): 187 188 def setUp(self): 189 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 190 self.port = socket_helper.bind_port(self.serv) 191 192 def tearDown(self): 193 self.serv.close() 194 self.serv = None 195 196class SocketUDPLITETest(SocketUDPTest): 197 198 def setUp(self): 199 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 200 self.port = socket_helper.bind_port(self.serv) 201 202class ThreadSafeCleanupTestCase: 203 """Subclass of unittest.TestCase with thread-safe cleanup methods. 204 205 This subclass protects the addCleanup() and doCleanups() methods 206 with a recursive lock. 
207 """ 208 209 def __init__(self, *args, **kwargs): 210 super().__init__(*args, **kwargs) 211 self._cleanup_lock = threading.RLock() 212 213 def addCleanup(self, *args, **kwargs): 214 with self._cleanup_lock: 215 return super().addCleanup(*args, **kwargs) 216 217 def doCleanups(self, *args, **kwargs): 218 with self._cleanup_lock: 219 return super().doCleanups(*args, **kwargs) 220 221class SocketCANTest(unittest.TestCase): 222 223 """To be able to run this test, a `vcan0` CAN interface can be created with 224 the following commands: 225 # modprobe vcan 226 # ip link add dev vcan0 type vcan 227 # ip link set up vcan0 228 """ 229 interface = 'vcan0' 230 bufsize = 128 231 232 """The CAN frame structure is defined in <linux/can.h>: 233 234 struct can_frame { 235 canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ 236 __u8 can_dlc; /* data length code: 0 .. 8 */ 237 __u8 data[8] __attribute__((aligned(8))); 238 }; 239 """ 240 can_frame_fmt = "=IB3x8s" 241 can_frame_size = struct.calcsize(can_frame_fmt) 242 243 """The Broadcast Management Command frame structure is defined 244 in <linux/can/bcm.h>: 245 246 struct bcm_msg_head { 247 __u32 opcode; 248 __u32 flags; 249 __u32 count; 250 struct timeval ival1, ival2; 251 canid_t can_id; 252 __u32 nframes; 253 struct can_frame frames[0]; 254 } 255 256 `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see 257 `struct can_frame` definition). Must use native not standard types for packing. 
258 """ 259 bcm_cmd_msg_fmt = "@3I4l2I" 260 bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8) 261 262 def setUp(self): 263 self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 264 self.addCleanup(self.s.close) 265 try: 266 self.s.bind((self.interface,)) 267 except OSError: 268 self.skipTest('network interface `%s` does not exist' % 269 self.interface) 270 271 272class SocketRDSTest(unittest.TestCase): 273 274 """To be able to run this test, the `rds` kernel module must be loaded: 275 # modprobe rds 276 """ 277 bufsize = 8192 278 279 def setUp(self): 280 self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 281 self.addCleanup(self.serv.close) 282 try: 283 self.port = socket_helper.bind_port(self.serv) 284 except OSError: 285 self.skipTest('unable to bind RDS socket') 286 287 288class ThreadableTest: 289 """Threadable Test class 290 291 The ThreadableTest class makes it easy to create a threaded 292 client/server pair from an existing unit test. To create a 293 new threaded class from an existing unit test, use multiple 294 inheritance: 295 296 class NewClass (OldClass, ThreadableTest): 297 pass 298 299 This class defines two new fixture functions with obvious 300 purposes for overriding: 301 302 clientSetUp () 303 clientTearDown () 304 305 Any new test functions within the class must then define 306 tests in pairs, where the test name is preceded with a 307 '_' to indicate the client portion of the test. Ex: 308 309 def testFoo(self): 310 # Server portion 311 312 def _testFoo(self): 313 # Client portion 314 315 Any exceptions raised by the clients during their tests 316 are caught and transferred to the main thread to alert 317 the testing framework. 
318 319 Note, the server setup function cannot call any blocking 320 functions that rely on the client thread during setup, 321 unless serverExplicitReady() is called just before 322 the blocking call (such as in setting up a client/server 323 connection and performing the accept() in setUp(). 324 """ 325 326 def __init__(self): 327 # Swap the true setup function 328 self.__setUp = self.setUp 329 self.setUp = self._setUp 330 331 def serverExplicitReady(self): 332 """This method allows the server to explicitly indicate that 333 it wants the client thread to proceed. This is useful if the 334 server is about to execute a blocking routine that is 335 dependent upon the client thread during its setup routine.""" 336 self.server_ready.set() 337 338 def _setUp(self): 339 self.wait_threads = threading_helper.wait_threads_exit() 340 self.wait_threads.__enter__() 341 self.addCleanup(self.wait_threads.__exit__, None, None, None) 342 343 self.server_ready = threading.Event() 344 self.client_ready = threading.Event() 345 self.done = threading.Event() 346 self.queue = queue.Queue(1) 347 self.server_crashed = False 348 349 def raise_queued_exception(): 350 if self.queue.qsize(): 351 raise self.queue.get() 352 self.addCleanup(raise_queued_exception) 353 354 # Do some munging to start the client test. 
355 methodname = self.id() 356 i = methodname.rfind('.') 357 methodname = methodname[i+1:] 358 test_method = getattr(self, '_' + methodname) 359 self.client_thread = thread.start_new_thread( 360 self.clientRun, (test_method,)) 361 362 try: 363 self.__setUp() 364 except: 365 self.server_crashed = True 366 raise 367 finally: 368 self.server_ready.set() 369 self.client_ready.wait() 370 self.addCleanup(self.done.wait) 371 372 def clientRun(self, test_func): 373 self.server_ready.wait() 374 try: 375 self.clientSetUp() 376 except BaseException as e: 377 self.queue.put(e) 378 self.clientTearDown() 379 return 380 finally: 381 self.client_ready.set() 382 if self.server_crashed: 383 self.clientTearDown() 384 return 385 if not hasattr(test_func, '__call__'): 386 raise TypeError("test_func must be a callable function") 387 try: 388 test_func() 389 except BaseException as e: 390 self.queue.put(e) 391 finally: 392 self.clientTearDown() 393 394 def clientSetUp(self): 395 raise NotImplementedError("clientSetUp must be implemented.") 396 397 def clientTearDown(self): 398 self.done.set() 399 thread.exit() 400 401class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): 402 403 def __init__(self, methodName='runTest'): 404 SocketTCPTest.__init__(self, methodName=methodName) 405 ThreadableTest.__init__(self) 406 407 def clientSetUp(self): 408 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 409 410 def clientTearDown(self): 411 self.cli.close() 412 self.cli = None 413 ThreadableTest.clientTearDown(self) 414 415class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): 416 417 def __init__(self, methodName='runTest'): 418 SocketUDPTest.__init__(self, methodName=methodName) 419 ThreadableTest.__init__(self) 420 421 def clientSetUp(self): 422 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 423 424 def clientTearDown(self): 425 self.cli.close() 426 self.cli = None 427 ThreadableTest.clientTearDown(self) 428 429@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 430 
'UDPLITE sockets required for this test.') 431class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest): 432 433 def __init__(self, methodName='runTest'): 434 SocketUDPLITETest.__init__(self, methodName=methodName) 435 ThreadableTest.__init__(self) 436 437 def clientSetUp(self): 438 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 439 440 def clientTearDown(self): 441 self.cli.close() 442 self.cli = None 443 ThreadableTest.clientTearDown(self) 444 445class ThreadedCANSocketTest(SocketCANTest, ThreadableTest): 446 447 def __init__(self, methodName='runTest'): 448 SocketCANTest.__init__(self, methodName=methodName) 449 ThreadableTest.__init__(self) 450 451 def clientSetUp(self): 452 self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 453 try: 454 self.cli.bind((self.interface,)) 455 except OSError: 456 # skipTest should not be called here, and will be called in the 457 # server instead 458 pass 459 460 def clientTearDown(self): 461 self.cli.close() 462 self.cli = None 463 ThreadableTest.clientTearDown(self) 464 465class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest): 466 467 def __init__(self, methodName='runTest'): 468 SocketRDSTest.__init__(self, methodName=methodName) 469 ThreadableTest.__init__(self) 470 471 def clientSetUp(self): 472 self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 473 try: 474 # RDS sockets must be bound explicitly to send or receive data 475 self.cli.bind((HOST, 0)) 476 self.cli_addr = self.cli.getsockname() 477 except OSError: 478 # skipTest should not be called here, and will be called in the 479 # server instead 480 pass 481 482 def clientTearDown(self): 483 self.cli.close() 484 self.cli = None 485 ThreadableTest.clientTearDown(self) 486 487@unittest.skipIf(fcntl is None, "need fcntl") 488@unittest.skipUnless(HAVE_SOCKET_VSOCK, 489 'VSOCK sockets required for this test.') 490@unittest.skipUnless(get_cid() != 2, 491 "This test can only be run on a virtual 
guest.") 492class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest): 493 494 def __init__(self, methodName='runTest'): 495 unittest.TestCase.__init__(self, methodName=methodName) 496 ThreadableTest.__init__(self) 497 498 def setUp(self): 499 self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) 500 self.addCleanup(self.serv.close) 501 self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT)) 502 self.serv.listen() 503 self.serverExplicitReady() 504 self.conn, self.connaddr = self.serv.accept() 505 self.addCleanup(self.conn.close) 506 507 def clientSetUp(self): 508 time.sleep(0.1) 509 self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) 510 self.addCleanup(self.cli.close) 511 cid = get_cid() 512 self.cli.connect((cid, VSOCKPORT)) 513 514 def testStream(self): 515 msg = self.conn.recv(1024) 516 self.assertEqual(msg, MSG) 517 518 def _testStream(self): 519 self.cli.send(MSG) 520 self.cli.close() 521 522class SocketConnectedTest(ThreadedTCPSocketTest): 523 """Socket tests for client-server connection. 524 525 self.cli_conn is a client socket connected to the server. The 526 setUp() method guarantees that it is connected to the server. 
527 """ 528 529 def __init__(self, methodName='runTest'): 530 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 531 532 def setUp(self): 533 ThreadedTCPSocketTest.setUp(self) 534 # Indicate explicitly we're ready for the client thread to 535 # proceed and then perform the blocking call to accept 536 self.serverExplicitReady() 537 conn, addr = self.serv.accept() 538 self.cli_conn = conn 539 540 def tearDown(self): 541 self.cli_conn.close() 542 self.cli_conn = None 543 ThreadedTCPSocketTest.tearDown(self) 544 545 def clientSetUp(self): 546 ThreadedTCPSocketTest.clientSetUp(self) 547 self.cli.connect((HOST, self.port)) 548 self.serv_conn = self.cli 549 550 def clientTearDown(self): 551 self.serv_conn.close() 552 self.serv_conn = None 553 ThreadedTCPSocketTest.clientTearDown(self) 554 555class SocketPairTest(unittest.TestCase, ThreadableTest): 556 557 def __init__(self, methodName='runTest'): 558 unittest.TestCase.__init__(self, methodName=methodName) 559 ThreadableTest.__init__(self) 560 561 def setUp(self): 562 self.serv, self.cli = socket.socketpair() 563 564 def tearDown(self): 565 self.serv.close() 566 self.serv = None 567 568 def clientSetUp(self): 569 pass 570 571 def clientTearDown(self): 572 self.cli.close() 573 self.cli = None 574 ThreadableTest.clientTearDown(self) 575 576 577# The following classes are used by the sendmsg()/recvmsg() tests. 578# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase 579# gives a drop-in replacement for SocketConnectedTest, but different 580# address families can be used, and the attributes serv_addr and 581# cli_addr will be set to the addresses of the endpoints. 582 583class SocketTestBase(unittest.TestCase): 584 """A base class for socket tests. 585 586 Subclasses must provide methods newSocket() to return a new socket 587 and bindSock(sock) to bind it to an unused address. 588 589 Creates a socket self.serv and sets self.serv_addr to its address. 
590 """ 591 592 def setUp(self): 593 self.serv = self.newSocket() 594 self.bindServer() 595 596 def bindServer(self): 597 """Bind server socket and set self.serv_addr to its address.""" 598 self.bindSock(self.serv) 599 self.serv_addr = self.serv.getsockname() 600 601 def tearDown(self): 602 self.serv.close() 603 self.serv = None 604 605 606class SocketListeningTestMixin(SocketTestBase): 607 """Mixin to listen on the server socket.""" 608 609 def setUp(self): 610 super().setUp() 611 self.serv.listen() 612 613 614class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase, 615 ThreadableTest): 616 """Mixin to add client socket and allow client/server tests. 617 618 Client socket is self.cli and its address is self.cli_addr. See 619 ThreadableTest for usage information. 620 """ 621 622 def __init__(self, *args, **kwargs): 623 super().__init__(*args, **kwargs) 624 ThreadableTest.__init__(self) 625 626 def clientSetUp(self): 627 self.cli = self.newClientSocket() 628 self.bindClient() 629 630 def newClientSocket(self): 631 """Return a new socket for use as client.""" 632 return self.newSocket() 633 634 def bindClient(self): 635 """Bind client socket and set self.cli_addr to its address.""" 636 self.bindSock(self.cli) 637 self.cli_addr = self.cli.getsockname() 638 639 def clientTearDown(self): 640 self.cli.close() 641 self.cli = None 642 ThreadableTest.clientTearDown(self) 643 644 645class ConnectedStreamTestMixin(SocketListeningTestMixin, 646 ThreadedSocketTestMixin): 647 """Mixin to allow client/server stream tests with connected client. 648 649 Server's socket representing connection to client is self.cli_conn 650 and client's connection to server is self.serv_conn. (Based on 651 SocketConnectedTest.) 
652 """ 653 654 def setUp(self): 655 super().setUp() 656 # Indicate explicitly we're ready for the client thread to 657 # proceed and then perform the blocking call to accept 658 self.serverExplicitReady() 659 conn, addr = self.serv.accept() 660 self.cli_conn = conn 661 662 def tearDown(self): 663 self.cli_conn.close() 664 self.cli_conn = None 665 super().tearDown() 666 667 def clientSetUp(self): 668 super().clientSetUp() 669 self.cli.connect(self.serv_addr) 670 self.serv_conn = self.cli 671 672 def clientTearDown(self): 673 try: 674 self.serv_conn.close() 675 self.serv_conn = None 676 except AttributeError: 677 pass 678 super().clientTearDown() 679 680 681class UnixSocketTestBase(SocketTestBase): 682 """Base class for Unix-domain socket tests.""" 683 684 # This class is used for file descriptor passing tests, so we 685 # create the sockets in a private directory so that other users 686 # can't send anything that might be problematic for a privileged 687 # user running the tests. 688 689 def setUp(self): 690 self.dir_path = tempfile.mkdtemp() 691 self.addCleanup(os.rmdir, self.dir_path) 692 super().setUp() 693 694 def bindSock(self, sock): 695 path = tempfile.mktemp(dir=self.dir_path) 696 socket_helper.bind_unix_socket(sock, path) 697 self.addCleanup(os_helper.unlink, path) 698 699class UnixStreamBase(UnixSocketTestBase): 700 """Base class for Unix-domain SOCK_STREAM tests.""" 701 702 def newSocket(self): 703 return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 704 705 706class InetTestBase(SocketTestBase): 707 """Base class for IPv4 socket tests.""" 708 709 host = HOST 710 711 def setUp(self): 712 super().setUp() 713 self.port = self.serv_addr[1] 714 715 def bindSock(self, sock): 716 socket_helper.bind_port(sock, host=self.host) 717 718class TCPTestBase(InetTestBase): 719 """Base class for TCP-over-IPv4 tests.""" 720 721 def newSocket(self): 722 return socket.socket(socket.AF_INET, socket.SOCK_STREAM) 723 724class UDPTestBase(InetTestBase): 725 """Base class 
for UDP-over-IPv4 tests.""" 726 727 def newSocket(self): 728 return socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 729 730class UDPLITETestBase(InetTestBase): 731 """Base class for UDPLITE-over-IPv4 tests.""" 732 733 def newSocket(self): 734 return socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 735 736class SCTPStreamBase(InetTestBase): 737 """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode.""" 738 739 def newSocket(self): 740 return socket.socket(socket.AF_INET, socket.SOCK_STREAM, 741 socket.IPPROTO_SCTP) 742 743 744class Inet6TestBase(InetTestBase): 745 """Base class for IPv6 socket tests.""" 746 747 host = socket_helper.HOSTv6 748 749class UDP6TestBase(Inet6TestBase): 750 """Base class for UDP-over-IPv6 tests.""" 751 752 def newSocket(self): 753 return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 754 755class UDPLITE6TestBase(Inet6TestBase): 756 """Base class for UDPLITE-over-IPv6 tests.""" 757 758 def newSocket(self): 759 return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 760 761 762# Test-skipping decorators for use with ThreadableTest. 763 764def skipWithClientIf(condition, reason): 765 """Skip decorated test if condition is true, add client_skip decorator. 766 767 If the decorated object is not a class, sets its attribute 768 "client_skip" to a decorator which will return an empty function 769 if the test is to be skipped, or the original function if it is 770 not. This can be used to avoid running the client part of a 771 skipped test when using ThreadableTest. 
772 """ 773 def client_pass(*args, **kwargs): 774 pass 775 def skipdec(obj): 776 retval = unittest.skip(reason)(obj) 777 if not isinstance(obj, type): 778 retval.client_skip = lambda f: client_pass 779 return retval 780 def noskipdec(obj): 781 if not (isinstance(obj, type) or hasattr(obj, "client_skip")): 782 obj.client_skip = lambda f: f 783 return obj 784 return skipdec if condition else noskipdec 785 786 787def requireAttrs(obj, *attributes): 788 """Skip decorated test if obj is missing any of the given attributes. 789 790 Sets client_skip attribute as skipWithClientIf() does. 791 """ 792 missing = [name for name in attributes if not hasattr(obj, name)] 793 return skipWithClientIf( 794 missing, "don't have " + ", ".join(name for name in missing)) 795 796 797def requireSocket(*args): 798 """Skip decorated test if a socket cannot be created with given arguments. 799 800 When an argument is given as a string, will use the value of that 801 attribute of the socket module, or skip the test if it doesn't 802 exist. Sets client_skip attribute as skipWithClientIf() does. 803 """ 804 err = None 805 missing = [obj for obj in args if 806 isinstance(obj, str) and not hasattr(socket, obj)] 807 if missing: 808 err = "don't have " + ", ".join(name for name in missing) 809 else: 810 callargs = [getattr(socket, obj) if isinstance(obj, str) else obj 811 for obj in args] 812 try: 813 s = socket.socket(*callargs) 814 except OSError as e: 815 # XXX: check errno? 
816 err = str(e) 817 else: 818 s.close() 819 return skipWithClientIf( 820 err is not None, 821 "can't create socket({0}): {1}".format( 822 ", ".join(str(o) for o in args), err)) 823 824 825####################################################################### 826## Begin Tests 827 828class GeneralModuleTests(unittest.TestCase): 829 830 def test_SocketType_is_socketobject(self): 831 import _socket 832 self.assertTrue(socket.SocketType is _socket.socket) 833 s = socket.socket() 834 self.assertIsInstance(s, socket.SocketType) 835 s.close() 836 837 def test_repr(self): 838 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 839 with s: 840 self.assertIn('fd=%i' % s.fileno(), repr(s)) 841 self.assertIn('family=%s' % socket.AF_INET, repr(s)) 842 self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s)) 843 self.assertIn('proto=0', repr(s)) 844 self.assertNotIn('raddr', repr(s)) 845 s.bind(('127.0.0.1', 0)) 846 self.assertIn('laddr', repr(s)) 847 self.assertIn(str(s.getsockname()), repr(s)) 848 self.assertIn('[closed]', repr(s)) 849 self.assertNotIn('laddr', repr(s)) 850 851 @unittest.skipUnless(_socket is not None, 'need _socket module') 852 def test_csocket_repr(self): 853 s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) 854 try: 855 expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>' 856 % (s.fileno(), s.family, s.type, s.proto)) 857 self.assertEqual(repr(s), expected) 858 finally: 859 s.close() 860 expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>' 861 % (s.family, s.type, s.proto)) 862 self.assertEqual(repr(s), expected) 863 864 def test_weakref(self): 865 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 866 p = proxy(s) 867 self.assertEqual(p.fileno(), s.fileno()) 868 s = None 869 support.gc_collect() # For PyPy or other GCs. 
870 try: 871 p.fileno() 872 except ReferenceError: 873 pass 874 else: 875 self.fail('Socket proxy still exists') 876 877 def testSocketError(self): 878 # Testing socket module exceptions 879 msg = "Error raising socket exception (%s)." 880 with self.assertRaises(OSError, msg=msg % 'OSError'): 881 raise OSError 882 with self.assertRaises(OSError, msg=msg % 'socket.herror'): 883 raise socket.herror 884 with self.assertRaises(OSError, msg=msg % 'socket.gaierror'): 885 raise socket.gaierror 886 887 def testSendtoErrors(self): 888 # Testing that sendto doesn't mask failures. See #10169. 889 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 890 self.addCleanup(s.close) 891 s.bind(('', 0)) 892 sockname = s.getsockname() 893 # 2 args 894 with self.assertRaises(TypeError) as cm: 895 s.sendto('\u2620', sockname) 896 self.assertEqual(str(cm.exception), 897 "a bytes-like object is required, not 'str'") 898 with self.assertRaises(TypeError) as cm: 899 s.sendto(5j, sockname) 900 self.assertEqual(str(cm.exception), 901 "a bytes-like object is required, not 'complex'") 902 with self.assertRaises(TypeError) as cm: 903 s.sendto(b'foo', None) 904 self.assertIn('not NoneType',str(cm.exception)) 905 # 3 args 906 with self.assertRaises(TypeError) as cm: 907 s.sendto('\u2620', 0, sockname) 908 self.assertEqual(str(cm.exception), 909 "a bytes-like object is required, not 'str'") 910 with self.assertRaises(TypeError) as cm: 911 s.sendto(5j, 0, sockname) 912 self.assertEqual(str(cm.exception), 913 "a bytes-like object is required, not 'complex'") 914 with self.assertRaises(TypeError) as cm: 915 s.sendto(b'foo', 0, None) 916 self.assertIn('not NoneType', str(cm.exception)) 917 with self.assertRaises(TypeError) as cm: 918 s.sendto(b'foo', 'bar', sockname) 919 with self.assertRaises(TypeError) as cm: 920 s.sendto(b'foo', None, None) 921 # wrong number of args 922 with self.assertRaises(TypeError) as cm: 923 s.sendto(b'foo') 924 self.assertIn('(1 given)', str(cm.exception)) 925 with 
self.assertRaises(TypeError) as cm: 926 s.sendto(b'foo', 0, sockname, 4) 927 self.assertIn('(4 given)', str(cm.exception)) 928 929 def testCrucialConstants(self): 930 # Testing for mission critical constants 931 socket.AF_INET 932 if socket.has_ipv6: 933 socket.AF_INET6 934 socket.SOCK_STREAM 935 socket.SOCK_DGRAM 936 socket.SOCK_RAW 937 socket.SOCK_RDM 938 socket.SOCK_SEQPACKET 939 socket.SOL_SOCKET 940 socket.SO_REUSEADDR 941 942 def testCrucialIpProtoConstants(self): 943 socket.IPPROTO_TCP 944 socket.IPPROTO_UDP 945 if socket.has_ipv6: 946 socket.IPPROTO_IPV6 947 948 @unittest.skipUnless(os.name == "nt", "Windows specific") 949 def testWindowsSpecificConstants(self): 950 socket.IPPROTO_ICLFXBM 951 socket.IPPROTO_ST 952 socket.IPPROTO_CBT 953 socket.IPPROTO_IGP 954 socket.IPPROTO_RDP 955 socket.IPPROTO_PGM 956 socket.IPPROTO_L2TP 957 socket.IPPROTO_SCTP 958 959 @unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test') 960 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 961 def test3542SocketOptions(self): 962 # Ref. 
issue #35569 and https://tools.ietf.org/html/rfc3542 963 opts = { 964 'IPV6_CHECKSUM', 965 'IPV6_DONTFRAG', 966 'IPV6_DSTOPTS', 967 'IPV6_HOPLIMIT', 968 'IPV6_HOPOPTS', 969 'IPV6_NEXTHOP', 970 'IPV6_PATHMTU', 971 'IPV6_PKTINFO', 972 'IPV6_RECVDSTOPTS', 973 'IPV6_RECVHOPLIMIT', 974 'IPV6_RECVHOPOPTS', 975 'IPV6_RECVPATHMTU', 976 'IPV6_RECVPKTINFO', 977 'IPV6_RECVRTHDR', 978 'IPV6_RECVTCLASS', 979 'IPV6_RTHDR', 980 'IPV6_RTHDRDSTOPTS', 981 'IPV6_RTHDR_TYPE_0', 982 'IPV6_TCLASS', 983 'IPV6_USE_MIN_MTU', 984 } 985 for opt in opts: 986 self.assertTrue( 987 hasattr(socket, opt), f"Missing RFC3542 socket option '{opt}'" 988 ) 989 990 def testHostnameRes(self): 991 # Testing hostname resolution mechanisms 992 hostname = socket.gethostname() 993 try: 994 ip = socket.gethostbyname(hostname) 995 except OSError: 996 # Probably name lookup wasn't set up right; skip this test 997 self.skipTest('name lookup failure') 998 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") 999 try: 1000 hname, aliases, ipaddrs = socket.gethostbyaddr(ip) 1001 except OSError: 1002 # Probably a similar problem as above; skip this test 1003 self.skipTest('name lookup failure') 1004 all_host_names = [hostname, hname] + aliases 1005 fqhn = socket.getfqdn(ip) 1006 if not fqhn in all_host_names: 1007 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) 1008 1009 def test_host_resolution(self): 1010 for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']: 1011 self.assertEqual(socket.gethostbyname(addr), addr) 1012 1013 # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have 1014 # a matching name entry (e.g. 'ip6-localhost') 1015 for host in [socket_helper.HOSTv4]: 1016 self.assertIn(host, socket.gethostbyaddr(host)[2]) 1017 1018 def test_host_resolution_bad_address(self): 1019 # These are all malformed IP addresses and expected not to resolve to 1020 # any result. But some ISPs, e.g. 
AWS, may successfully resolve these 1021 # IPs. 1022 explanation = ( 1023 "resolving an invalid IP address did not raise OSError; " 1024 "can be caused by a broken DNS server" 1025 ) 1026 for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2', 1027 '1:1:1:1:1:1:1:1:1']: 1028 with self.assertRaises(OSError, msg=addr): 1029 socket.gethostbyname(addr) 1030 with self.assertRaises(OSError, msg=explanation): 1031 socket.gethostbyaddr(addr) 1032 1033 @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()") 1034 @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()") 1035 def test_sethostname(self): 1036 oldhn = socket.gethostname() 1037 try: 1038 socket.sethostname('new') 1039 except OSError as e: 1040 if e.errno == errno.EPERM: 1041 self.skipTest("test should be run as root") 1042 else: 1043 raise 1044 try: 1045 # running test as root! 1046 self.assertEqual(socket.gethostname(), 'new') 1047 # Should work with bytes objects too 1048 socket.sethostname(b'bar') 1049 self.assertEqual(socket.gethostname(), 'bar') 1050 finally: 1051 socket.sethostname(oldhn) 1052 1053 @unittest.skipUnless(hasattr(socket, 'if_nameindex'), 1054 'socket.if_nameindex() not available.') 1055 def testInterfaceNameIndex(self): 1056 interfaces = socket.if_nameindex() 1057 for index, name in interfaces: 1058 self.assertIsInstance(index, int) 1059 self.assertIsInstance(name, str) 1060 # interface indices are non-zero integers 1061 self.assertGreater(index, 0) 1062 _index = socket.if_nametoindex(name) 1063 self.assertIsInstance(_index, int) 1064 self.assertEqual(index, _index) 1065 _name = socket.if_indextoname(index) 1066 self.assertIsInstance(_name, str) 1067 self.assertEqual(name, _name) 1068 1069 @unittest.skipUnless(hasattr(socket, 'if_indextoname'), 1070 'socket.if_indextoname() not available.') 1071 def testInvalidInterfaceIndexToName(self): 1072 self.assertRaises(OSError, socket.if_indextoname, 0) 1073 self.assertRaises(TypeError, 
socket.if_indextoname, '_DEADBEEF') 1074 1075 @unittest.skipUnless(hasattr(socket, 'if_nametoindex'), 1076 'socket.if_nametoindex() not available.') 1077 def testInvalidInterfaceNameToIndex(self): 1078 self.assertRaises(TypeError, socket.if_nametoindex, 0) 1079 self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF') 1080 1081 @unittest.skipUnless(hasattr(sys, 'getrefcount'), 1082 'test needs sys.getrefcount()') 1083 def testRefCountGetNameInfo(self): 1084 # Testing reference count for getnameinfo 1085 try: 1086 # On some versions, this loses a reference 1087 orig = sys.getrefcount(__name__) 1088 socket.getnameinfo(__name__,0) 1089 except TypeError: 1090 if sys.getrefcount(__name__) != orig: 1091 self.fail("socket.getnameinfo loses a reference") 1092 1093 def testInterpreterCrash(self): 1094 # Making sure getnameinfo doesn't crash the interpreter 1095 try: 1096 # On some versions, this crashes the interpreter. 1097 socket.getnameinfo(('x', 0, 0, 0), 0) 1098 except OSError: 1099 pass 1100 1101 def testNtoH(self): 1102 # This just checks that htons etc. are their own inverse, 1103 # when looking at the lower 16 or 32 bits. 
1104 sizes = {socket.htonl: 32, socket.ntohl: 32, 1105 socket.htons: 16, socket.ntohs: 16} 1106 for func, size in sizes.items(): 1107 mask = (1<<size) - 1 1108 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): 1109 self.assertEqual(i & mask, func(func(i&mask)) & mask) 1110 1111 swapped = func(mask) 1112 self.assertEqual(swapped & mask, mask) 1113 self.assertRaises(OverflowError, func, 1<<34) 1114 1115 @support.cpython_only 1116 def testNtoHErrors(self): 1117 import _testcapi 1118 s_good_values = [0, 1, 2, 0xffff] 1119 l_good_values = s_good_values + [0xffffffff] 1120 l_bad_values = [-1, -2, 1<<32, 1<<1000] 1121 s_bad_values = ( 1122 l_bad_values + 1123 [_testcapi.INT_MIN-1, _testcapi.INT_MAX+1] + 1124 [1 << 16, _testcapi.INT_MAX] 1125 ) 1126 for k in s_good_values: 1127 socket.ntohs(k) 1128 socket.htons(k) 1129 for k in l_good_values: 1130 socket.ntohl(k) 1131 socket.htonl(k) 1132 for k in s_bad_values: 1133 self.assertRaises(OverflowError, socket.ntohs, k) 1134 self.assertRaises(OverflowError, socket.htons, k) 1135 for k in l_bad_values: 1136 self.assertRaises(OverflowError, socket.ntohl, k) 1137 self.assertRaises(OverflowError, socket.htonl, k) 1138 1139 def testGetServBy(self): 1140 eq = self.assertEqual 1141 # Find one service that exists, then check all the related interfaces. 1142 # I've ordered this by protocols that have both a tcp and udp 1143 # protocol, at least for modern Linuxes. 
1144 if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd')) 1145 or sys.platform in ('linux', 'darwin')): 1146 # avoid the 'echo' service on this platform, as there is an 1147 # assumption breaking non-standard port/protocol entry 1148 services = ('daytime', 'qotd', 'domain') 1149 else: 1150 services = ('echo', 'daytime', 'domain') 1151 for service in services: 1152 try: 1153 port = socket.getservbyname(service, 'tcp') 1154 break 1155 except OSError: 1156 pass 1157 else: 1158 raise OSError 1159 # Try same call with optional protocol omitted 1160 # Issue #26936: Android getservbyname() was broken before API 23. 1161 if (not hasattr(sys, 'getandroidapilevel') or 1162 sys.getandroidapilevel() >= 23): 1163 port2 = socket.getservbyname(service) 1164 eq(port, port2) 1165 # Try udp, but don't barf if it doesn't exist 1166 try: 1167 udpport = socket.getservbyname(service, 'udp') 1168 except OSError: 1169 udpport = None 1170 else: 1171 eq(udpport, port) 1172 # Now make sure the lookup by port returns the same service name 1173 # Issue #26936: Android getservbyport() is broken. 1174 if not support.is_android: 1175 eq(socket.getservbyport(port2), service) 1176 eq(socket.getservbyport(port, 'tcp'), service) 1177 if udpport is not None: 1178 eq(socket.getservbyport(udpport, 'udp'), service) 1179 # Make sure getservbyport does not accept out of range ports. 
1180 self.assertRaises(OverflowError, socket.getservbyport, -1) 1181 self.assertRaises(OverflowError, socket.getservbyport, 65536) 1182 1183 def testDefaultTimeout(self): 1184 # Testing default timeout 1185 # The default timeout should initially be None 1186 self.assertEqual(socket.getdefaulttimeout(), None) 1187 with socket.socket() as s: 1188 self.assertEqual(s.gettimeout(), None) 1189 1190 # Set the default timeout to 10, and see if it propagates 1191 with socket_setdefaulttimeout(10): 1192 self.assertEqual(socket.getdefaulttimeout(), 10) 1193 with socket.socket() as sock: 1194 self.assertEqual(sock.gettimeout(), 10) 1195 1196 # Reset the default timeout to None, and see if it propagates 1197 socket.setdefaulttimeout(None) 1198 self.assertEqual(socket.getdefaulttimeout(), None) 1199 with socket.socket() as sock: 1200 self.assertEqual(sock.gettimeout(), None) 1201 1202 # Check that setting it to an invalid value raises ValueError 1203 self.assertRaises(ValueError, socket.setdefaulttimeout, -1) 1204 1205 # Check that setting it to an invalid type raises TypeError 1206 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") 1207 1208 @unittest.skipUnless(hasattr(socket, 'inet_aton'), 1209 'test needs socket.inet_aton()') 1210 def testIPv4_inet_aton_fourbytes(self): 1211 # Test that issue1008086 and issue767150 are fixed. 1212 # It must return 4 bytes. 
1213 self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) 1214 self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) 1215 1216 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1217 'test needs socket.inet_pton()') 1218 def testIPv4toString(self): 1219 from socket import inet_aton as f, inet_pton, AF_INET 1220 g = lambda a: inet_pton(AF_INET, a) 1221 1222 assertInvalid = lambda func,a: self.assertRaises( 1223 (OSError, ValueError), func, a 1224 ) 1225 1226 self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) 1227 self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0')) 1228 self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170')) 1229 self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4')) 1230 self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255')) 1231 # bpo-29972: inet_pton() doesn't fail on AIX 1232 if not AIX: 1233 assertInvalid(f, '0.0.0.') 1234 assertInvalid(f, '300.0.0.0') 1235 assertInvalid(f, 'a.0.0.0') 1236 assertInvalid(f, '1.2.3.4.5') 1237 assertInvalid(f, '::1') 1238 1239 self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0')) 1240 self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0')) 1241 self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170')) 1242 self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255')) 1243 assertInvalid(g, '0.0.0.') 1244 assertInvalid(g, '300.0.0.0') 1245 assertInvalid(g, 'a.0.0.0') 1246 assertInvalid(g, '1.2.3.4.5') 1247 assertInvalid(g, '::1') 1248 1249 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1250 'test needs socket.inet_pton()') 1251 def testIPv6toString(self): 1252 try: 1253 from socket import inet_pton, AF_INET6, has_ipv6 1254 if not has_ipv6: 1255 self.skipTest('IPv6 not available') 1256 except ImportError: 1257 self.skipTest('could not import needed symbols from socket') 1258 1259 if sys.platform == "win32": 1260 try: 1261 inet_pton(AF_INET6, '::') 1262 except OSError as e: 1263 if e.winerror == 10022: 1264 self.skipTest('IPv6 might not be supported') 1265 1266 f = lambda a: 
inet_pton(AF_INET6, a) 1267 assertInvalid = lambda a: self.assertRaises( 1268 (OSError, ValueError), f, a 1269 ) 1270 1271 self.assertEqual(b'\x00' * 16, f('::')) 1272 self.assertEqual(b'\x00' * 16, f('0::0')) 1273 self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::')) 1274 self.assertEqual( 1275 b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', 1276 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') 1277 ) 1278 self.assertEqual( 1279 b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02', 1280 f('ad42:abc::127:0:254:2') 1281 ) 1282 self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::')) 1283 assertInvalid('0x20::') 1284 assertInvalid(':::') 1285 assertInvalid('::0::') 1286 assertInvalid('1::abc::') 1287 assertInvalid('1::abc::def') 1288 assertInvalid('1:2:3:4:5:6') 1289 assertInvalid('1:2:3:4:5:6:') 1290 assertInvalid('1:2:3:4:5:6:7:8:0') 1291 # bpo-29972: inet_pton() doesn't fail on AIX 1292 if not AIX: 1293 assertInvalid('1:2:3:4:5:6:7:8:') 1294 1295 self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40', 1296 f('::254.42.23.64') 1297 ) 1298 self.assertEqual( 1299 b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40', 1300 f('42::a29b:254.42.23.64') 1301 ) 1302 self.assertEqual( 1303 b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40', 1304 f('42:a8b9:0:2:ffff:a29b:254.42.23.64') 1305 ) 1306 assertInvalid('255.254.253.252') 1307 assertInvalid('1::260.2.3.0') 1308 assertInvalid('1::0.be.e.0') 1309 assertInvalid('1:2:3:4:5:6:7:1.2.3.4') 1310 assertInvalid('::1.2.3.4:0') 1311 assertInvalid('0.100.200.0:3:4:5:6:7:8') 1312 1313 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1314 'test needs socket.inet_ntop()') 1315 def testStringToIPv4(self): 1316 from socket import inet_ntoa as f, inet_ntop, AF_INET 1317 g = lambda a: inet_ntop(AF_INET, a) 1318 assertInvalid = lambda func,a: self.assertRaises( 1319 (OSError, ValueError), func, a 1320 ) 1321 1322 self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) 1323 
self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55')) 1324 self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff')) 1325 self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04')) 1326 assertInvalid(f, b'\x00' * 3) 1327 assertInvalid(f, b'\x00' * 5) 1328 assertInvalid(f, b'\x00' * 16) 1329 self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55'))) 1330 1331 self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00')) 1332 self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55')) 1333 self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff')) 1334 assertInvalid(g, b'\x00' * 3) 1335 assertInvalid(g, b'\x00' * 5) 1336 assertInvalid(g, b'\x00' * 16) 1337 self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55'))) 1338 1339 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1340 'test needs socket.inet_ntop()') 1341 def testStringToIPv6(self): 1342 try: 1343 from socket import inet_ntop, AF_INET6, has_ipv6 1344 if not has_ipv6: 1345 self.skipTest('IPv6 not available') 1346 except ImportError: 1347 self.skipTest('could not import needed symbols from socket') 1348 1349 if sys.platform == "win32": 1350 try: 1351 inet_ntop(AF_INET6, b'\x00' * 16) 1352 except OSError as e: 1353 if e.winerror == 10022: 1354 self.skipTest('IPv6 might not be supported') 1355 1356 f = lambda a: inet_ntop(AF_INET6, a) 1357 assertInvalid = lambda a: self.assertRaises( 1358 (OSError, ValueError), f, a 1359 ) 1360 1361 self.assertEqual('::', f(b'\x00' * 16)) 1362 self.assertEqual('::1', f(b'\x00' * 15 + b'\x01')) 1363 self.assertEqual( 1364 'aef:b01:506:1001:ffff:9997:55:170', 1365 f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') 1366 ) 1367 self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01'))) 1368 1369 assertInvalid(b'\x12' * 15) 1370 assertInvalid(b'\x12' * 17) 1371 assertInvalid(b'\x12' * 4) 1372 1373 # XXX The following don't test module-level functionality... 
# The definitions below take ``self`` and rely on TestCase helpers
# (assert*, addCleanup, skipTest, subTest): they are test methods whose
# ``class`` statement precedes this section.  Comments are used instead of
# docstrings so that unittest keeps reporting the method names.

def testSockName(self):
    # Testing getsockname(): bind to the wildcard address on a free port
    # and check the name the socket reports.
    port = socket_helper.find_unused_port()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(sock.close)
    sock.bind(("0.0.0.0", port))
    name = sock.getsockname()
    # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
    # it reasonable to get the host's addr in addition to 0.0.0.0.
    # At least for eCos.  This is required for the S/390 to pass.
    try:
        my_ip_addr = socket.gethostbyname(socket.gethostname())
    except OSError:
        # Probably name lookup wasn't set up right; skip this test
        self.skipTest('name lookup failure')
    self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
    self.assertEqual(name[1], port)

def testGetSockOpt(self):
    # Testing getsockopt()
    # A freshly created socket should have SO_REUSEADDR switched off (== 0).
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(sock.close)
    reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
    self.assertEqual(reuse, 0, "initial mode is reuse")

def testSetSockOpt(self):
    # Testing setsockopt(): after enabling SO_REUSEADDR it must read back
    # as some non-zero value.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(sock.close)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
    self.assertNotEqual(reuse, 0, "failed to set reuse mode")

def testSendAfterClose(self):
    # testing send() after close() with timeout
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
    # The ``with`` block has closed the socket by now.
    self.assertRaises(OSError, sock.send, b"spam")

def testCloseException(self):
    # Close the underlying descriptor through a second socket object, then
    # check that closing the first object reports the error.
    sock = socket.socket()
    sock.bind((socket._LOCALHOST, 0))
    socket.socket(fileno=sock.fileno()).close()
    try:
        sock.close()
    except OSError as err:
        # Winsock apparently raises ENOTSOCK
        self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
    else:
        self.fail("close() should raise EBADF/ENOTSOCK")

def testNewAttributes(self):
    # testing .family, .type and .protocol

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        self.assertEqual(sock.family, socket.AF_INET)
        if hasattr(socket, 'SOCK_CLOEXEC'):
            self.assertIn(sock.type,
                          (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
                           socket.SOCK_STREAM))
        else:
            self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)

def test_getsockaddrarg(self):
    # Ports outside the 16-bit range must be rejected with OverflowError.
    sock = socket.socket()
    self.addCleanup(sock.close)
    port = socket_helper.find_unused_port()
    big_port = port + 65536
    neg_port = port - 65536
    self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
    self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
    # Since find_unused_port() is inherently subject to race conditions, we
    # call it a couple times if necessary.
    for i in itertools.count():
        port = socket_helper.find_unused_port()
        try:
            sock.bind((HOST, port))
        except OSError as e:
            # Give up on any other error, or once attempt index 5 fails.
            if e.errno != errno.EADDRINUSE or i == 5:
                raise
        else:
            break

@unittest.skipUnless(os.name == "nt", "Windows specific")
def test_sock_ioctl(self):
    # The ioctl() method and its related constants must be exposed.
    self.assertTrue(hasattr(socket.socket, 'ioctl'))
    self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
    self.assertTrue(hasattr(socket, 'RCVALL_ON'))
    self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
    self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
    s = socket.socket()
    self.addCleanup(s.close)
    self.assertRaises(ValueError, s.ioctl, -1, None)
    s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))

@unittest.skipUnless(os.name == "nt", "Windows specific")
@unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
                     'Loopback fast path support required for this test')
def test_sio_loopback_fast_path(self):
    s = socket.socket()
    self.addCleanup(s.close)
    try:
        s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
    except OSError as exc:
        WSAEOPNOTSUPP = 10045
        if exc.winerror == WSAEOPNOTSUPP:
            self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
                          "isn't implemented in this Windows version")
        raise
    self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)

def testGetaddrinfo(self):
    try:
        socket.getaddrinfo('localhost', 80)
    except socket.gaierror as err:
        if err.errno == socket.EAI_SERVICE:
            # see http://bugs.python.org/issue1282647
            self.skipTest("buggy libc version")
        raise
    # len of every sequence is supposed to be == 5
    for info in socket.getaddrinfo(HOST, None):
        self.assertEqual(len(info), 5)
    # host can be a domain name, a string representation of an
    # IPv4/v6 address or None
    socket.getaddrinfo('localhost', 80)
    socket.getaddrinfo('127.0.0.1', 80)
    socket.getaddrinfo(None, 80)
    if socket_helper.IPV6_ENABLED:
        socket.getaddrinfo('::1', 80)
    # port can be a string service name such as "http", a numeric
    # port number or None
    # Issue #26936: Android getaddrinfo() was broken before API level 23.
    if (not hasattr(sys, 'getandroidapilevel') or
            sys.getandroidapilevel() >= 23):
        socket.getaddrinfo(HOST, "http")
    socket.getaddrinfo(HOST, 80)
    socket.getaddrinfo(HOST, None)
    # test family and socktype filters
    infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
    for family, socktype, _, _, _ in infos:
        self.assertEqual(family, socket.AF_INET)
        self.assertEqual(str(family), 'AddressFamily.AF_INET')
        self.assertEqual(socktype, socket.SOCK_STREAM)
        self.assertEqual(str(socktype), 'SocketKind.SOCK_STREAM')
    infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
    for _, socktype, _, _, _ in infos:
        self.assertEqual(socktype, socket.SOCK_STREAM)
    # test proto and flags arguments
    socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
    socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
    # a server willing to support both IPv4 and IPv6 will
    # usually do this
    socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE)
    # test keyword arguments
    a = socket.getaddrinfo(HOST, None)
    b = socket.getaddrinfo(host=HOST, port=None)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, socket.AF_INET)
    b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
    b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
    b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
    b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                           socket.AI_PASSIVE)
    b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
                           type=socket.SOCK_STREAM, proto=0,
                           flags=socket.AI_PASSIVE)
    self.assertEqual(a, b)
    # Issue #6697.
    self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')

    # Issue 17269: test workaround for OS X platform bug segfault
    if hasattr(socket, 'AI_NUMERICSERV'):
        try:
            # The arguments here are undefined and the call may succeed
            # or fail.  All we care here is that it doesn't segfault.
            socket.getaddrinfo("localhost", None, 0, 0, 0,
                               socket.AI_NUMERICSERV)
        except socket.gaierror:
            pass

def test_getnameinfo(self):
    # only IP addresses are allowed
    self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)

@unittest.skipUnless(support.is_resource_enabled('network'),
                     'network is not enabled')
def test_idna(self):
    # Check for internet access before running test
    # (issue #12804, issue #25138).
    with socket_helper.transient_internet('python.org'):
        socket.gethostbyname('python.org')

    # these should all be successful
    domain = 'испытание.pythontest.net'
    socket.gethostbyname(domain)
    socket.gethostbyname_ex(domain)
    socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
    # this may not work if the forward lookup chooses the IPv6 address, as that doesn't
    # have a reverse entry yet
    # socket.gethostbyaddr('испытание.python.org')

def check_sendall_interrupted(self, with_timeout):
    # Shared body of the two test_sendall_interrupted* tests below;
    # with_timeout selects whether the sending socket gets a timeout.
    # socketpair() is not strictly required, but it makes things easier.
    if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
        self.skipTest("signal.alarm and socket.socketpair required for this test")
    # Our signal handlers clobber the C errno by calling a math function
    # with an invalid domain value.
    def ok_handler(*args):
        self.assertRaises(ValueError, math.acosh, 0)
    def raising_handler(*args):
        self.assertRaises(ValueError, math.acosh, 0)
        1 // 0
    c, s = socket.socketpair()
    old_alarm = signal.signal(signal.SIGALRM, raising_handler)
    try:
        if with_timeout:
            # Just above the one second minimum for signal.alarm
            c.settimeout(1.5)
        with self.assertRaises(ZeroDivisionError):
            signal.alarm(1)
            c.sendall(b"x" * support.SOCK_MAX_SIZE)
        if with_timeout:
            signal.signal(signal.SIGALRM, ok_handler)
            signal.alarm(1)
            self.assertRaises(TimeoutError, c.sendall,
                              b"x" * support.SOCK_MAX_SIZE)
    finally:
        # Cancel any pending alarm and restore the previous handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_alarm)
        c.close()
        s.close()

def test_sendall_interrupted(self):
    self.check_sendall_interrupted(False)

def test_sendall_interrupted_with_timeout(self):
    self.check_sendall_interrupted(True)

def test_dealloc_warn(self):
    # Dropping the last reference to an open socket must emit a
    # ResourceWarning that mentions the socket's repr.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    r = repr(sock)
    with self.assertWarns(ResourceWarning) as cm:
        sock = None
        support.gc_collect()
    self.assertIn(r, str(cm.warning.args[0]))
    # An open socket file object gets dereferenced after the socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    f = sock.makefile('rb')
    sock = None
    support.gc_collect()
    with self.assertWarns(ResourceWarning):
        f = None
        support.gc_collect()

def test_name_closed_socketio(self):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        fp = sock.makefile("rb")
        fp.close()
        self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")

def test_unusable_closed_socketio(self):
    # The capability queries work on an open raw file object and raise
    # ValueError once it has been closed.
    with socket.socket() as sock:
        fp = sock.makefile("rb", buffering=0)
        self.assertTrue(fp.readable())
        self.assertFalse(fp.writable())
        self.assertFalse(fp.seekable())
        fp.close()
        self.assertRaises(ValueError, fp.readable)
        self.assertRaises(ValueError, fp.writable)
        self.assertRaises(ValueError, fp.seekable)

def test_socket_close(self):
    # socket.close() works on a raw descriptor; the owning socket object
    # then fails on further use.
    sock = socket.socket()
    try:
        sock.bind((HOST, 0))
        socket.close(sock.fileno())
        with self.assertRaises(OSError):
            sock.listen(1)
    finally:
        with self.assertRaises(OSError):
            # sock.close() fails with EBADF
            sock.close()
    with self.assertRaises(TypeError):
        socket.close(None)
    with self.assertRaises(OSError):
        socket.close(-1)

def test_makefile_mode(self):
    for mode in 'r', 'rb', 'rw', 'w', 'wb':
        with self.subTest(mode=mode):
            with socket.socket() as sock:
                # Only text modes take an encoding.
                encoding = None if "b" in mode else "utf-8"
                with sock.makefile(mode, encoding=encoding) as fp:
                    self.assertEqual(fp.mode, mode)

def test_makefile_invalid_mode(self):
    for mode in 'rt', 'x', '+', 'a':
        with self.subTest(mode=mode):
            with socket.socket() as sock:
                with self.assertRaisesRegex(ValueError, 'invalid mode'):
                    sock.makefile(mode)

def test_pickle(self):
    # Socket objects cannot be pickled; the family/kind enum members
    # round-trip under every protocol.
    sock = socket.socket()
    with sock:
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            self.assertRaises(TypeError, pickle.dumps, sock, protocol)
    for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
        family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
        self.assertEqual(family, socket.AF_INET)
        socktype = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
        self.assertEqual(socktype, socket.SOCK_STREAM)

def test_listen_backlog(self):
    # listen() accepts a zero or negative backlog ...
    for backlog in 0, -1:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
            srv.bind((HOST, 0))
            srv.listen(backlog)

    # ... and may be called without any backlog argument.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind((HOST, 0))
        srv.listen()

@support.cpython_only
def test_listen_backlog_overflow(self):
    # Issue 15989
    import _testcapi
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind((HOST, 0))
        self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_flowinfo(self):
    # Out-of-range flowinfo values (third address item) raise OverflowError.
    self.assertRaises(OverflowError, socket.getnameinfo,
                      (socket_helper.HOSTv6, 0, 0xffffffff), 0)
    with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
        self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_getaddrinfo_ipv6_basic(self):
    ((*_, sockaddr),) = socket.getaddrinfo(
        'ff02::1de:c0:face:8D',  # Note capital letter `D`.
        1234, socket.AF_INET6,
        socket.SOCK_DGRAM,
        socket.IPPROTO_UDP
    )
    self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work')
@unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
def test_getaddrinfo_ipv6_scopeid_symbolic(self):
    # Just pick up any network interface (Linux, Mac OS X)
    (ifindex, test_interface) = socket.if_nameindex()[0]
    ((*_, sockaddr),) = socket.getaddrinfo(
        'ff02::1de:c0:face:8D%' + test_interface,
        1234, socket.AF_INET6,
        socket.SOCK_DGRAM,
        socket.IPPROTO_UDP
    )
    # Note missing interface name part in IPv6 address
    self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(
    sys.platform == 'win32',
    'Numeric scope id does not work or undocumented')
def test_getaddrinfo_ipv6_scopeid_numeric(self):
    # Also works on Linux and Mac OS X, but is not documented (?)
    # Windows, Linux and Mac OS X allow nonexistent interface numbers here.
    ifindex = 42
    ((*_, sockaddr),) = socket.getaddrinfo(
        'ff02::1de:c0:face:8D%' + str(ifindex),
        1234, socket.AF_INET6,
        socket.SOCK_DGRAM,
        socket.IPPROTO_UDP
    )
    # Note missing interface name part in IPv6 address
    self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work')
@unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
def test_getnameinfo_ipv6_scopeid_symbolic(self):
    # Just pick up any network interface.
    (ifindex, test_interface) = socket.if_nameindex()[0]
    sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
    nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
    self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))

@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless( sys.platform == 'win32',
    'Numeric scope id does not work or undocumented')
def test_getnameinfo_ipv6_scopeid_numeric(self):
    # Also works on Linux (undocumented), but does not work on Mac OS X
    # Windows and Linux allow nonexistent interface numbers here.
    ifindex = 42
    sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
    nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
    self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234'))

def test_str_for_enums(self):
    # Make sure that the AF_* and SOCK_* constants have enum-like string
    # reprs.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        self.assertEqual(str(s.family), 'AddressFamily.AF_INET')
        self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM')

def test_socket_consistent_sock_type(self):
    # socket.type must keep reporting plain SOCK_STREAM regardless of the
    # extra flags passed at creation and of later timeout/blocking changes.
    SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)
    SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0)
    sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC

    with socket.socket(socket.AF_INET, sock_type) as s:
        self.assertEqual(s.type, socket.SOCK_STREAM)
        s.settimeout(1)
        self.assertEqual(s.type, socket.SOCK_STREAM)
        s.settimeout(0)
        self.assertEqual(s.type, socket.SOCK_STREAM)
        s.setblocking(True)
        self.assertEqual(s.type, socket.SOCK_STREAM)
        s.setblocking(False)
        self.assertEqual(s.type, socket.SOCK_STREAM)

def test_unknown_socket_family_repr(self):
    # Test that when created with a family that's not one of the known
    # AF_*/SOCK_* constants, socket.family just returns the number.
    #
    # To do this we fool socket.socket into believing it already has an
    # open fd because on this path it doesn't actually verify the family and
    # type and populates the socket object.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    fd = sock.detach()
    unknown_family = max(socket.AddressFamily.__members__.values()) + 1

    unknown_type = max(
        kind
        for name, kind in socket.SocketKind.__members__.items()
        if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'}
    ) + 1

    with socket.socket(
            family=unknown_family, type=unknown_type, proto=23,
            fileno=fd) as s:
        self.assertEqual(s.family, unknown_family)
        self.assertEqual(s.type, unknown_type)
        # some OS like macOS ignore proto
        self.assertIn(s.proto, {0, 23})

@unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
def test__sendfile_use_sendfile(self):
    # Minimal file-like object exposing only fileno().
    class File:
        def __init__(self, fd):
            self.fd = fd

        def fileno(self):
            return self.fd
    with socket.socket() as sock:
        # Obtain a descriptor number and close it straight away, so that
        # File(fd) refers to a descriptor that is no longer open.
        fd = os.open(os.curdir, os.O_RDONLY)
        os.close(fd)
        with self.assertRaises(socket._GiveupOnSendfile):
            sock._sendfile_use_sendfile(File(fd))
        with self.assertRaises(OverflowError):
            sock._sendfile_use_sendfile(File(2**1000))
        with self.assertRaises(TypeError):
            sock._sendfile_use_sendfile(File(None))

def _test_socket_fileno(self, s, family, stype):
    # Helper for test_socket_fileno(): a socket rebuilt from s.fileno()
    # alone must report the same family, type and descriptor.
    self.assertEqual(s.family, family)
    self.assertEqual(s.type, stype)

    fd = s.fileno()
    s2 = socket.socket(fileno=fd)
    self.addCleanup(s2.close)
    # detach old fd to avoid double close
    s.detach()
    self.assertEqual(s2.family, family)
    self.assertEqual(s2.type, stype)
    self.assertEqual(s2.fileno(), fd)

def test_socket_fileno(self):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(s.close)
    s.bind((socket_helper.HOST, 0))
    self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)

    if hasattr(socket, "SOCK_DGRAM"):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        s.bind((socket_helper.HOST, 0))
        self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)

    if socket_helper.IPV6_ENABLED:
        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        self.addCleanup(s.close)
        s.bind((socket_helper.HOSTv6, 0, 0, 0))
        self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)

    if hasattr(socket, "AF_UNIX"):
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir)
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.addCleanup(s.close)
        try:
            s.bind(os.path.join(tmpdir, 'socket'))
        except PermissionError:
            # Binding was refused; the AF_UNIX check is not run.
            pass
        else:
            self._test_socket_fileno(s, socket.AF_UNIX,
                                     socket.SOCK_STREAM)

def test_socket_fileno_rejects_float(self):
    with self.assertRaises(TypeError):
        socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5)

def test_socket_fileno_rejects_other_types(self):
    with self.assertRaises(TypeError):
        socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo")

def test_socket_fileno_rejects_invalid_socket(self):
    with self.assertRaisesRegex(ValueError, "negative file descriptor"):
        socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1)

@unittest.skipIf(os.name == "nt", "Windows disallows -1 only")
def test_socket_fileno_rejects_negative(self):
    with self.assertRaisesRegex(ValueError, "negative file descriptor"):
        socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42)

def test_socket_fileno_requires_valid_fd(self):
    # A descriptor that is not open must be rejected with EBADF (or the
    # numeric value named WSAENOTSOCK below), with and without explicit
    # family/type arguments.
    WSAENOTSOCK = 10038
    with self.assertRaises(OSError) as cm:
        socket.socket(fileno=os_helper.make_bad_fd())
    self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))

    with self.assertRaises(OSError) as cm:
        socket.socket(
            socket.AF_INET,
            socket.SOCK_STREAM,
            fileno=os_helper.make_bad_fd())
    self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))

1930 def test_socket_fileno_requires_socket_fd(self): 1931 with tempfile.NamedTemporaryFile() as afile: 1932 with self.assertRaises(OSError): 1933 socket.socket(fileno=afile.fileno()) 1934 1935 with self.assertRaises(OSError) as cm: 1936 socket.socket( 1937 socket.AF_INET, 1938 socket.SOCK_STREAM, 1939 fileno=afile.fileno()) 1940 self.assertEqual(cm.exception.errno, errno.ENOTSOCK) 1941 1942 1943@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 1944class BasicCANTest(unittest.TestCase): 1945 1946 def testCrucialConstants(self): 1947 socket.AF_CAN 1948 socket.PF_CAN 1949 socket.CAN_RAW 1950 1951 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1952 'socket.CAN_BCM required for this test.') 1953 def testBCMConstants(self): 1954 socket.CAN_BCM 1955 1956 # opcodes 1957 socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task 1958 socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task 1959 socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task 1960 socket.CAN_BCM_TX_SEND # send one CAN frame 1961 socket.CAN_BCM_RX_SETUP # create RX content filter subscription 1962 socket.CAN_BCM_RX_DELETE # remove RX content filter subscription 1963 socket.CAN_BCM_RX_READ # read properties of RX content filter subscription 1964 socket.CAN_BCM_TX_STATUS # reply to TX_READ request 1965 socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) 1966 socket.CAN_BCM_RX_STATUS # reply to RX_READ request 1967 socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent 1968 socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) 1969 1970 # flags 1971 socket.CAN_BCM_SETTIMER 1972 socket.CAN_BCM_STARTTIMER 1973 socket.CAN_BCM_TX_COUNTEVT 1974 socket.CAN_BCM_TX_ANNOUNCE 1975 socket.CAN_BCM_TX_CP_CAN_ID 1976 socket.CAN_BCM_RX_FILTER_ID 1977 socket.CAN_BCM_RX_CHECK_DLC 1978 socket.CAN_BCM_RX_NO_AUTOTIMER 1979 socket.CAN_BCM_RX_ANNOUNCE_RESUME 1980 socket.CAN_BCM_TX_RESET_MULTI_IDX 1981 socket.CAN_BCM_RX_RTR_FRAME 1982 
1983 def testCreateSocket(self): 1984 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1985 pass 1986 1987 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1988 'socket.CAN_BCM required for this test.') 1989 def testCreateBCMSocket(self): 1990 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: 1991 pass 1992 1993 def testBindAny(self): 1994 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1995 address = ('', ) 1996 s.bind(address) 1997 self.assertEqual(s.getsockname(), address) 1998 1999 def testTooLongInterfaceName(self): 2000 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 2001 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2002 self.assertRaisesRegex(OSError, 'interface name too long', 2003 s.bind, ('x' * 1024,)) 2004 2005 @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), 2006 'socket.CAN_RAW_LOOPBACK required for this test.') 2007 def testLoopback(self): 2008 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2009 for loopback in (0, 1): 2010 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, 2011 loopback) 2012 self.assertEqual(loopback, 2013 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK)) 2014 2015 @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"), 2016 'socket.CAN_RAW_FILTER required for this test.') 2017 def testFilter(self): 2018 can_id, can_mask = 0x200, 0x700 2019 can_filter = struct.pack("=II", can_id, can_mask) 2020 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2021 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) 2022 self.assertEqual(can_filter, 2023 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)) 2024 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter)) 2025 2026 2027@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 2028class CANTest(ThreadedCANSocketTest): 2029 2030 def 
__init__(self, methodName='runTest'): 2031 ThreadedCANSocketTest.__init__(self, methodName=methodName) 2032 2033 @classmethod 2034 def build_can_frame(cls, can_id, data): 2035 """Build a CAN frame.""" 2036 can_dlc = len(data) 2037 data = data.ljust(8, b'\x00') 2038 return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data) 2039 2040 @classmethod 2041 def dissect_can_frame(cls, frame): 2042 """Dissect a CAN frame.""" 2043 can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame) 2044 return (can_id, can_dlc, data[:can_dlc]) 2045 2046 def testSendFrame(self): 2047 cf, addr = self.s.recvfrom(self.bufsize) 2048 self.assertEqual(self.cf, cf) 2049 self.assertEqual(addr[0], self.interface) 2050 2051 def _testSendFrame(self): 2052 self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05') 2053 self.cli.send(self.cf) 2054 2055 def testSendMaxFrame(self): 2056 cf, addr = self.s.recvfrom(self.bufsize) 2057 self.assertEqual(self.cf, cf) 2058 2059 def _testSendMaxFrame(self): 2060 self.cf = self.build_can_frame(0x00, b'\x07' * 8) 2061 self.cli.send(self.cf) 2062 2063 def testSendMultiFrames(self): 2064 cf, addr = self.s.recvfrom(self.bufsize) 2065 self.assertEqual(self.cf1, cf) 2066 2067 cf, addr = self.s.recvfrom(self.bufsize) 2068 self.assertEqual(self.cf2, cf) 2069 2070 def _testSendMultiFrames(self): 2071 self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11') 2072 self.cli.send(self.cf1) 2073 2074 self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') 2075 self.cli.send(self.cf2) 2076 2077 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2078 'socket.CAN_BCM required for this test.') 2079 def _testBCM(self): 2080 cf, addr = self.cli.recvfrom(self.bufsize) 2081 self.assertEqual(self.cf, cf) 2082 can_id, can_dlc, data = self.dissect_can_frame(cf) 2083 self.assertEqual(self.can_id, can_id) 2084 self.assertEqual(self.data, data) 2085 2086 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2087 'socket.CAN_BCM required for this test.') 2088 def testBCM(self): 2089 
bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) 2090 self.addCleanup(bcm.close) 2091 bcm.connect((self.interface,)) 2092 self.can_id = 0x123 2093 self.data = bytes([0xc0, 0xff, 0xee]) 2094 self.cf = self.build_can_frame(self.can_id, self.data) 2095 opcode = socket.CAN_BCM_TX_SEND 2096 flags = 0 2097 count = 0 2098 ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 2099 bcm_can_id = 0x0222 2100 nframes = 1 2101 assert len(self.cf) == 16 2102 header = struct.pack(self.bcm_cmd_msg_fmt, 2103 opcode, 2104 flags, 2105 count, 2106 ival1_seconds, 2107 ival1_usec, 2108 ival2_seconds, 2109 ival2_usec, 2110 bcm_can_id, 2111 nframes, 2112 ) 2113 header_plus_frame = header + self.cf 2114 bytes_sent = bcm.send(header_plus_frame) 2115 self.assertEqual(bytes_sent, len(header_plus_frame)) 2116 2117 2118@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.') 2119class ISOTPTest(unittest.TestCase): 2120 2121 def __init__(self, *args, **kwargs): 2122 super().__init__(*args, **kwargs) 2123 self.interface = "vcan0" 2124 2125 def testCrucialConstants(self): 2126 socket.AF_CAN 2127 socket.PF_CAN 2128 socket.CAN_ISOTP 2129 socket.SOCK_DGRAM 2130 2131 def testCreateSocket(self): 2132 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2133 pass 2134 2135 @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"), 2136 'socket.CAN_ISOTP required for this test.') 2137 def testCreateISOTPSocket(self): 2138 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2139 pass 2140 2141 def testTooLongInterfaceName(self): 2142 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 2143 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2144 with self.assertRaisesRegex(OSError, 'interface name too long'): 2145 s.bind(('x' * 1024, 1, 2)) 2146 2147 def testBind(self): 2148 try: 2149 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2150 addr = self.interface, 
0x123, 0x456 2151 s.bind(addr) 2152 self.assertEqual(s.getsockname(), addr) 2153 except OSError as e: 2154 if e.errno == errno.ENODEV: 2155 self.skipTest('network interface `%s` does not exist' % 2156 self.interface) 2157 else: 2158 raise 2159 2160 2161@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.') 2162class J1939Test(unittest.TestCase): 2163 2164 def __init__(self, *args, **kwargs): 2165 super().__init__(*args, **kwargs) 2166 self.interface = "vcan0" 2167 2168 @unittest.skipUnless(hasattr(socket, "CAN_J1939"), 2169 'socket.CAN_J1939 required for this test.') 2170 def testJ1939Constants(self): 2171 socket.CAN_J1939 2172 2173 socket.J1939_MAX_UNICAST_ADDR 2174 socket.J1939_IDLE_ADDR 2175 socket.J1939_NO_ADDR 2176 socket.J1939_NO_NAME 2177 socket.J1939_PGN_REQUEST 2178 socket.J1939_PGN_ADDRESS_CLAIMED 2179 socket.J1939_PGN_ADDRESS_COMMANDED 2180 socket.J1939_PGN_PDU1_MAX 2181 socket.J1939_PGN_MAX 2182 socket.J1939_NO_PGN 2183 2184 # J1939 socket options 2185 socket.SO_J1939_FILTER 2186 socket.SO_J1939_PROMISC 2187 socket.SO_J1939_SEND_PRIO 2188 socket.SO_J1939_ERRQUEUE 2189 2190 socket.SCM_J1939_DEST_ADDR 2191 socket.SCM_J1939_DEST_NAME 2192 socket.SCM_J1939_PRIO 2193 socket.SCM_J1939_ERRQUEUE 2194 2195 socket.J1939_NLA_PAD 2196 socket.J1939_NLA_BYTES_ACKED 2197 2198 socket.J1939_EE_INFO_NONE 2199 socket.J1939_EE_INFO_TX_ABORT 2200 2201 socket.J1939_FILTER_MAX 2202 2203 @unittest.skipUnless(hasattr(socket, "CAN_J1939"), 2204 'socket.CAN_J1939 required for this test.') 2205 def testCreateJ1939Socket(self): 2206 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: 2207 pass 2208 2209 def testBind(self): 2210 try: 2211 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: 2212 addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR 2213 s.bind(addr) 2214 self.assertEqual(s.getsockname(), addr) 2215 except OSError as e: 2216 if e.errno == errno.ENODEV: 2217 
self.skipTest('network interface `%s` does not exist' % 2218 self.interface) 2219 else: 2220 raise 2221 2222 2223@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 2224class BasicRDSTest(unittest.TestCase): 2225 2226 def testCrucialConstants(self): 2227 socket.AF_RDS 2228 socket.PF_RDS 2229 2230 def testCreateSocket(self): 2231 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2232 pass 2233 2234 def testSocketBufferSize(self): 2235 bufsize = 16384 2236 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2237 s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize) 2238 s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize) 2239 2240 2241@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 2242class RDSTest(ThreadedRDSSocketTest): 2243 2244 def __init__(self, methodName='runTest'): 2245 ThreadedRDSSocketTest.__init__(self, methodName=methodName) 2246 2247 def setUp(self): 2248 super().setUp() 2249 self.evt = threading.Event() 2250 2251 def testSendAndRecv(self): 2252 data, addr = self.serv.recvfrom(self.bufsize) 2253 self.assertEqual(self.data, data) 2254 self.assertEqual(self.cli_addr, addr) 2255 2256 def _testSendAndRecv(self): 2257 self.data = b'spam' 2258 self.cli.sendto(self.data, 0, (HOST, self.port)) 2259 2260 def testPeek(self): 2261 data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK) 2262 self.assertEqual(self.data, data) 2263 data, addr = self.serv.recvfrom(self.bufsize) 2264 self.assertEqual(self.data, data) 2265 2266 def _testPeek(self): 2267 self.data = b'spam' 2268 self.cli.sendto(self.data, 0, (HOST, self.port)) 2269 2270 @requireAttrs(socket.socket, 'recvmsg') 2271 def testSendAndRecvMsg(self): 2272 data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize) 2273 self.assertEqual(self.data, data) 2274 2275 @requireAttrs(socket.socket, 'sendmsg') 2276 def _testSendAndRecvMsg(self): 2277 self.data = b'hello ' * 10 2278 self.cli.sendmsg([self.data], 
(), 0, (HOST, self.port)) 2279 2280 def testSendAndRecvMulti(self): 2281 data, addr = self.serv.recvfrom(self.bufsize) 2282 self.assertEqual(self.data1, data) 2283 2284 data, addr = self.serv.recvfrom(self.bufsize) 2285 self.assertEqual(self.data2, data) 2286 2287 def _testSendAndRecvMulti(self): 2288 self.data1 = b'bacon' 2289 self.cli.sendto(self.data1, 0, (HOST, self.port)) 2290 2291 self.data2 = b'egg' 2292 self.cli.sendto(self.data2, 0, (HOST, self.port)) 2293 2294 def testSelect(self): 2295 r, w, x = select.select([self.serv], [], [], 3.0) 2296 self.assertIn(self.serv, r) 2297 data, addr = self.serv.recvfrom(self.bufsize) 2298 self.assertEqual(self.data, data) 2299 2300 def _testSelect(self): 2301 self.data = b'select' 2302 self.cli.sendto(self.data, 0, (HOST, self.port)) 2303 2304@unittest.skipUnless(HAVE_SOCKET_QIPCRTR, 2305 'QIPCRTR sockets required for this test.') 2306class BasicQIPCRTRTest(unittest.TestCase): 2307 2308 def testCrucialConstants(self): 2309 socket.AF_QIPCRTR 2310 2311 def testCreateSocket(self): 2312 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2313 pass 2314 2315 def testUnbound(self): 2316 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2317 self.assertEqual(s.getsockname()[1], 0) 2318 2319 def testBindSock(self): 2320 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2321 socket_helper.bind_port(s, host=s.getsockname()[0]) 2322 self.assertNotEqual(s.getsockname()[1], 0) 2323 2324 def testInvalidBindSock(self): 2325 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2326 self.assertRaises(OSError, socket_helper.bind_port, s, host=-2) 2327 2328 def testAutoBindSock(self): 2329 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2330 s.connect((123, 123)) 2331 self.assertNotEqual(s.getsockname()[1], 0) 2332 2333@unittest.skipIf(fcntl is None, "need fcntl") 2334@unittest.skipUnless(HAVE_SOCKET_VSOCK, 2335 'VSOCK sockets required for this test.') 2336class 
BasicVSOCKTest(unittest.TestCase): 2337 2338 def testCrucialConstants(self): 2339 socket.AF_VSOCK 2340 2341 def testVSOCKConstants(self): 2342 socket.SO_VM_SOCKETS_BUFFER_SIZE 2343 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE 2344 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE 2345 socket.VMADDR_CID_ANY 2346 socket.VMADDR_PORT_ANY 2347 socket.VMADDR_CID_HOST 2348 socket.VM_SOCKETS_INVALID_VERSION 2349 socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID 2350 2351 def testCreateSocket(self): 2352 with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: 2353 pass 2354 2355 def testSocketBufferSize(self): 2356 with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: 2357 orig_max = s.getsockopt(socket.AF_VSOCK, 2358 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE) 2359 orig = s.getsockopt(socket.AF_VSOCK, 2360 socket.SO_VM_SOCKETS_BUFFER_SIZE) 2361 orig_min = s.getsockopt(socket.AF_VSOCK, 2362 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE) 2363 2364 s.setsockopt(socket.AF_VSOCK, 2365 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2) 2366 s.setsockopt(socket.AF_VSOCK, 2367 socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2) 2368 s.setsockopt(socket.AF_VSOCK, 2369 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2) 2370 2371 self.assertEqual(orig_max * 2, 2372 s.getsockopt(socket.AF_VSOCK, 2373 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)) 2374 self.assertEqual(orig * 2, 2375 s.getsockopt(socket.AF_VSOCK, 2376 socket.SO_VM_SOCKETS_BUFFER_SIZE)) 2377 self.assertEqual(orig_min * 2, 2378 s.getsockopt(socket.AF_VSOCK, 2379 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)) 2380 2381 2382@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH, 2383 'Bluetooth sockets required for this test.') 2384class BasicBluetoothTest(unittest.TestCase): 2385 2386 def testBluetoothConstants(self): 2387 socket.BDADDR_ANY 2388 socket.BDADDR_LOCAL 2389 socket.AF_BLUETOOTH 2390 socket.BTPROTO_RFCOMM 2391 2392 if sys.platform != "win32": 2393 socket.BTPROTO_HCI 2394 socket.SOL_HCI 2395 socket.BTPROTO_L2CAP 2396 2397 if not sys.platform.startswith("freebsd"): 2398 
socket.BTPROTO_SCO 2399 2400 def testCreateRfcommSocket(self): 2401 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: 2402 pass 2403 2404 @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets") 2405 def testCreateL2capSocket(self): 2406 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as s: 2407 pass 2408 2409 @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets") 2410 def testCreateHciSocket(self): 2411 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s: 2412 pass 2413 2414 @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"), 2415 "windows and freebsd do not support SCO sockets") 2416 def testCreateScoSocket(self): 2417 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s: 2418 pass 2419 2420 2421class BasicTCPTest(SocketConnectedTest): 2422 2423 def __init__(self, methodName='runTest'): 2424 SocketConnectedTest.__init__(self, methodName=methodName) 2425 2426 def testRecv(self): 2427 # Testing large receive over TCP 2428 msg = self.cli_conn.recv(1024) 2429 self.assertEqual(msg, MSG) 2430 2431 def _testRecv(self): 2432 self.serv_conn.send(MSG) 2433 2434 def testOverFlowRecv(self): 2435 # Testing receive in chunks over TCP 2436 seg1 = self.cli_conn.recv(len(MSG) - 3) 2437 seg2 = self.cli_conn.recv(1024) 2438 msg = seg1 + seg2 2439 self.assertEqual(msg, MSG) 2440 2441 def _testOverFlowRecv(self): 2442 self.serv_conn.send(MSG) 2443 2444 def testRecvFrom(self): 2445 # Testing large recvfrom() over TCP 2446 msg, addr = self.cli_conn.recvfrom(1024) 2447 self.assertEqual(msg, MSG) 2448 2449 def _testRecvFrom(self): 2450 self.serv_conn.send(MSG) 2451 2452 def testOverFlowRecvFrom(self): 2453 # Testing recvfrom() in chunks over TCP 2454 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) 2455 seg2, addr = self.cli_conn.recvfrom(1024) 2456 msg = seg1 + seg2 
2457 self.assertEqual(msg, MSG) 2458 2459 def _testOverFlowRecvFrom(self): 2460 self.serv_conn.send(MSG) 2461 2462 def testSendAll(self): 2463 # Testing sendall() with a 2048 byte string over TCP 2464 msg = b'' 2465 while 1: 2466 read = self.cli_conn.recv(1024) 2467 if not read: 2468 break 2469 msg += read 2470 self.assertEqual(msg, b'f' * 2048) 2471 2472 def _testSendAll(self): 2473 big_chunk = b'f' * 2048 2474 self.serv_conn.sendall(big_chunk) 2475 2476 def testFromFd(self): 2477 # Testing fromfd() 2478 fd = self.cli_conn.fileno() 2479 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 2480 self.addCleanup(sock.close) 2481 self.assertIsInstance(sock, socket.socket) 2482 msg = sock.recv(1024) 2483 self.assertEqual(msg, MSG) 2484 2485 def _testFromFd(self): 2486 self.serv_conn.send(MSG) 2487 2488 def testDup(self): 2489 # Testing dup() 2490 sock = self.cli_conn.dup() 2491 self.addCleanup(sock.close) 2492 msg = sock.recv(1024) 2493 self.assertEqual(msg, MSG) 2494 2495 def _testDup(self): 2496 self.serv_conn.send(MSG) 2497 2498 def testShutdown(self): 2499 # Testing shutdown() 2500 msg = self.cli_conn.recv(1024) 2501 self.assertEqual(msg, MSG) 2502 # wait for _testShutdown to finish: on OS X, when the server 2503 # closes the connection the client also becomes disconnected, 2504 # and the client's shutdown call will fail. (Issue #4397.) 
2505 self.done.wait() 2506 2507 def _testShutdown(self): 2508 self.serv_conn.send(MSG) 2509 self.serv_conn.shutdown(2) 2510 2511 testShutdown_overflow = support.cpython_only(testShutdown) 2512 2513 @support.cpython_only 2514 def _testShutdown_overflow(self): 2515 import _testcapi 2516 self.serv_conn.send(MSG) 2517 # Issue 15989 2518 self.assertRaises(OverflowError, self.serv_conn.shutdown, 2519 _testcapi.INT_MAX + 1) 2520 self.assertRaises(OverflowError, self.serv_conn.shutdown, 2521 2 + (_testcapi.UINT_MAX + 1)) 2522 self.serv_conn.shutdown(2) 2523 2524 def testDetach(self): 2525 # Testing detach() 2526 fileno = self.cli_conn.fileno() 2527 f = self.cli_conn.detach() 2528 self.assertEqual(f, fileno) 2529 # cli_conn cannot be used anymore... 2530 self.assertTrue(self.cli_conn._closed) 2531 self.assertRaises(OSError, self.cli_conn.recv, 1024) 2532 self.cli_conn.close() 2533 # ...but we can create another socket using the (still open) 2534 # file descriptor 2535 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f) 2536 self.addCleanup(sock.close) 2537 msg = sock.recv(1024) 2538 self.assertEqual(msg, MSG) 2539 2540 def _testDetach(self): 2541 self.serv_conn.send(MSG) 2542 2543 2544class BasicUDPTest(ThreadedUDPSocketTest): 2545 2546 def __init__(self, methodName='runTest'): 2547 ThreadedUDPSocketTest.__init__(self, methodName=methodName) 2548 2549 def testSendtoAndRecv(self): 2550 # Testing sendto() and Recv() over UDP 2551 msg = self.serv.recv(len(MSG)) 2552 self.assertEqual(msg, MSG) 2553 2554 def _testSendtoAndRecv(self): 2555 self.cli.sendto(MSG, 0, (HOST, self.port)) 2556 2557 def testRecvFrom(self): 2558 # Testing recvfrom() over UDP 2559 msg, addr = self.serv.recvfrom(len(MSG)) 2560 self.assertEqual(msg, MSG) 2561 2562 def _testRecvFrom(self): 2563 self.cli.sendto(MSG, 0, (HOST, self.port)) 2564 2565 def testRecvFromNegative(self): 2566 # Negative lengths passed to recvfrom should give ValueError. 
2567 self.assertRaises(ValueError, self.serv.recvfrom, -1) 2568 2569 def _testRecvFromNegative(self): 2570 self.cli.sendto(MSG, 0, (HOST, self.port)) 2571 2572 2573@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 2574 'UDPLITE sockets required for this test.') 2575class BasicUDPLITETest(ThreadedUDPLITESocketTest): 2576 2577 def __init__(self, methodName='runTest'): 2578 ThreadedUDPLITESocketTest.__init__(self, methodName=methodName) 2579 2580 def testSendtoAndRecv(self): 2581 # Testing sendto() and Recv() over UDPLITE 2582 msg = self.serv.recv(len(MSG)) 2583 self.assertEqual(msg, MSG) 2584 2585 def _testSendtoAndRecv(self): 2586 self.cli.sendto(MSG, 0, (HOST, self.port)) 2587 2588 def testRecvFrom(self): 2589 # Testing recvfrom() over UDPLITE 2590 msg, addr = self.serv.recvfrom(len(MSG)) 2591 self.assertEqual(msg, MSG) 2592 2593 def _testRecvFrom(self): 2594 self.cli.sendto(MSG, 0, (HOST, self.port)) 2595 2596 def testRecvFromNegative(self): 2597 # Negative lengths passed to recvfrom should give ValueError. 2598 self.assertRaises(ValueError, self.serv.recvfrom, -1) 2599 2600 def _testRecvFromNegative(self): 2601 self.cli.sendto(MSG, 0, (HOST, self.port)) 2602 2603# Tests for the sendmsg()/recvmsg() interface. Where possible, the 2604# same test code is used with different families and types of socket 2605# (e.g. stream, datagram), and tests using recvmsg() are repeated 2606# using recvmsg_into(). 2607# 2608# The generic test classes such as SendmsgTests and 2609# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be 2610# supplied with sockets cli_sock and serv_sock representing the 2611# client's and the server's end of the connection respectively, and 2612# attributes cli_addr and serv_addr holding their (numeric where 2613# appropriate) addresses. 
2614# 2615# The final concrete test classes combine these with subclasses of 2616# SocketTestBase which set up client and server sockets of a specific 2617# type, and with subclasses of SendrecvmsgBase such as 2618# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these 2619# sockets to cli_sock and serv_sock and override the methods and 2620# attributes of SendrecvmsgBase to fill in destination addresses if 2621# needed when sending, check for specific flags in msg_flags, etc. 2622# 2623# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using 2624# recvmsg_into(). 2625 2626# XXX: like the other datagram (UDP) tests in this module, the code 2627# here assumes that datagram delivery on the local machine will be 2628# reliable. 2629 2630class SendrecvmsgBase(ThreadSafeCleanupTestCase): 2631 # Base class for sendmsg()/recvmsg() tests. 2632 2633 # Time in seconds to wait before considering a test failed, or 2634 # None for no timeout. Not all tests actually set a timeout. 2635 fail_timeout = support.LOOPBACK_TIMEOUT 2636 2637 def setUp(self): 2638 self.misc_event = threading.Event() 2639 super().setUp() 2640 2641 def sendToServer(self, msg): 2642 # Send msg to the server. 2643 return self.cli_sock.send(msg) 2644 2645 # Tuple of alternative default arguments for sendmsg() when called 2646 # via sendmsgToServer() (e.g. to include a destination address). 2647 sendmsg_to_server_defaults = () 2648 2649 def sendmsgToServer(self, *args): 2650 # Call sendmsg() on self.cli_sock with the given arguments, 2651 # filling in any arguments which are not supplied with the 2652 # corresponding items of self.sendmsg_to_server_defaults, if 2653 # any. 2654 return self.cli_sock.sendmsg( 2655 *(args + self.sendmsg_to_server_defaults[len(args):])) 2656 2657 def doRecvmsg(self, sock, bufsize, *args): 2658 # Call recvmsg() on sock with given arguments and return its 2659 # result. 
Should be used for tests which can use either 2660 # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides 2661 # this method with one which emulates it using recvmsg_into(), 2662 # thus allowing the same test to be used for both methods. 2663 result = sock.recvmsg(bufsize, *args) 2664 self.registerRecvmsgResult(result) 2665 return result 2666 2667 def registerRecvmsgResult(self, result): 2668 # Called by doRecvmsg() with the return value of recvmsg() or 2669 # recvmsg_into(). Can be overridden to arrange cleanup based 2670 # on the returned ancillary data, for instance. 2671 pass 2672 2673 def checkRecvmsgAddress(self, addr1, addr2): 2674 # Called to compare the received address with the address of 2675 # the peer. 2676 self.assertEqual(addr1, addr2) 2677 2678 # Flags that are normally unset in msg_flags 2679 msg_flags_common_unset = 0 2680 for name in ("MSG_CTRUNC", "MSG_OOB"): 2681 msg_flags_common_unset |= getattr(socket, name, 0) 2682 2683 # Flags that are normally set 2684 msg_flags_common_set = 0 2685 2686 # Flags set when a complete record has been received (e.g. MSG_EOR 2687 # for SCTP) 2688 msg_flags_eor_indicator = 0 2689 2690 # Flags set when a complete record has not been received 2691 # (e.g. MSG_TRUNC for datagram sockets) 2692 msg_flags_non_eor_indicator = 0 2693 2694 def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0): 2695 # Method to check the value of msg_flags returned by recvmsg[_into](). 2696 # 2697 # Checks that all bits in msg_flags_common_set attribute are 2698 # set in "flags" and all bits in msg_flags_common_unset are 2699 # unset. 2700 # 2701 # The "eor" argument specifies whether the flags should 2702 # indicate that a full record (or datagram) has been received. 
2703 # If "eor" is None, no checks are done; otherwise, checks 2704 # that: 2705 # 2706 # * if "eor" is true, all bits in msg_flags_eor_indicator are 2707 # set and all bits in msg_flags_non_eor_indicator are unset 2708 # 2709 # * if "eor" is false, all bits in msg_flags_non_eor_indicator 2710 # are set and all bits in msg_flags_eor_indicator are unset 2711 # 2712 # If "checkset" and/or "checkunset" are supplied, they require 2713 # the given bits to be set or unset respectively, overriding 2714 # what the attributes require for those bits. 2715 # 2716 # If any bits are set in "ignore", they will not be checked, 2717 # regardless of the other inputs. 2718 # 2719 # Will raise Exception if the inputs require a bit to be both 2720 # set and unset, and it is not ignored. 2721 2722 defaultset = self.msg_flags_common_set 2723 defaultunset = self.msg_flags_common_unset 2724 2725 if eor: 2726 defaultset |= self.msg_flags_eor_indicator 2727 defaultunset |= self.msg_flags_non_eor_indicator 2728 elif eor is not None: 2729 defaultset |= self.msg_flags_non_eor_indicator 2730 defaultunset |= self.msg_flags_eor_indicator 2731 2732 # Function arguments override defaults 2733 defaultset &= ~checkunset 2734 defaultunset &= ~checkset 2735 2736 # Merge arguments with remaining defaults, and check for conflicts 2737 checkset |= defaultset 2738 checkunset |= defaultunset 2739 inboth = checkset & checkunset & ~ignore 2740 if inboth: 2741 raise Exception("contradictory set, unset requirements for flags " 2742 "{0:#x}".format(inboth)) 2743 2744 # Compare with given msg_flags value 2745 mask = (checkset | checkunset) & ~ignore 2746 self.assertEqual(flags & mask, checkset & mask) 2747 2748 2749class RecvmsgIntoMixin(SendrecvmsgBase): 2750 # Mixin to implement doRecvmsg() using recvmsg_into(). 
2751 2752 def doRecvmsg(self, sock, bufsize, *args): 2753 buf = bytearray(bufsize) 2754 result = sock.recvmsg_into([buf], *args) 2755 self.registerRecvmsgResult(result) 2756 self.assertGreaterEqual(result[0], 0) 2757 self.assertLessEqual(result[0], bufsize) 2758 return (bytes(buf[:result[0]]),) + result[1:] 2759 2760 2761class SendrecvmsgDgramFlagsBase(SendrecvmsgBase): 2762 # Defines flags to be checked in msg_flags for datagram sockets. 2763 2764 @property 2765 def msg_flags_non_eor_indicator(self): 2766 return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC 2767 2768 2769class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase): 2770 # Defines flags to be checked in msg_flags for SCTP sockets. 2771 2772 @property 2773 def msg_flags_eor_indicator(self): 2774 return super().msg_flags_eor_indicator | socket.MSG_EOR 2775 2776 2777class SendrecvmsgConnectionlessBase(SendrecvmsgBase): 2778 # Base class for tests on connectionless-mode sockets. Users must 2779 # supply sockets on attributes cli and serv to be mapped to 2780 # cli_sock and serv_sock respectively. 2781 2782 @property 2783 def serv_sock(self): 2784 return self.serv 2785 2786 @property 2787 def cli_sock(self): 2788 return self.cli 2789 2790 @property 2791 def sendmsg_to_server_defaults(self): 2792 return ([], [], 0, self.serv_addr) 2793 2794 def sendToServer(self, msg): 2795 return self.cli_sock.sendto(msg, self.serv_addr) 2796 2797 2798class SendrecvmsgConnectedBase(SendrecvmsgBase): 2799 # Base class for tests on connected sockets. Users must supply 2800 # sockets on attributes serv_conn and cli_conn (representing the 2801 # connections *to* the server and the client), to be mapped to 2802 # cli_sock and serv_sock respectively. 
2803 2804 @property 2805 def serv_sock(self): 2806 return self.cli_conn 2807 2808 @property 2809 def cli_sock(self): 2810 return self.serv_conn 2811 2812 def checkRecvmsgAddress(self, addr1, addr2): 2813 # Address is currently "unspecified" for a connected socket, 2814 # so we don't examine it 2815 pass 2816 2817 2818class SendrecvmsgServerTimeoutBase(SendrecvmsgBase): 2819 # Base class to set a timeout on server's socket. 2820 2821 def setUp(self): 2822 super().setUp() 2823 self.serv_sock.settimeout(self.fail_timeout) 2824 2825 2826class SendmsgTests(SendrecvmsgServerTimeoutBase): 2827 # Tests for sendmsg() which can use any socket type and do not 2828 # involve recvmsg() or recvmsg_into(). 2829 2830 def testSendmsg(self): 2831 # Send a simple message with sendmsg(). 2832 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2833 2834 def _testSendmsg(self): 2835 self.assertEqual(self.sendmsgToServer([MSG]), len(MSG)) 2836 2837 def testSendmsgDataGenerator(self): 2838 # Send from buffer obtained from a generator (not a sequence). 2839 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2840 2841 def _testSendmsgDataGenerator(self): 2842 self.assertEqual(self.sendmsgToServer((o for o in [MSG])), 2843 len(MSG)) 2844 2845 def testSendmsgAncillaryGenerator(self): 2846 # Gather (empty) ancillary data from a generator. 2847 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2848 2849 def _testSendmsgAncillaryGenerator(self): 2850 self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])), 2851 len(MSG)) 2852 2853 def testSendmsgArray(self): 2854 # Send data from an array instead of the usual bytes object. 2855 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2856 2857 def _testSendmsgArray(self): 2858 self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]), 2859 len(MSG)) 2860 2861 def testSendmsgGather(self): 2862 # Send message data from more than one buffer (gather write). 
2863 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2864 2865 def _testSendmsgGather(self): 2866 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 2867 2868 def testSendmsgBadArgs(self): 2869 # Check that sendmsg() rejects invalid arguments. 2870 self.assertEqual(self.serv_sock.recv(1000), b"done") 2871 2872 def _testSendmsgBadArgs(self): 2873 self.assertRaises(TypeError, self.cli_sock.sendmsg) 2874 self.assertRaises(TypeError, self.sendmsgToServer, 2875 b"not in an iterable") 2876 self.assertRaises(TypeError, self.sendmsgToServer, 2877 object()) 2878 self.assertRaises(TypeError, self.sendmsgToServer, 2879 [object()]) 2880 self.assertRaises(TypeError, self.sendmsgToServer, 2881 [MSG, object()]) 2882 self.assertRaises(TypeError, self.sendmsgToServer, 2883 [MSG], object()) 2884 self.assertRaises(TypeError, self.sendmsgToServer, 2885 [MSG], [], object()) 2886 self.assertRaises(TypeError, self.sendmsgToServer, 2887 [MSG], [], 0, object()) 2888 self.sendToServer(b"done") 2889 2890 def testSendmsgBadCmsg(self): 2891 # Check that invalid ancillary data items are rejected. 2892 self.assertEqual(self.serv_sock.recv(1000), b"done") 2893 2894 def _testSendmsgBadCmsg(self): 2895 self.assertRaises(TypeError, self.sendmsgToServer, 2896 [MSG], [object()]) 2897 self.assertRaises(TypeError, self.sendmsgToServer, 2898 [MSG], [(object(), 0, b"data")]) 2899 self.assertRaises(TypeError, self.sendmsgToServer, 2900 [MSG], [(0, object(), b"data")]) 2901 self.assertRaises(TypeError, self.sendmsgToServer, 2902 [MSG], [(0, 0, object())]) 2903 self.assertRaises(TypeError, self.sendmsgToServer, 2904 [MSG], [(0, 0)]) 2905 self.assertRaises(TypeError, self.sendmsgToServer, 2906 [MSG], [(0, 0, b"data", 42)]) 2907 self.sendToServer(b"done") 2908 2909 @requireAttrs(socket, "CMSG_SPACE") 2910 def testSendmsgBadMultiCmsg(self): 2911 # Check that invalid ancillary data items are rejected when 2912 # more than one item is present. 
2913 self.assertEqual(self.serv_sock.recv(1000), b"done") 2914 2915 @testSendmsgBadMultiCmsg.client_skip 2916 def _testSendmsgBadMultiCmsg(self): 2917 self.assertRaises(TypeError, self.sendmsgToServer, 2918 [MSG], [0, 0, b""]) 2919 self.assertRaises(TypeError, self.sendmsgToServer, 2920 [MSG], [(0, 0, b""), object()]) 2921 self.sendToServer(b"done") 2922 2923 def testSendmsgExcessCmsgReject(self): 2924 # Check that sendmsg() rejects excess ancillary data items 2925 # when the number that can be sent is limited. 2926 self.assertEqual(self.serv_sock.recv(1000), b"done") 2927 2928 def _testSendmsgExcessCmsgReject(self): 2929 if not hasattr(socket, "CMSG_SPACE"): 2930 # Can only send one item 2931 with self.assertRaises(OSError) as cm: 2932 self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) 2933 self.assertIsNone(cm.exception.errno) 2934 self.sendToServer(b"done") 2935 2936 def testSendmsgAfterClose(self): 2937 # Check that sendmsg() fails on a closed socket. 2938 pass 2939 2940 def _testSendmsgAfterClose(self): 2941 self.cli_sock.close() 2942 self.assertRaises(OSError, self.sendmsgToServer, [MSG]) 2943 2944 2945class SendmsgStreamTests(SendmsgTests): 2946 # Tests for sendmsg() which require a stream socket and do not 2947 # involve recvmsg() or recvmsg_into(). 2948 2949 def testSendmsgExplicitNoneAddr(self): 2950 # Check that peer address can be specified as None. 2951 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2952 2953 def _testSendmsgExplicitNoneAddr(self): 2954 self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG)) 2955 2956 def testSendmsgTimeout(self): 2957 # Check that timeout works with sendmsg(). 
2958 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2959 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2960 2961 def _testSendmsgTimeout(self): 2962 try: 2963 self.cli_sock.settimeout(0.03) 2964 try: 2965 while True: 2966 self.sendmsgToServer([b"a"*512]) 2967 except TimeoutError: 2968 pass 2969 except OSError as exc: 2970 if exc.errno != errno.ENOMEM: 2971 raise 2972 # bpo-33937 the test randomly fails on Travis CI with 2973 # "OSError: [Errno 12] Cannot allocate memory" 2974 else: 2975 self.fail("TimeoutError not raised") 2976 finally: 2977 self.misc_event.set() 2978 2979 # XXX: would be nice to have more tests for sendmsg flags argument. 2980 2981 # Linux supports MSG_DONTWAIT when sending, but in general, it 2982 # only works when receiving. Could add other platforms if they 2983 # support it too. 2984 @skipWithClientIf(sys.platform not in {"linux"}, 2985 "MSG_DONTWAIT not known to work on this platform when " 2986 "sending") 2987 def testSendmsgDontWait(self): 2988 # Check that MSG_DONTWAIT in flags causes non-blocking behaviour. 2989 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2990 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2991 2992 @testSendmsgDontWait.client_skip 2993 def _testSendmsgDontWait(self): 2994 try: 2995 with self.assertRaises(OSError) as cm: 2996 while True: 2997 self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT) 2998 # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI 2999 # with "OSError: [Errno 12] Cannot allocate memory" 3000 self.assertIn(cm.exception.errno, 3001 (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM)) 3002 finally: 3003 self.misc_event.set() 3004 3005 3006class SendmsgConnectionlessTests(SendmsgTests): 3007 # Tests for sendmsg() which require a connectionless-mode 3008 # (e.g. datagram) socket, and do not involve recvmsg() or 3009 # recvmsg_into(). 
3010 3011 def testSendmsgNoDestAddr(self): 3012 # Check that sendmsg() fails when no destination address is 3013 # given for unconnected socket. 3014 pass 3015 3016 def _testSendmsgNoDestAddr(self): 3017 self.assertRaises(OSError, self.cli_sock.sendmsg, 3018 [MSG]) 3019 self.assertRaises(OSError, self.cli_sock.sendmsg, 3020 [MSG], [], 0, None) 3021 3022 3023class RecvmsgGenericTests(SendrecvmsgBase): 3024 # Tests for recvmsg() which can also be emulated using 3025 # recvmsg_into(), and can use any socket type. 3026 3027 def testRecvmsg(self): 3028 # Receive a simple message with recvmsg[_into](). 3029 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 3030 self.assertEqual(msg, MSG) 3031 self.checkRecvmsgAddress(addr, self.cli_addr) 3032 self.assertEqual(ancdata, []) 3033 self.checkFlags(flags, eor=True) 3034 3035 def _testRecvmsg(self): 3036 self.sendToServer(MSG) 3037 3038 def testRecvmsgExplicitDefaults(self): 3039 # Test recvmsg[_into]() with default arguments provided explicitly. 3040 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3041 len(MSG), 0, 0) 3042 self.assertEqual(msg, MSG) 3043 self.checkRecvmsgAddress(addr, self.cli_addr) 3044 self.assertEqual(ancdata, []) 3045 self.checkFlags(flags, eor=True) 3046 3047 def _testRecvmsgExplicitDefaults(self): 3048 self.sendToServer(MSG) 3049 3050 def testRecvmsgShorter(self): 3051 # Receive a message smaller than buffer. 3052 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3053 len(MSG) + 42) 3054 self.assertEqual(msg, MSG) 3055 self.checkRecvmsgAddress(addr, self.cli_addr) 3056 self.assertEqual(ancdata, []) 3057 self.checkFlags(flags, eor=True) 3058 3059 def _testRecvmsgShorter(self): 3060 self.sendToServer(MSG) 3061 3062 def testRecvmsgTrunc(self): 3063 # Receive part of message, check for truncation indicators. 
3064 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3065 len(MSG) - 3) 3066 self.assertEqual(msg, MSG[:-3]) 3067 self.checkRecvmsgAddress(addr, self.cli_addr) 3068 self.assertEqual(ancdata, []) 3069 self.checkFlags(flags, eor=False) 3070 3071 def _testRecvmsgTrunc(self): 3072 self.sendToServer(MSG) 3073 3074 def testRecvmsgShortAncillaryBuf(self): 3075 # Test ancillary data buffer too small to hold any ancillary data. 3076 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3077 len(MSG), 1) 3078 self.assertEqual(msg, MSG) 3079 self.checkRecvmsgAddress(addr, self.cli_addr) 3080 self.assertEqual(ancdata, []) 3081 self.checkFlags(flags, eor=True) 3082 3083 def _testRecvmsgShortAncillaryBuf(self): 3084 self.sendToServer(MSG) 3085 3086 def testRecvmsgLongAncillaryBuf(self): 3087 # Test large ancillary data buffer. 3088 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3089 len(MSG), 10240) 3090 self.assertEqual(msg, MSG) 3091 self.checkRecvmsgAddress(addr, self.cli_addr) 3092 self.assertEqual(ancdata, []) 3093 self.checkFlags(flags, eor=True) 3094 3095 def _testRecvmsgLongAncillaryBuf(self): 3096 self.sendToServer(MSG) 3097 3098 def testRecvmsgAfterClose(self): 3099 # Check that recvmsg[_into]() fails on a closed socket. 3100 self.serv_sock.close() 3101 self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024) 3102 3103 def _testRecvmsgAfterClose(self): 3104 pass 3105 3106 def testRecvmsgTimeout(self): 3107 # Check that timeout works. 3108 try: 3109 self.serv_sock.settimeout(0.03) 3110 self.assertRaises(TimeoutError, 3111 self.doRecvmsg, self.serv_sock, len(MSG)) 3112 finally: 3113 self.misc_event.set() 3114 3115 def _testRecvmsgTimeout(self): 3116 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3117 3118 @requireAttrs(socket, "MSG_PEEK") 3119 def testRecvmsgPeek(self): 3120 # Check that MSG_PEEK in flags enables examination of pending 3121 # data without consuming it. 3122 3123 # Receive part of data with MSG_PEEK. 
3124 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3125 len(MSG) - 3, 0, 3126 socket.MSG_PEEK) 3127 self.assertEqual(msg, MSG[:-3]) 3128 self.checkRecvmsgAddress(addr, self.cli_addr) 3129 self.assertEqual(ancdata, []) 3130 # Ignoring MSG_TRUNC here (so this test is the same for stream 3131 # and datagram sockets). Some wording in POSIX seems to 3132 # suggest that it needn't be set when peeking, but that may 3133 # just be a slip. 3134 self.checkFlags(flags, eor=False, 3135 ignore=getattr(socket, "MSG_TRUNC", 0)) 3136 3137 # Receive all data with MSG_PEEK. 3138 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3139 len(MSG), 0, 3140 socket.MSG_PEEK) 3141 self.assertEqual(msg, MSG) 3142 self.checkRecvmsgAddress(addr, self.cli_addr) 3143 self.assertEqual(ancdata, []) 3144 self.checkFlags(flags, eor=True) 3145 3146 # Check that the same data can still be received normally. 3147 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 3148 self.assertEqual(msg, MSG) 3149 self.checkRecvmsgAddress(addr, self.cli_addr) 3150 self.assertEqual(ancdata, []) 3151 self.checkFlags(flags, eor=True) 3152 3153 @testRecvmsgPeek.client_skip 3154 def _testRecvmsgPeek(self): 3155 self.sendToServer(MSG) 3156 3157 @requireAttrs(socket.socket, "sendmsg") 3158 def testRecvmsgFromSendmsg(self): 3159 # Test receiving with recvmsg[_into]() when message is sent 3160 # using sendmsg(). 
3161 self.serv_sock.settimeout(self.fail_timeout) 3162 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 3163 self.assertEqual(msg, MSG) 3164 self.checkRecvmsgAddress(addr, self.cli_addr) 3165 self.assertEqual(ancdata, []) 3166 self.checkFlags(flags, eor=True) 3167 3168 @testRecvmsgFromSendmsg.client_skip 3169 def _testRecvmsgFromSendmsg(self): 3170 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 3171 3172 3173class RecvmsgGenericStreamTests(RecvmsgGenericTests): 3174 # Tests which require a stream socket and can use either recvmsg() 3175 # or recvmsg_into(). 3176 3177 def testRecvmsgEOF(self): 3178 # Receive end-of-stream indicator (b"", peer socket closed). 3179 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 3180 self.assertEqual(msg, b"") 3181 self.checkRecvmsgAddress(addr, self.cli_addr) 3182 self.assertEqual(ancdata, []) 3183 self.checkFlags(flags, eor=None) # Might not have end-of-record marker 3184 3185 def _testRecvmsgEOF(self): 3186 self.cli_sock.close() 3187 3188 def testRecvmsgOverflow(self): 3189 # Receive a message in more than one chunk. 3190 seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3191 len(MSG) - 3) 3192 self.checkRecvmsgAddress(addr, self.cli_addr) 3193 self.assertEqual(ancdata, []) 3194 self.checkFlags(flags, eor=False) 3195 3196 seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 3197 self.checkRecvmsgAddress(addr, self.cli_addr) 3198 self.assertEqual(ancdata, []) 3199 self.checkFlags(flags, eor=True) 3200 3201 msg = seg1 + seg2 3202 self.assertEqual(msg, MSG) 3203 3204 def _testRecvmsgOverflow(self): 3205 self.sendToServer(MSG) 3206 3207 3208class RecvmsgTests(RecvmsgGenericTests): 3209 # Tests for recvmsg() which can use any socket type. 3210 3211 def testRecvmsgBadArgs(self): 3212 # Check that recvmsg() rejects invalid arguments. 
3213 self.assertRaises(TypeError, self.serv_sock.recvmsg) 3214 self.assertRaises(ValueError, self.serv_sock.recvmsg, 3215 -1, 0, 0) 3216 self.assertRaises(ValueError, self.serv_sock.recvmsg, 3217 len(MSG), -1, 0) 3218 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3219 [bytearray(10)], 0, 0) 3220 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3221 object(), 0, 0) 3222 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3223 len(MSG), object(), 0) 3224 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3225 len(MSG), 0, object()) 3226 3227 msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0) 3228 self.assertEqual(msg, MSG) 3229 self.checkRecvmsgAddress(addr, self.cli_addr) 3230 self.assertEqual(ancdata, []) 3231 self.checkFlags(flags, eor=True) 3232 3233 def _testRecvmsgBadArgs(self): 3234 self.sendToServer(MSG) 3235 3236 3237class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests): 3238 # Tests for recvmsg_into() which can use any socket type. 3239 3240 def testRecvmsgIntoBadArgs(self): 3241 # Check that recvmsg_into() rejects invalid arguments. 
3242 buf = bytearray(len(MSG)) 3243 self.assertRaises(TypeError, self.serv_sock.recvmsg_into) 3244 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3245 len(MSG), 0, 0) 3246 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3247 buf, 0, 0) 3248 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3249 [object()], 0, 0) 3250 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3251 [b"I'm not writable"], 0, 0) 3252 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3253 [buf, object()], 0, 0) 3254 self.assertRaises(ValueError, self.serv_sock.recvmsg_into, 3255 [buf], -1, 0) 3256 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3257 [buf], object(), 0) 3258 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3259 [buf], 0, object()) 3260 3261 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0) 3262 self.assertEqual(nbytes, len(MSG)) 3263 self.assertEqual(buf, bytearray(MSG)) 3264 self.checkRecvmsgAddress(addr, self.cli_addr) 3265 self.assertEqual(ancdata, []) 3266 self.checkFlags(flags, eor=True) 3267 3268 def _testRecvmsgIntoBadArgs(self): 3269 self.sendToServer(MSG) 3270 3271 def testRecvmsgIntoGenerator(self): 3272 # Receive into buffer obtained from a generator (not a sequence). 3273 buf = bytearray(len(MSG)) 3274 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 3275 (o for o in [buf])) 3276 self.assertEqual(nbytes, len(MSG)) 3277 self.assertEqual(buf, bytearray(MSG)) 3278 self.checkRecvmsgAddress(addr, self.cli_addr) 3279 self.assertEqual(ancdata, []) 3280 self.checkFlags(flags, eor=True) 3281 3282 def _testRecvmsgIntoGenerator(self): 3283 self.sendToServer(MSG) 3284 3285 def testRecvmsgIntoArray(self): 3286 # Receive into an array rather than the usual bytearray. 
3287 buf = array.array("B", [0] * len(MSG)) 3288 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf]) 3289 self.assertEqual(nbytes, len(MSG)) 3290 self.assertEqual(buf.tobytes(), MSG) 3291 self.checkRecvmsgAddress(addr, self.cli_addr) 3292 self.assertEqual(ancdata, []) 3293 self.checkFlags(flags, eor=True) 3294 3295 def _testRecvmsgIntoArray(self): 3296 self.sendToServer(MSG) 3297 3298 def testRecvmsgIntoScatter(self): 3299 # Receive into multiple buffers (scatter write). 3300 b1 = bytearray(b"----") 3301 b2 = bytearray(b"0123456789") 3302 b3 = bytearray(b"--------------") 3303 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 3304 [b1, memoryview(b2)[2:9], b3]) 3305 self.assertEqual(nbytes, len(b"Mary had a little lamb")) 3306 self.assertEqual(b1, bytearray(b"Mary")) 3307 self.assertEqual(b2, bytearray(b"01 had a 9")) 3308 self.assertEqual(b3, bytearray(b"little lamb---")) 3309 self.checkRecvmsgAddress(addr, self.cli_addr) 3310 self.assertEqual(ancdata, []) 3311 self.checkFlags(flags, eor=True) 3312 3313 def _testRecvmsgIntoScatter(self): 3314 self.sendToServer(b"Mary had a little lamb") 3315 3316 3317class CmsgMacroTests(unittest.TestCase): 3318 # Test the functions CMSG_LEN() and CMSG_SPACE(). Tests 3319 # assumptions used by sendmsg() and recvmsg[_into](), which share 3320 # code with these functions. 3321 3322 # Match the definition in socketmodule.c 3323 try: 3324 import _testcapi 3325 except ImportError: 3326 socklen_t_limit = 0x7fffffff 3327 else: 3328 socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX) 3329 3330 @requireAttrs(socket, "CMSG_LEN") 3331 def testCMSG_LEN(self): 3332 # Test CMSG_LEN() with various valid and invalid values, 3333 # checking the assumptions used by recvmsg() and sendmsg(). 
3334 toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1 3335 values = list(range(257)) + list(range(toobig - 257, toobig)) 3336 3337 # struct cmsghdr has at least three members, two of which are ints 3338 self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2) 3339 for n in values: 3340 ret = socket.CMSG_LEN(n) 3341 # This is how recvmsg() calculates the data size 3342 self.assertEqual(ret - socket.CMSG_LEN(0), n) 3343 self.assertLessEqual(ret, self.socklen_t_limit) 3344 3345 self.assertRaises(OverflowError, socket.CMSG_LEN, -1) 3346 # sendmsg() shares code with these functions, and requires 3347 # that it reject values over the limit. 3348 self.assertRaises(OverflowError, socket.CMSG_LEN, toobig) 3349 self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize) 3350 3351 @requireAttrs(socket, "CMSG_SPACE") 3352 def testCMSG_SPACE(self): 3353 # Test CMSG_SPACE() with various valid and invalid values, 3354 # checking the assumptions used by sendmsg(). 3355 toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1 3356 values = list(range(257)) + list(range(toobig - 257, toobig)) 3357 3358 last = socket.CMSG_SPACE(0) 3359 # struct cmsghdr has at least three members, two of which are ints 3360 self.assertGreater(last, array.array("i").itemsize * 2) 3361 for n in values: 3362 ret = socket.CMSG_SPACE(n) 3363 self.assertGreaterEqual(ret, last) 3364 self.assertGreaterEqual(ret, socket.CMSG_LEN(n)) 3365 self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0)) 3366 self.assertLessEqual(ret, self.socklen_t_limit) 3367 last = ret 3368 3369 self.assertRaises(OverflowError, socket.CMSG_SPACE, -1) 3370 # sendmsg() shares code with these functions, and requires 3371 # that it reject values over the limit. 3372 self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig) 3373 self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize) 3374 3375 3376class SCMRightsTest(SendrecvmsgServerTimeoutBase): 3377 # Tests for file descriptor passing on Unix-domain sockets. 
3378 3379 # Invalid file descriptor value that's unlikely to evaluate to a 3380 # real FD even if one of its bytes is replaced with a different 3381 # value (which shouldn't actually happen). 3382 badfd = -0x5555 3383 3384 def newFDs(self, n): 3385 # Return a list of n file descriptors for newly-created files 3386 # containing their list indices as ASCII numbers. 3387 fds = [] 3388 for i in range(n): 3389 fd, path = tempfile.mkstemp() 3390 self.addCleanup(os.unlink, path) 3391 self.addCleanup(os.close, fd) 3392 os.write(fd, str(i).encode()) 3393 fds.append(fd) 3394 return fds 3395 3396 def checkFDs(self, fds): 3397 # Check that the file descriptors in the given list contain 3398 # their correct list indices as ASCII numbers. 3399 for n, fd in enumerate(fds): 3400 os.lseek(fd, 0, os.SEEK_SET) 3401 self.assertEqual(os.read(fd, 1024), str(n).encode()) 3402 3403 def registerRecvmsgResult(self, result): 3404 self.addCleanup(self.closeRecvmsgFDs, result) 3405 3406 def closeRecvmsgFDs(self, recvmsg_result): 3407 # Close all file descriptors specified in the ancillary data 3408 # of the given return value from recvmsg() or recvmsg_into(). 3409 for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]: 3410 if (cmsg_level == socket.SOL_SOCKET and 3411 cmsg_type == socket.SCM_RIGHTS): 3412 fds = array.array("i") 3413 fds.frombytes(cmsg_data[: 3414 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3415 for fd in fds: 3416 os.close(fd) 3417 3418 def createAndSendFDs(self, n): 3419 # Send n new file descriptors created by newFDs() to the 3420 # server, with the constant MSG as the non-ancillary data. 
3421 self.assertEqual( 3422 self.sendmsgToServer([MSG], 3423 [(socket.SOL_SOCKET, 3424 socket.SCM_RIGHTS, 3425 array.array("i", self.newFDs(n)))]), 3426 len(MSG)) 3427 3428 def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0): 3429 # Check that constant MSG was received with numfds file 3430 # descriptors in a maximum of maxcmsgs control messages (which 3431 # must contain only complete integers). By default, check 3432 # that MSG_CTRUNC is unset, but ignore any flags in 3433 # ignoreflags. 3434 msg, ancdata, flags, addr = result 3435 self.assertEqual(msg, MSG) 3436 self.checkRecvmsgAddress(addr, self.cli_addr) 3437 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3438 ignore=ignoreflags) 3439 3440 self.assertIsInstance(ancdata, list) 3441 self.assertLessEqual(len(ancdata), maxcmsgs) 3442 fds = array.array("i") 3443 for item in ancdata: 3444 self.assertIsInstance(item, tuple) 3445 cmsg_level, cmsg_type, cmsg_data = item 3446 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3447 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3448 self.assertIsInstance(cmsg_data, bytes) 3449 self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0) 3450 fds.frombytes(cmsg_data) 3451 3452 self.assertEqual(len(fds), numfds) 3453 self.checkFDs(fds) 3454 3455 def testFDPassSimple(self): 3456 # Pass a single FD (array read from bytes object). 3457 self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock, 3458 len(MSG), 10240)) 3459 3460 def _testFDPassSimple(self): 3461 self.assertEqual( 3462 self.sendmsgToServer( 3463 [MSG], 3464 [(socket.SOL_SOCKET, 3465 socket.SCM_RIGHTS, 3466 array.array("i", self.newFDs(1)).tobytes())]), 3467 len(MSG)) 3468 3469 def testMultipleFDPass(self): 3470 # Pass multiple FDs in a single array. 
3471 self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock, 3472 len(MSG), 10240)) 3473 3474 def _testMultipleFDPass(self): 3475 self.createAndSendFDs(4) 3476 3477 @requireAttrs(socket, "CMSG_SPACE") 3478 def testFDPassCMSG_SPACE(self): 3479 # Test using CMSG_SPACE() to calculate ancillary buffer size. 3480 self.checkRecvmsgFDs( 3481 4, self.doRecvmsg(self.serv_sock, len(MSG), 3482 socket.CMSG_SPACE(4 * SIZEOF_INT))) 3483 3484 @testFDPassCMSG_SPACE.client_skip 3485 def _testFDPassCMSG_SPACE(self): 3486 self.createAndSendFDs(4) 3487 3488 def testFDPassCMSG_LEN(self): 3489 # Test using CMSG_LEN() to calculate ancillary buffer size. 3490 self.checkRecvmsgFDs(1, 3491 self.doRecvmsg(self.serv_sock, len(MSG), 3492 socket.CMSG_LEN(4 * SIZEOF_INT)), 3493 # RFC 3542 says implementations may set 3494 # MSG_CTRUNC if there isn't enough space 3495 # for trailing padding. 3496 ignoreflags=socket.MSG_CTRUNC) 3497 3498 def _testFDPassCMSG_LEN(self): 3499 self.createAndSendFDs(1) 3500 3501 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3502 @unittest.skipIf(AIX, "skipping, see issue #22397") 3503 @requireAttrs(socket, "CMSG_SPACE") 3504 def testFDPassSeparate(self): 3505 # Pass two FDs in two separate arrays. Arrays may be combined 3506 # into a single control message by the OS. 
3507 self.checkRecvmsgFDs(2, 3508 self.doRecvmsg(self.serv_sock, len(MSG), 10240), 3509 maxcmsgs=2) 3510 3511 @testFDPassSeparate.client_skip 3512 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3513 @unittest.skipIf(AIX, "skipping, see issue #22397") 3514 def _testFDPassSeparate(self): 3515 fd0, fd1 = self.newFDs(2) 3516 self.assertEqual( 3517 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 3518 socket.SCM_RIGHTS, 3519 array.array("i", [fd0])), 3520 (socket.SOL_SOCKET, 3521 socket.SCM_RIGHTS, 3522 array.array("i", [fd1]))]), 3523 len(MSG)) 3524 3525 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3526 @unittest.skipIf(AIX, "skipping, see issue #22397") 3527 @requireAttrs(socket, "CMSG_SPACE") 3528 def testFDPassSeparateMinSpace(self): 3529 # Pass two FDs in two separate arrays, receiving them into the 3530 # minimum space for two arrays. 3531 num_fds = 2 3532 self.checkRecvmsgFDs(num_fds, 3533 self.doRecvmsg(self.serv_sock, len(MSG), 3534 socket.CMSG_SPACE(SIZEOF_INT) + 3535 socket.CMSG_LEN(SIZEOF_INT * num_fds)), 3536 maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC) 3537 3538 @testFDPassSeparateMinSpace.client_skip 3539 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3540 @unittest.skipIf(AIX, "skipping, see issue #22397") 3541 def _testFDPassSeparateMinSpace(self): 3542 fd0, fd1 = self.newFDs(2) 3543 self.assertEqual( 3544 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 3545 socket.SCM_RIGHTS, 3546 array.array("i", [fd0])), 3547 (socket.SOL_SOCKET, 3548 socket.SCM_RIGHTS, 3549 array.array("i", [fd1]))]), 3550 len(MSG)) 3551 3552 def sendAncillaryIfPossible(self, msg, ancdata): 3553 # Try to send msg and ancdata to server, but if the system 3554 # call fails, just send msg with no ancillary data. 
3555 try: 3556 nbytes = self.sendmsgToServer([msg], ancdata) 3557 except OSError as e: 3558 # Check that it was the system call that failed 3559 self.assertIsInstance(e.errno, int) 3560 nbytes = self.sendmsgToServer([msg]) 3561 self.assertEqual(nbytes, len(msg)) 3562 3563 @unittest.skipIf(sys.platform == "darwin", "see issue #24725") 3564 def testFDPassEmpty(self): 3565 # Try to pass an empty FD array. Can receive either no array 3566 # or an empty array. 3567 self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock, 3568 len(MSG), 10240), 3569 ignoreflags=socket.MSG_CTRUNC) 3570 3571 def _testFDPassEmpty(self): 3572 self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET, 3573 socket.SCM_RIGHTS, 3574 b"")]) 3575 3576 def testFDPassPartialInt(self): 3577 # Try to pass a truncated FD array. 3578 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3579 len(MSG), 10240) 3580 self.assertEqual(msg, MSG) 3581 self.checkRecvmsgAddress(addr, self.cli_addr) 3582 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 3583 self.assertLessEqual(len(ancdata), 1) 3584 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3585 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3586 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3587 self.assertLess(len(cmsg_data), SIZEOF_INT) 3588 3589 def _testFDPassPartialInt(self): 3590 self.sendAncillaryIfPossible( 3591 MSG, 3592 [(socket.SOL_SOCKET, 3593 socket.SCM_RIGHTS, 3594 array.array("i", [self.badfd]).tobytes()[:-1])]) 3595 3596 @requireAttrs(socket, "CMSG_SPACE") 3597 def testFDPassPartialIntInMiddle(self): 3598 # Try to pass two FD arrays, the first of which is truncated. 
3599 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3600 len(MSG), 10240) 3601 self.assertEqual(msg, MSG) 3602 self.checkRecvmsgAddress(addr, self.cli_addr) 3603 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 3604 self.assertLessEqual(len(ancdata), 2) 3605 fds = array.array("i") 3606 # Arrays may have been combined in a single control message 3607 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3608 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3609 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3610 fds.frombytes(cmsg_data[: 3611 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3612 self.assertLessEqual(len(fds), 2) 3613 self.checkFDs(fds) 3614 3615 @testFDPassPartialIntInMiddle.client_skip 3616 def _testFDPassPartialIntInMiddle(self): 3617 fd0, fd1 = self.newFDs(2) 3618 self.sendAncillaryIfPossible( 3619 MSG, 3620 [(socket.SOL_SOCKET, 3621 socket.SCM_RIGHTS, 3622 array.array("i", [fd0, self.badfd]).tobytes()[:-1]), 3623 (socket.SOL_SOCKET, 3624 socket.SCM_RIGHTS, 3625 array.array("i", [fd1]))]) 3626 3627 def checkTruncatedHeader(self, result, ignoreflags=0): 3628 # Check that no ancillary data items are returned when data is 3629 # truncated inside the cmsghdr structure. 3630 msg, ancdata, flags, addr = result 3631 self.assertEqual(msg, MSG) 3632 self.checkRecvmsgAddress(addr, self.cli_addr) 3633 self.assertEqual(ancdata, []) 3634 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 3635 ignore=ignoreflags) 3636 3637 def testCmsgTruncNoBufSize(self): 3638 # Check that no ancillary data is received when no buffer size 3639 # is specified. 3640 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)), 3641 # BSD seems to set MSG_CTRUNC only 3642 # if an item has been partially 3643 # received. 3644 ignoreflags=socket.MSG_CTRUNC) 3645 3646 def _testCmsgTruncNoBufSize(self): 3647 self.createAndSendFDs(1) 3648 3649 def testCmsgTrunc0(self): 3650 # Check that no ancillary data is received when buffer size is 0. 
3651 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0), 3652 ignoreflags=socket.MSG_CTRUNC) 3653 3654 def _testCmsgTrunc0(self): 3655 self.createAndSendFDs(1) 3656 3657 # Check that no ancillary data is returned for various non-zero 3658 # (but still too small) buffer sizes. 3659 3660 def testCmsgTrunc1(self): 3661 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1)) 3662 3663 def _testCmsgTrunc1(self): 3664 self.createAndSendFDs(1) 3665 3666 def testCmsgTrunc2Int(self): 3667 # The cmsghdr structure has at least three members, two of 3668 # which are ints, so we still shouldn't see any ancillary 3669 # data. 3670 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3671 SIZEOF_INT * 2)) 3672 3673 def _testCmsgTrunc2Int(self): 3674 self.createAndSendFDs(1) 3675 3676 def testCmsgTruncLen0Minus1(self): 3677 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3678 socket.CMSG_LEN(0) - 1)) 3679 3680 def _testCmsgTruncLen0Minus1(self): 3681 self.createAndSendFDs(1) 3682 3683 # The following tests try to truncate the control message in the 3684 # middle of the FD array. 3685 3686 def checkTruncatedArray(self, ancbuf, maxdata, mindata=0): 3687 # Check that file descriptor data is truncated to between 3688 # mindata and maxdata bytes when received with buffer size 3689 # ancbuf, and that any complete file descriptor numbers are 3690 # valid. 
3691 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3692 len(MSG), ancbuf) 3693 self.assertEqual(msg, MSG) 3694 self.checkRecvmsgAddress(addr, self.cli_addr) 3695 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3696 3697 if mindata == 0 and ancdata == []: 3698 return 3699 self.assertEqual(len(ancdata), 1) 3700 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3701 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3702 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3703 self.assertGreaterEqual(len(cmsg_data), mindata) 3704 self.assertLessEqual(len(cmsg_data), maxdata) 3705 fds = array.array("i") 3706 fds.frombytes(cmsg_data[: 3707 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3708 self.checkFDs(fds) 3709 3710 def testCmsgTruncLen0(self): 3711 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0) 3712 3713 def _testCmsgTruncLen0(self): 3714 self.createAndSendFDs(1) 3715 3716 def testCmsgTruncLen0Plus1(self): 3717 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1) 3718 3719 def _testCmsgTruncLen0Plus1(self): 3720 self.createAndSendFDs(2) 3721 3722 def testCmsgTruncLen1(self): 3723 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT), 3724 maxdata=SIZEOF_INT) 3725 3726 def _testCmsgTruncLen1(self): 3727 self.createAndSendFDs(2) 3728 3729 def testCmsgTruncLen2Minus1(self): 3730 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1, 3731 maxdata=(2 * SIZEOF_INT) - 1) 3732 3733 def _testCmsgTruncLen2Minus1(self): 3734 self.createAndSendFDs(2) 3735 3736 3737class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase): 3738 # Test sendmsg() and recvmsg[_into]() using the ancillary data 3739 # features of the RFC 3542 Advanced Sockets API for IPv6. 3740 # Currently we can only handle certain data items (e.g. 
def ancillaryMapping(self, ancdata):
    # Given ancillary data list ancdata (an iterable of
    # (cmsg_level, cmsg_type, cmsg_data) triples), return a dict
    # mapping each (cmsg_level, cmsg_type) pair to its cmsg_data.
    # Fails the test if a (level, type) pair appears more than once.
    mapping = {}
    for cmsg_level, cmsg_type, cmsg_data in ancdata:
        key = (cmsg_level, cmsg_type)
        self.assertNotIn(key, mapping)
        mapping[key] = cmsg_data
    return mapping
@testRecvHopLimit.client_skip
def _testRecvHopLimit(self):
    # Client half of testRecvHopLimit.
    #
    # Need to wait until server has asked to receive ancillary
    # data, as implementations are not required to buffer it
    # otherwise.  The server half sets misc_event once it has
    # enabled IPV6_RECVHOPLIMIT; give up after fail_timeout.
    self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
    self.sendToServer(MSG)
def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
                                 ignoreflags=0):
    # Receive traffic class and hop limit into ancbufsize bytes of
    # ancillary data space.  Check that data is MSG, ancillary
    # data is not truncated (but ignore any flags in ignoreflags),
    # and traffic class and hop limit are in range (hop limit no
    # more than maxhop).
    self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                              socket.IPV6_RECVHOPLIMIT, 1)
    self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                              socket.IPV6_RECVTCLASS, 1)
    # Tell the client half that both options are enabled and it may
    # send now.
    self.misc_event.set()
    msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                               len(MSG), ancbufsize)

    self.assertEqual(msg, MSG)
    self.checkRecvmsgAddress(addr, self.cli_addr)
    self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
                    ignore=ignoreflags)
    self.assertEqual(len(ancdata), 2)
    ancmap = self.ancillaryMapping(ancdata)

    # Each item must be exactly one C int in the range 0..upper:
    # traffic class is bounded by 255, hop limit by maxhop.
    for cmsg_type, upper in ((socket.IPV6_TCLASS, 255),
                             (socket.IPV6_HOPLIMIT, maxhop)):
        data = ancmap[(socket.IPPROTO_IPV6, cmsg_type)]
        self.assertEqual(len(data), SIZEOF_INT)
        values = array.array("i")
        values.frombytes(data)
        self.assertGreaterEqual(values[0], 0)
        self.assertLessEqual(values[0], upper)
@testSetTrafficClassAndHopLimit.client_skip
def _testSetTrafficClassAndHopLimit(self):
    # Client half of testSetTrafficClassAndHopLimit: once the server
    # has signalled misc_event, send MSG with the traffic class and
    # hop limit supplied as ancillary data, and check that all of
    # MSG was sent.
    self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
    ancillary = [
        (socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
         array.array("i", [self.traffic_class])),
        (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
         array.array("i", [self.hop_limit])),
    ]
    self.assertEqual(self.sendmsgToServer([MSG], ancillary), len(MSG))
def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
    # Receive hop limit into ancbufsize bytes of ancillary data
    # space, which should be too small to contain the ancillary
    # data header (if ancbufsize is None, pass no second argument
    # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
    # (unless included in ignoreflags), and no ancillary data is
    # returned.
    self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                              socket.IPV6_RECVHOPLIMIT, 1)
    # Let the client half know the option is enabled.
    self.misc_event.set()
    # An empty tuple means the buffer-size argument is omitted.
    bufsize_args = () if ancbufsize is None else (ancbufsize,)
    msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                               len(MSG), *bufsize_args)

    self.assertEqual(msg, MSG)
    self.checkRecvmsgAddress(addr, self.cli_addr)
    self.assertEqual(ancdata, [])
    self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                    ignore=ignoreflags)
@requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
def testSingleCmsgTrunc0(self):
    # Check that no ancillary data is received when ancillary
    # buffer size is zero.  MSG_CTRUNC is passed as ignoreflags, so
    # the flag check is told to disregard it.
    self.checkHopLimitTruncatedHeader(ancbufsize=0,
                                      ignoreflags=socket.MSG_CTRUNC)
def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
    # Receive traffic class and hop limit into ancbufsize bytes of
    # ancillary data space, which should be large enough to
    # contain the first item, but too small to contain the header
    # of the second.  Check that data is MSG, MSG_CTRUNC is set
    # (unless included in ignoreflags), and only one ancillary
    # data item is returned.
    self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                              socket.IPV6_RECVHOPLIMIT, 1)
    self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                              socket.IPV6_RECVTCLASS, 1)
    # Let the client half know both options are enabled.
    self.misc_event.set()
    msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                               len(MSG), ancbufsize)

    self.assertEqual(msg, MSG)
    self.checkRecvmsgAddress(addr, self.cli_addr)
    self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                    ignore=ignoreflags)

    # Either of the two items may be the one that was returned; it
    # must be a complete C int in the range 0..255.
    self.assertEqual(len(ancdata), 1)
    cmsg_level, cmsg_type, cmsg_data = ancdata[0]
    self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
    self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
    self.assertEqual(len(cmsg_data), SIZEOF_INT)
    value = array.array("i")
    value.frombytes(cmsg_data)
    self.assertGreaterEqual(value[0], 0)
    self.assertLessEqual(value[0], 255)
@requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
              "IPV6_RECVTCLASS", "IPV6_TCLASS")
def testSecondCmsgTruncInData(self):
    # Test truncation of the second of two control messages inside
    # its associated data.  The first item must arrive complete; the
    # second may arrive with its data truncated, or not at all.
    self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                              socket.IPV6_RECVHOPLIMIT, 1)
    self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                              socket.IPV6_RECVTCLASS, 1)
    self.misc_event.set()
    # Room for one whole item plus a second item that is one byte
    # short of complete.
    msg, ancdata, flags, addr = self.doRecvmsg(
        self.serv_sock, len(MSG),
        socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)

    self.assertEqual(msg, MSG)
    self.checkRecvmsgAddress(addr, self.cli_addr)
    self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

    # Types still expected; each one is removed as it is seen so a
    # repeated type is caught.
    cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}

    # Report a missing first item as a test failure rather than
    # letting pop(0) raise IndexError.
    self.assertGreaterEqual(len(ancdata), 1)
    cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
    self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
    # Likewise report an unexpected or repeated type as a failure
    # instead of a KeyError from set.remove().
    self.assertIn(cmsg_type, cmsg_types)
    cmsg_types.remove(cmsg_type)
    self.assertEqual(len(cmsg_data), SIZEOF_INT)
    value = array.array("i")
    value.frombytes(cmsg_data)
    self.assertGreaterEqual(value[0], 0)
    self.assertLessEqual(value[0], 255)

    if ancdata:
        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        self.assertIn(cmsg_type, cmsg_types)
        cmsg_types.remove(cmsg_type)
        self.assertLess(len(cmsg_data), SIZEOF_INT)

    # Nothing beyond those (at most) two items may be returned.
    self.assertEqual(ancdata, [])
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
                     'UDPLITE sockets required for this test.')
class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase,
                                  SendrecvmsgConnectionlessBase,
                                  ThreadedSocketTestMixin, UDPLITE6TestBase):
    # Base for the sendmsg()/recvmsg() tests run over UDPLITE/IPv6
    # sockets; skipped unless HAVE_SOCKET_UDPLITE is true.

    def checkRecvmsgAddress(self, addr1, addr2):
        # Called to compare the received address with the address of
        # the peer, ignoring scope ID (the last element of each
        # address is left out of the comparison).
        self.assertEqual(addr1[:-1], addr2[:-1])
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
                            SendrecvmsgSCTPStreamTestBase):
    # recvmsg() tests run over an SCTP stream socket.

    def testRecvmsgEOF(self):
        # Run the inherited EOF test, but turn an ENOTCONN failure
        # into a skip; any other OSError is re-raised unchanged.
        try:
            super().testRecvmsgEOF()
        except OSError as e:
            if e.errno != errno.ENOTCONN:
                raise
            self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
class InterruptedTimeoutBase:
    # Base class for interrupted send/receive tests.  Installs a
    # SIGALRM handler that raises ZeroDivisionError (it evaluates
    # 1 / 0) and restores the previous handler on cleanup.  Tests
    # are expected to cancel any alarm they schedule themselves,
    # e.g. with setAlarm(0).

    def setUp(self):
        super().setUp()
        # signal.signal() returns the previously installed handler,
        # which is put back by the cleanup registered below.
        orig_alrm_handler = signal.signal(signal.SIGALRM,
                                          lambda signum, frame: 1 / 0)
        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)

    # Timeout for socket operations
    timeout = support.LOOPBACK_TIMEOUT

    # Provide setAlarm() method to schedule delivery of SIGALRM after
    # given number of seconds, or cancel it if zero, and an
    # appropriate time value to use.  Use setitimer() if available.
    if hasattr(signal, "setitimer"):
        alarm_time = 0.05

        def setAlarm(self, seconds):
            signal.setitimer(signal.ITIMER_REAL, seconds)
    else:
        # Old systems may deliver the alarm up to one second early
        alarm_time = 2

        def setAlarm(self, seconds):
            signal.alarm(seconds)
# Require siginterrupt() in order to ensure that system calls are
# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
                                 ThreadSafeCleanupTestCase,
                                 SocketListeningTestMixin, TCPTestBase):
    # Test interrupting the interruptible send*() methods with signals
    # when a timeout is set.

    def setUp(self):
        super().setUp()
        self.serv_conn = self.newSocket()
        self.addCleanup(self.serv_conn.close)
        # Use a thread to complete the connection, but wait for it to
        # terminate before running the test, so that there is only one
        # thread to accept the signal.
        cli_thread = threading.Thread(target=self.doConnect)
        cli_thread.start()
        # The peer address returned by accept() is not needed.
        self.cli_conn, _ = self.serv.accept()
        self.addCleanup(self.cli_conn.close)
        cli_thread.join()
        self.serv_conn.settimeout(self.timeout)

    def doConnect(self):
        # Runs in the helper thread started by setUp().
        self.serv_conn.connect(self.serv_addr)

    def checkInterruptedSend(self, func, *args, **kwargs):
        # Check that func(*args, **kwargs), run in a loop, is
        # interrupted by SIGALRM: the handler installed by
        # InterruptedTimeoutBase.setUp() raises ZeroDivisionError,
        # which must propagate out of the call.  The alarm is re-armed
        # before every call and always cancelled afterwards.
        try:
            with self.assertRaises(ZeroDivisionError):
                while True:
                    self.setAlarm(self.alarm_time)
                    func(*args, **kwargs)
        finally:
            self.setAlarm(0)

    # Issue #12958: The following tests have problems on OS X prior to 10.7
    @support.requires_mac_ver(10, 7)
    def testInterruptedSendTimeout(self):
        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)

    @support.requires_mac_ver(10, 7)
    def testInterruptedSendtoTimeout(self):
        # Passing an actual address here as Python's wrapper for
        # sendto() doesn't allow passing a zero-length one; POSIX
        # requires that the address is ignored since the socket is
        # connection-mode, however.
        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
                                  self.serv_addr)

    @support.requires_mac_ver(10, 7)
    @requireAttrs(socket.socket, "sendmsg")
    def testInterruptedSendmsgTimeout(self):
        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
class BasicSocketPairTest(SocketPairTest):
    """Basic checks on the two ends of a socket pair (self.serv / self.cli)."""

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def _check_defaults(self, sock):
        # The expected family is AF_UNIX where the socket module provides
        # it, AF_INET otherwise.
        self.assertIsInstance(sock, socket.socket)
        expected_family = (socket.AF_UNIX if hasattr(socket, 'AF_UNIX')
                           else socket.AF_INET)
        self.assertEqual(sock.family, expected_family)
        self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)

    def _testDefaults(self):
        self._check_defaults(self.cli)

    def testDefaults(self):
        self._check_defaults(self.serv)

    def testRecv(self):
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        self.serv.send(MSG)

    def _testSend(self):
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    """Tests for non-blocking mode and timeouts on TCP sockets."""

    def __init__(self, methodName='runTest'):
        # Used by the server and client halves of a test to order their steps.
        self.event = threading.Event()
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def assert_sock_timeout(self, sock, timeout):
        """Check that *sock* reports *timeout* and the matching blocking state."""
        # Check the socket that was passed in, not unconditionally self.serv.
        self.assertEqual(sock.gettimeout(), timeout)

        blocking = (timeout != 0.0)
        self.assertEqual(sock.getblocking(), blocking)

        if fcntl is not None:
            # When a Python socket has a non-zero timeout, it's switched
            # internally to a non-blocking mode. Later, sock.sendall(),
            # sock.recv(), and other socket operations use a select() call and
            # handle EWOULDBLOCK/EAGAIN on all socket operations. That's how
            # timeouts are enforced.
            fd_blocking = (timeout is None)

            flag = fcntl.fcntl(sock, fcntl.F_GETFL)
            self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking)

    def testSetBlocking(self):
        # Test setblocking() and settimeout() methods
        self.serv.setblocking(True)
        self.assert_sock_timeout(self.serv, None)

        self.serv.setblocking(False)
        self.assert_sock_timeout(self.serv, 0.0)

        self.serv.settimeout(None)
        self.assert_sock_timeout(self.serv, None)

        self.serv.settimeout(0)
        self.assert_sock_timeout(self.serv, 0)

        self.serv.settimeout(10)
        self.assert_sock_timeout(self.serv, 10)

        self.serv.settimeout(0)
        self.assert_sock_timeout(self.serv, 0)

    def _testSetBlocking(self):
        pass

    @support.cpython_only
    def testSetBlocking_overflow(self):
        # Issue 15989
        import _testcapi
        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
            self.skipTest('needs UINT_MAX < ULONG_MAX')

        self.serv.setblocking(False)
        self.assertEqual(self.serv.gettimeout(), 0.0)

        self.serv.setblocking(_testcapi.UINT_MAX + 1)
        self.assertIsNone(self.serv.gettimeout())

    _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)

    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'test needs socket.SOCK_NONBLOCK')
    @support.requires_linux_version(2, 6, 28)
    def testInitNonBlocking(self):
        # create a socket with SOCK_NONBLOCK
        self.serv.close()
        self.serv = socket.socket(socket.AF_INET,
                                  socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
        self.assert_sock_timeout(self.serv, 0)

    def _testInitNonBlocking(self):
        pass

    def testInheritFlagsBlocking(self):
        # bpo-7995: accept() on a listening socket with a timeout and the
        # default timeout is None, the resulting socket must be blocking.
        with socket_setdefaulttimeout(None):
            self.serv.settimeout(10)
            conn, addr = self.serv.accept()
            self.addCleanup(conn.close)
            self.assertIsNone(conn.gettimeout())

    def _testInheritFlagsBlocking(self):
        self.cli.connect((HOST, self.port))

    def testInheritFlagsTimeout(self):
        # bpo-7995: accept() on a listening socket with a timeout and the
        # default timeout is not None, the resulting socket must inherit
        # the default timeout.
        default_timeout = 20.0
        with socket_setdefaulttimeout(default_timeout):
            self.serv.settimeout(10)
            conn, addr = self.serv.accept()
            self.addCleanup(conn.close)
            self.assertEqual(conn.gettimeout(), default_timeout)

    def _testInheritFlagsTimeout(self):
        self.cli.connect((HOST, self.port))

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(False)

        # connect() didn't start: non-blocking accept() fails
        start_time = time.monotonic()
        with self.assertRaises(BlockingIOError):
            conn, addr = self.serv.accept()
        dt = time.monotonic() - start_time
        self.assertLess(dt, 1.0)

        self.event.set()

        read, write, err = select.select([self.serv], [], [],
                                         support.LONG_TIMEOUT)
        if self.serv not in read:
            self.fail("Error trying to do accept after select.")

        # connect() completed: non-blocking accept() doesn't block
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        self.assertIsNone(conn.gettimeout())

    def _testAccept(self):
        # don't connect before event is set to check
        # that non-blocking accept() raises BlockingIOError
        self.event.wait()

        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        conn.setblocking(False)

        # the client didn't send data yet: non-blocking recv() fails
        with self.assertRaises(BlockingIOError):
            msg = conn.recv(len(MSG))

        self.event.set()

        read, write, err = select.select([conn], [], [], support.LONG_TIMEOUT)
        if conn not in read:
            self.fail("Error during select call to non-blocking socket.")

        # the client has now sent data: non-blocking recv() doesn't block
        msg = conn.recv(len(MSG))
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.cli.connect((HOST, self.port))

        # don't send anything before event is set to check
        # that non-blocking recv() raises BlockingIOError
        self.event.wait()

        # send data: recv() will no longer block
        self.cli.sendall(MSG)
class FileObjectClassTestCase(SocketConnectedTest):
    """Unit tests for the object returned by socket.makefile()

    self.read_file is the io object returned by makefile() on
    the client connection.  You can read from this file to
    get output from the server.

    self.write_file is the io object returned by makefile() on the
    server connection.  You can write to this file to send output
    to the client.
    """

    bufsize = -1  # Use default buffer size
    encoding = 'utf-8'
    errors = 'strict'
    newline = None

    read_mode = 'rb'
    read_msg = MSG
    write_mode = 'wb'
    write_msg = MSG

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        # The events must exist before the base setUp() runs.
        self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
            threading.Event() for _ in range(4)]
        SocketConnectedTest.setUp(self)
        self.read_file = self.cli_conn.makefile(
            self.read_mode, self.bufsize,
            encoding=self.encoding,
            errors=self.errors,
            newline=self.newline)

    def tearDown(self):
        self.serv_finished.set()
        self.read_file.close()
        self.assertTrue(self.read_file.closed)
        self.read_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        self.write_file = self.serv_conn.makefile(
            self.write_mode, self.bufsize,
            encoding=self.encoding,
            errors=self.errors,
            newline=self.newline)

    def clientTearDown(self):
        self.cli_finished.set()
        self.write_file.close()
        self.assertTrue(self.write_file.closed)
        self.write_file = None
        SocketConnectedTest.clientTearDown(self)

    def testReadAfterTimeout(self):
        # Issue #7322: A file object must disallow further reads
        # after a timeout has occurred.
        self.cli_conn.settimeout(1)
        self.read_file.read(3)
        # First read raises a timeout
        self.assertRaises(TimeoutError, self.read_file.read, 1)
        # Second read is disallowed
        with self.assertRaises(OSError) as ctx:
            self.read_file.read(1)
        self.assertIn("cannot read from timed out object", str(ctx.exception))

    def _testReadAfterTimeout(self):
        self.write_file.write(self.write_msg[0:3])
        self.write_file.flush()
        # Keep the connection open until the reading side has finished.
        self.serv_finished.wait()

    def testSmallRead(self):
        # Performing small file read test
        first_seg = self.read_file.read(len(self.read_msg)-3)
        second_seg = self.read_file.read(3)
        msg = first_seg + second_seg
        self.assertEqual(msg, self.read_msg)

    def _testSmallRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testFullRead(self):
        # read until EOF
        msg = self.read_file.read()
        self.assertEqual(msg, self.read_msg)

    def _testFullRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        buf = type(self.read_msg)()
        while True:
            char = self.read_file.read(1)
            if not char:
                break
            buf += char
        self.assertEqual(buf, self.read_msg)

    def _testUnbufferedRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testReadline(self):
        # Performing file readline test
        line = self.read_file.readline()
        self.assertEqual(line, self.read_msg)

    def _testReadline(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testCloseAfterMakefile(self):
        # The file returned by makefile should keep the socket open.
        self.cli_conn.close()
        # read until EOF
        msg = self.read_file.read()
        self.assertEqual(msg, self.read_msg)

    def _testCloseAfterMakefile(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testMakefileAfterMakefileClose(self):
        self.read_file.close()
        msg = self.cli_conn.recv(len(MSG))
        if isinstance(self.read_msg, str):
            msg = msg.decode()
        self.assertEqual(msg, self.read_msg)

    def _testMakefileAfterMakefileClose(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testClosedAttr(self):
        self.assertFalse(self.read_file.closed)

    def _testClosedAttr(self):
        self.assertFalse(self.write_file.closed)

    def testAttributes(self):
        self.assertEqual(self.read_file.mode, self.read_mode)
        self.assertEqual(self.read_file.name, self.cli_conn.fileno())

    def _testAttributes(self):
        self.assertEqual(self.write_file.mode, self.write_mode)
        self.assertEqual(self.write_file.name, self.serv_conn.fileno())

    def testRealClose(self):
        self.read_file.close()
        self.assertRaises(ValueError, self.read_file.fileno)
        self.cli_conn.close()
        self.assertRaises(OSError, self.cli_conn.getsockname)

    def _testRealClose(self):
        pass
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that http.client relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0  # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        line = self.read_file.readline()  # first line
        self.assertEqual(line, b"A. " + self.write_msg)  # first line
        self.read_file = self.cli_conn.makefile('rb', 0)
        line = self.read_file.readline()  # second line
        self.assertEqual(line, b"B. " + self.write_msg)  # second line

    def _testUnbufferedReadline(self):
        self.write_file.write(b"A. " + self.write_msg)
        self.write_file.write(b"B. " + self.write_msg)
        self.write_file.flush()

    def testMakefileClose(self):
        # The file returned by makefile should keep the socket open...
        self.cli_conn.close()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, self.read_msg)
        # ...until the file is itself closed
        self.read_file.close()
        self.assertRaises(OSError, self.cli_conn.recv, 1024)

    def _testMakefileClose(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testMakefileCloseSocketDestroy(self):
        # Closing the file object must drop exactly one reference to the
        # underlying socket object.
        refcount_before = sys.getrefcount(self.cli_conn)
        self.read_file.close()
        refcount_after = sys.getrefcount(self.cli_conn)
        self.assertEqual(refcount_before - 1, refcount_after)

    def _testMakefileCloseSocketDestroy(self):
        pass

    # Non-blocking ops
    # NOTE: to set `read_file` as non-blocking, we must call
    # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp).

    def testSmallReadNonBlocking(self):
        self.cli_conn.setblocking(False)
        # Nothing has been written yet: both read styles return None.
        self.assertIsNone(self.read_file.readinto(bytearray(10)))
        self.assertIsNone(self.read_file.read(len(self.read_msg) - 3))
        self.evt1.set()
        self.evt2.wait(1.0)
        first_seg = self.read_file.read(len(self.read_msg) - 3)
        if first_seg is None:
            # Data not arrived (can happen under Windows), wait a bit
            time.sleep(0.5)
            first_seg = self.read_file.read(len(self.read_msg) - 3)
        buf = bytearray(10)
        n = self.read_file.readinto(buf)
        self.assertEqual(n, 3)
        msg = first_seg + buf[:n]
        self.assertEqual(msg, self.read_msg)
        # Everything has been consumed: further reads return None again.
        self.assertIsNone(self.read_file.readinto(bytearray(16)))
        self.assertIsNone(self.read_file.read(1))

    def _testSmallReadNonBlocking(self):
        self.evt1.wait(1.0)
        self.write_file.write(self.write_msg)
        self.write_file.flush()
        self.evt2.set()
        # Avoid closing the socket before the server test has finished,
        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
        self.serv_finished.wait(5.0)

    def testWriteNonBlocking(self):
        self.cli_finished.wait(5.0)
        # The client thread can't skip directly - the SkipTest exception
        # would appear as a failure.
        if self.serv_skipped:
            self.skipTest(self.serv_skipped)

    def _testWriteNonBlocking(self):
        self.serv_skipped = None
        self.serv_conn.setblocking(False)
        # Try to saturate the socket buffer pipe with repeated large writes.
        BIG = b"x" * support.SOCK_MAX_SIZE
        LIMIT = 10
        # The first write() succeeds since a chunk of data can be buffered
        n = self.write_file.write(BIG)
        self.assertGreater(n, 0)
        for _ in range(LIMIT):
            n = self.write_file.write(BIG)
            if n is None:
                # Succeeded
                break
            self.assertGreater(n, 0)
        else:
            # Let us know that this test didn't manage to establish
            # the expected conditions. This is not a failure in itself but,
            # if it happens repeatedly, the test should be fixed.
            self.serv_skipped = "failed to saturate the socket buffer"
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the FileObjectClassTestCase tests with bufsize == 1."""

    # Default-buffered for reading; line-buffered for writing
    bufsize = 1


class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the FileObjectClassTestCase tests with bufsize == 2."""

    # Exercise the buffering code
    bufsize = 2


class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() in text mode (rather than binary)"""

    # Text-mode reader, binary writer.
    read_mode, write_mode = 'r', 'wb'
    read_msg = MSG.decode('utf-8')
    write_msg = MSG
    newline = ''


class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() in text mode (rather than binary)"""

    # Binary reader, text-mode writer.
    read_mode, write_mode = 'rb', 'w'
    read_msg = MSG
    write_msg = MSG.decode('utf-8')
    newline = ''


class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() in text mode (rather than binary)"""

    # Text mode on both sides.
    read_mode, write_mode = 'r', 'w'
    read_msg = MSG.decode('utf-8')
    write_msg = MSG.decode('utf-8')
    newline = ''


class NetworkConnectionTest:
    """Prove network connection."""

    def clientSetUp(self):
        # We're inherited below by BasicTCPTest2, which also inherits
        # BasicTCPTest, which defines self.port referenced below.
        address = (HOST, self.port)
        self.cli = socket.create_connection(address)
        self.serv_conn = self.cli


class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Tests that NetworkConnection does not break existing TCP functionality.
    """
class NetworkConnectionNoServer(unittest.TestCase):
    """Connection attempts made while no server is listening."""

    class MockSocket(socket.socket):
        # Socket whose connect() always fails with a timeout.
        def connect(self, *args):
            raise TimeoutError('timed out')

    @contextlib.contextmanager
    def mocked_socket_module(self):
        """Temporarily replace socket.socket with MockSocket."""
        saved = socket.socket
        socket.socket = self.MockSocket
        try:
            yield
        finally:
            socket.socket = saved

    def test_connect(self):
        unused_port = socket_helper.find_unused_port()
        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(cli.close)
        with self.assertRaises(OSError) as cm:
            cli.connect((HOST, unused_port))
        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)

    def test_create_connection(self):
        # Issue #9792: errors raised by create_connection() should have
        # a proper errno attribute.
        unused_port = socket_helper.find_unused_port()
        with self.assertRaises(OSError) as cm:
            socket.create_connection((HOST, unused_port))

        # Issue #16257: create_connection() calls getaddrinfo() against
        # 'localhost'.  This may result in an IPV6 addr being returned
        # as well as an IPV4 one:
        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
        #
        # create_connection() enumerates through all the addresses returned
        # and if it doesn't successfully bind to any of them, it propagates
        # the last exception it encountered.
        #
        # On Solaris, ENETUNREACH is returned in this circumstance instead
        # of ECONNREFUSED.  So, if that errno exists, add it to our list of
        # expected errnos.
        acceptable = socket_helper.get_socket_conn_refused_errs()
        self.assertIn(cm.exception.errno, acceptable)

    def test_create_connection_timeout(self):
        # Issue #9792: create_connection() should not recast timeout errors
        # as generic socket errors.
        with self.mocked_socket_module():
            try:
                socket.create_connection((HOST, 1234))
            except TimeoutError:
                pass
            except OSError as exc:
                # EAFNOSUPPORT is only tolerated when IPv6 is unavailable.
                tolerated = (not socket_helper.IPV6_ENABLED
                             and exc.errno == errno.EAFNOSUPPORT)
                if not tolerated:
                    raise
            else:
                self.fail('TimeoutError not raised')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    """Check attributes of sockets returned by socket.create_connection().

    Each server-side test* method only accepts and closes the connection;
    the checks are in the client-side _test* counterparts.
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.source_port = socket_helper.find_unused_port()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        conn, addr = self.serv.accept()
        conn.close()

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port),
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.family, socket.AF_INET)

    testSourceAddress = _justAccept
    def _testSourceAddress(self):
        self.cli = socket.create_connection((HOST, self.port),
                            timeout=support.LOOPBACK_TIMEOUT,
                            source_address=('', self.source_port))
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.getsockname()[1], self.source_port)
        # The port number being used is sufficient to show that the bind()
        # call happened.

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
            self.addCleanup(self.cli.close)
        finally:
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
            self.addCleanup(self.cli.close)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(self.cli.gettimeout())

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        # Register the cleanup like every sibling test does.
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    """Check recv() on a create_connection() socket with and without timeout."""

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        # Server side: accept, wait three seconds, then send the reply.
        accepted, _peer = self.serv.accept()
        self.addCleanup(accepted.close)
        time.sleep(3)
        accepted.send(b"done!")
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        # No timeout given: recv() waits for the delayed reply.
        sock = socket.create_connection((HOST, self.port))
        self.cli = sock
        self.assertEqual(sock.recv(5), b"done!")

    def _testOutsideTimeout(self):
        # One-second timeout: recv() gives up before the reply is sent.
        sock = socket.create_connection((HOST, self.port), timeout=1)
        self.cli = sock
        with self.assertRaises(TimeoutError):
            sock.recv(5)
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on a listening TCP socket."""

    def testTCPTimeout(self):
        # accept() with a 1 second timeout and no client must time out.
        # The message is passed via msg= so it is actually reported on
        # failure (it used to be swallowed as an argument of the callable).
        self.serv.settimeout(1.0)
        with self.assertRaises(
                TimeoutError,
                msg="Error generating a timeout exception (TCP)"):
            self.serv.accept()

    def testTimeoutZero(self):
        # A zero timeout means non-blocking: accept() must fail with an
        # OSError that is not a TimeoutError.
        try:
            self.serv.settimeout(0.0)
            self.serv.accept()
        except TimeoutError:
            self.fail("caught timeout instead of error (TCP)")
        except OSError:
            pass
        except Exception:
            self.fail("caught unexpected exception (TCP)")
        else:
            self.fail("accept() returned success when we did not expect it")

    @unittest.skipUnless(hasattr(signal, 'alarm'),
                         'test needs signal.alarm()')
    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        self.serv.settimeout(5.0)   # must be longer than alarm
        class Alarm(Exception):
            pass
        def alarm_handler(signum, frame):
            raise Alarm
        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            try:
                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
                self.serv.accept()
            except TimeoutError:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on a UDP socket."""

    def testUDPTimeout(self):
        # recv() with a 1 second timeout and no sender must time out.
        # The message is passed via msg= so it is actually reported on
        # failure (it used to be swallowed as an argument of the callable).
        self.serv.settimeout(1.0)
        with self.assertRaises(
                TimeoutError,
                msg="Error generating a timeout exception (UDP)"):
            self.serv.recv(1024)

    def testTimeoutZero(self):
        # A zero timeout means non-blocking: recv() must fail with an
        # OSError that is not a TimeoutError.
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except TimeoutError:
            self.fail("caught timeout instead of error (UDP)")
        except OSError:
            pass
        except Exception:
            self.fail("caught unexpected exception (UDP)")
        else:
            self.fail("recv() returned success when we did not expect it")


@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
class UDPLITETimeoutTest(SocketUDPLITETest):
    """Timeout behaviour of recv() on a UDPLITE socket."""

    def testUDPLITETimeout(self):
        # recv() with a 1 second timeout and no sender must time out.
        self.serv.settimeout(1.0)
        with self.assertRaises(
                TimeoutError,
                msg="Error generating a timeout exception (UDPLITE)"):
            self.serv.recv(1024)

    def testTimeoutZero(self):
        # A zero timeout means non-blocking: recv() must fail with an
        # OSError that is not a TimeoutError.
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except TimeoutError:
            self.fail("caught timeout instead of error (UDPLITE)")
        except OSError:
            pass
        except Exception:
            self.fail("caught unexpected exception (UDPLITE)")
        else:
            self.fail("recv() returned success when we did not expect it")
5348 self.serv.settimeout(0.0) 5349 foo = self.serv.recv(1024) 5350 except TimeoutError: 5351 self.fail("caught timeout instead of error (UDPLITE)") 5352 except OSError: 5353 ok = True 5354 except: 5355 self.fail("caught unexpected exception (UDPLITE)") 5356 if not ok: 5357 self.fail("recv() returned success when we did not expect it") 5358 5359class TestExceptions(unittest.TestCase): 5360 5361 def testExceptionTree(self): 5362 self.assertTrue(issubclass(OSError, Exception)) 5363 self.assertTrue(issubclass(socket.herror, OSError)) 5364 self.assertTrue(issubclass(socket.gaierror, OSError)) 5365 self.assertTrue(issubclass(socket.timeout, OSError)) 5366 self.assertIs(socket.error, OSError) 5367 self.assertIs(socket.timeout, TimeoutError) 5368 5369 def test_setblocking_invalidfd(self): 5370 # Regression test for issue #28471 5371 5372 sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 5373 sock = socket.socket( 5374 socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno()) 5375 sock0.close() 5376 self.addCleanup(sock.detach) 5377 5378 with self.assertRaises(OSError): 5379 sock.setblocking(False) 5380 5381 5382@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') 5383class TestLinuxAbstractNamespace(unittest.TestCase): 5384 5385 UNIX_PATH_MAX = 108 5386 5387 def testLinuxAbstractNamespace(self): 5388 address = b"\x00python-test-hello\x00\xff" 5389 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: 5390 s1.bind(address) 5391 s1.listen() 5392 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2: 5393 s2.connect(s1.getsockname()) 5394 with s1.accept()[0] as s3: 5395 self.assertEqual(s1.getsockname(), address) 5396 self.assertEqual(s2.getpeername(), address) 5397 5398 def testMaxName(self): 5399 address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) 5400 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5401 s.bind(address) 5402 self.assertEqual(s.getsockname(), address) 5403 5404 def testNameOverflow(self): 5405 address = 
"\x00" + "h" * self.UNIX_PATH_MAX 5406 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5407 self.assertRaises(OSError, s.bind, address) 5408 5409 def testStrName(self): 5410 # Check that an abstract name can be passed as a string. 5411 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5412 try: 5413 s.bind("\x00python\x00test\x00") 5414 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5415 finally: 5416 s.close() 5417 5418 def testBytearrayName(self): 5419 # Check that an abstract name can be passed as a bytearray. 5420 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5421 s.bind(bytearray(b"\x00python\x00test\x00")) 5422 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5423 5424@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX') 5425class TestUnixDomain(unittest.TestCase): 5426 5427 def setUp(self): 5428 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5429 5430 def tearDown(self): 5431 self.sock.close() 5432 5433 def encoded(self, path): 5434 # Return the given path encoded in the file system encoding, 5435 # or skip the test if this is not possible. 5436 try: 5437 return os.fsencode(path) 5438 except UnicodeEncodeError: 5439 self.skipTest( 5440 "Pathname {0!a} cannot be represented in file " 5441 "system encoding {1!r}".format( 5442 path, sys.getfilesystemencoding())) 5443 5444 def bind(self, sock, path): 5445 # Bind the socket 5446 try: 5447 socket_helper.bind_unix_socket(sock, path) 5448 except OSError as e: 5449 if str(e) == "AF_UNIX path too long": 5450 self.skipTest( 5451 "Pathname {0!a} is too long to serve as an AF_UNIX path" 5452 .format(path)) 5453 else: 5454 raise 5455 5456 def testUnbound(self): 5457 # Issue #30205 (note getsockname() can return None on OS X) 5458 self.assertIn(self.sock.getsockname(), ('', None)) 5459 5460 def testStrAddr(self): 5461 # Test binding to and retrieving a normal string pathname. 
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
class TestUnixDomain(unittest.TestCase):
    """Binding AF_UNIX stream sockets to various kinds of pathname."""

    def setUp(self):
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def encoded(self, path):
        # Return the given path encoded in the file system encoding,
        # or skip the test if this is not possible.
        try:
            result = os.fsencode(path)
        except UnicodeEncodeError:
            self.skipTest(
                "Pathname {0!a} cannot be represented in file "
                "system encoding {1!r}".format(
                    path, sys.getfilesystemencoding()))
        else:
            return result

    def bind(self, sock, path):
        # Bind the socket, skipping the test when the path is too long
        # for an AF_UNIX address.
        try:
            socket_helper.bind_unix_socket(sock, path)
        except OSError as e:
            if str(e) != "AF_UNIX path too long":
                raise
            self.skipTest(
                "Pathname {0!a} is too long to serve as an AF_UNIX path"
                .format(path))

    def testUnbound(self):
        # Issue #30205 (note getsockname() can return None on OS X)
        self.assertIn(self.sock.getsockname(), ('', None))

    def testStrAddr(self):
        # Test binding to and retrieving a normal string pathname.
        target = os.path.abspath(os_helper.TESTFN)
        self.bind(self.sock, target)
        self.addCleanup(os_helper.unlink, target)
        self.assertEqual(self.sock.getsockname(), target)

    def testBytesAddr(self):
        # Test binding to a bytes pathname.
        target = os.path.abspath(os_helper.TESTFN)
        raw = self.encoded(target)
        self.bind(self.sock, raw)
        self.addCleanup(os_helper.unlink, target)
        self.assertEqual(self.sock.getsockname(), target)

    def testSurrogateescapeBind(self):
        # Test binding to a valid non-ASCII pathname, with the
        # non-ASCII bytes supplied using surrogateescape encoding.
        target = os.path.abspath(os_helper.TESTFN_UNICODE)
        raw = self.encoded(target)
        escaped = raw.decode("ascii", "surrogateescape")
        self.bind(self.sock, escaped)
        self.addCleanup(os_helper.unlink, target)
        self.assertEqual(self.sock.getsockname(), target)

    def testUnencodableAddr(self):
        # Test binding to a pathname that cannot be encoded in the
        # file system encoding.
        if os_helper.TESTFN_UNENCODABLE is None:
            self.skipTest("No unencodable filename available")
        target = os.path.abspath(os_helper.TESTFN_UNENCODABLE)
        self.bind(self.sock, target)
        self.addCleanup(os_helper.unlink, target)
        self.assertEqual(self.sock.getsockname(), target)
class BufferIOTest(SocketConnectedTest):
    """
    Test the buffer versions of socket.recv() and socket.send().
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecvIntoArray(self):
        target = array.array("B", [0] * len(MSG))
        count = self.cli_conn.recv_into(target)
        self.assertEqual(count, len(MSG))
        received = target.tobytes()[:len(MSG)]
        self.assertEqual(received, MSG)

    def _testRecvIntoArray(self):
        payload = bytes(MSG)
        self.serv_conn.send(payload)

    def testRecvIntoBytearray(self):
        target = bytearray(1024)
        count = self.cli_conn.recv_into(target)
        self.assertEqual(count, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoBytearray = _testRecvIntoArray

    def testRecvIntoMemoryview(self):
        target = bytearray(1024)
        view = memoryview(target)
        count = self.cli_conn.recv_into(view)
        self.assertEqual(count, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoMemoryview = _testRecvIntoArray

    def testRecvFromIntoArray(self):
        target = array.array("B", [0] * len(MSG))
        count, _addr = self.cli_conn.recvfrom_into(target)
        self.assertEqual(count, len(MSG))
        received = target.tobytes()[:len(MSG)]
        self.assertEqual(received, MSG)

    def _testRecvFromIntoArray(self):
        payload = bytes(MSG)
        self.serv_conn.send(payload)

    def testRecvFromIntoBytearray(self):
        target = bytearray(1024)
        count, _addr = self.cli_conn.recvfrom_into(target)
        self.assertEqual(count, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        target = bytearray(1024)
        view = memoryview(target)
        count, _addr = self.cli_conn.recvfrom_into(view)
        self.assertEqual(count, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray

    def testRecvFromIntoSmallBuffer(self):
        # See issue #20246.
        # Asking for more bytes than the buffer can hold must be rejected.
        target = bytearray(8)
        with self.assertRaises(ValueError):
            self.cli_conn.recvfrom_into(target, 1024)

    def _testRecvFromIntoSmallBuffer(self):
        self.serv_conn.send(MSG)

    def testRecvFromIntoEmptyBuffer(self):
        # Both call forms must accept an empty buffer.
        target = bytearray()
        self.cli_conn.recvfrom_into(target)
        self.cli_conn.recvfrom_into(target, 0)

    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
def isTipcAvailable():
    """Report whether the TIPC kernel module appears to be loaded.

    Returns True only when the socket module defines AF_TIPC and
    /proc/modules contains a line starting with "tipc ".  The module is
    not loaded automatically on Ubuntu and probably other Linux distros.
    """
    if hasattr(socket, "AF_TIPC"):
        try:
            modules = open("/proc/modules", encoding="utf-8")
        except (FileNotFoundError, IsADirectoryError, PermissionError):
            # A missing, non-regular or unreadable module list simply
            # means we cannot confirm that TIPC is loaded.
            pass
        else:
            with modules:
                return any(entry.startswith("tipc ") for entry in modules)
    return False
def _testSocketClass(self):
    """Check that leaving a ``with`` block always leaves the socket closed."""
    # Plain use: open inside the block, closed afterwards.
    with socket.socket() as plain:
        self.assertFalse(plain._closed)
    self.assertTrue(plain._closed)

    # Closing explicitly inside the block is allowed.
    with socket.socket() as early:
        early.close()
    self.assertTrue(early._closed)

    # An OSError raised (and caught) inside the block does not prevent
    # the socket from being closed on exit.
    with socket.socket() as failing:
        with self.assertRaises(OSError):
            failing.sendall(b'foo')
    self.assertTrue(failing._closed)
def test_dup(self):
    """A socket obtained through dup() must report itself non-inheritable."""
    with socket.socket() as original:
        duplicate = original.dup()
        # The duplicate is queried only after the source has been closed.
        original.close()
        with duplicate:
            inheritable = duplicate.get_inheritable()
            self.assertEqual(inheritable, False)
def test_socketpair(self):
    """Both ends returned by socketpair() must be non-inheritable."""
    ends = socket.socketpair()
    # Register the cleanups first so both ends get closed even if an
    # assertion below fails.
    for end in ends:
        self.addCleanup(end.close)
    for end in ends:
        self.assertEqual(end.get_inheritable(), False)
@support.requires_linux_version(2, 6, 28)
def test_SOCK_NONBLOCK(self):
    """Exercise SOCK_NONBLOCK together with blocking/timeout transitions.

    The first half flips one socket between non-blocking, blocking and
    timeout modes; the second half checks that sockets created under
    different process-wide default timeouts start in the matching mode.
    """
    # a lot of it seems silly and redundant, but I wanted to test that
    # changing back and forth worked ok
    with socket.socket(socket.AF_INET,
                       socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
        self.checkNonblock(s)
        s.setblocking(True)
        self.checkNonblock(s, nonblock=False)
        s.setblocking(False)
        self.checkNonblock(s)
        s.settimeout(None)
        self.checkNonblock(s, nonblock=False)
        s.settimeout(2.0)
        self.checkNonblock(s, timeout=2.0)
        s.setblocking(True)
        self.checkNonblock(s, nonblock=False)
    # defaulttimeout
    t = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(0.0)
        with socket.socket() as s:
            self.checkNonblock(s)
        socket.setdefaulttimeout(None)
        with socket.socket() as s:
            self.checkNonblock(s, False)
        socket.setdefaulttimeout(2.0)
        with socket.socket() as s:
            self.checkNonblock(s, timeout=2.0)
        socket.setdefaulttimeout(None)
        with socket.socket() as s:
            self.checkNonblock(s, False)
    finally:
        # The default timeout is process-wide state: restore it even when
        # an assertion above fails, so it cannot leak into later tests.
        socket.setdefaulttimeout(t)
def testShare(self):
    """Hand the listening socket to a child process and talk to it there."""
    # Transfer the listening server socket to another process
    # and service it from there.

    # Create process:
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
    p.start()

    # Get the shared socket data
    data = self.serv.share(p.pid)

    # Pass the shared socket to the other process
    addr = self.serv.getsockname()
    self.serv.close()
    q.put(data)

    # The data that the server will send us
    message = b"slapmahfro"
    q.put(message)

    # Connect and listen for the data.  The ``with`` block closes the
    # client socket even if recv() raises part-way through.
    m = []
    with socket.create_connection(addr) as s:
        while True:
            data = s.recv(100)
            if not data:
                break
            m.append(data)
    received = b"".join(m)
    self.assertEqual(received, message)
    p.join()
def testTypes(self):
    """Share and re-create a socket for every family/type combination."""
    combos = itertools.product(
        [socket.AF_INET, socket.AF_INET6],
        [socket.SOCK_STREAM, socket.SOCK_DGRAM],
    )
    for family, kind in combos:
        try:
            source = socket.socket(family, kind)
        except OSError:
            # This combination is not supported
            continue
        # Both sockets are closed on the way out, whether or not the
        # comparison succeeds.
        with source:
            shared = socket.fromshare(source.share(os.getpid()))
            with shared:
                self.compareSockets(source, shared)
@classmethod
def setUpClass(cls):
    """Create the FILESIZE-byte test file and cache its content.

    The file at os_helper.TESTFN is filled by repeating one random
    BUFSIZE-byte chunk; its content is then read back into
    cls.FILEDATA so tests can compare received data against it.
    """
    def chunks(total, step):
        # Yield the sizes of consecutive writes that add up to `total`.
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    chunk = b"".join([random.choice(string.ascii_letters).encode()
                      for i in range(cls.BUFSIZE)])
    with open(os_helper.TESTFN, 'wb') as f:
        for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
            # Honour the requested size so the file is exactly FILESIZE
            # bytes even when FILESIZE is not a multiple of BUFSIZE.
            f.write(chunk[:csize])
    with open(os_helper.TESTFN, 'rb') as f:
        cls.FILEDATA = f.read()
        assert len(cls.FILEDATA) == cls.FILESIZE
def _testEmptyFileSend(self):
    """Client side: sending an empty file transfers zero bytes."""
    address = self.serv.getsockname()
    filename = os_helper.TESTFN + "2"
    # Register the removal before creating the file so it is cleaned up
    # whatever happens afterwards.
    self.addCleanup(os_helper.unlink, filename)
    with open(filename, 'wb'):
        pass
    # Open the file only once the connection exists; opening it first
    # would leave it unclosed if create_connection() raised.
    with socket.create_connection(address) as sock, \
         open(filename, 'rb') as file:
        meth = self.meth_from_sock(sock)
        sent = meth(file)
        self.assertEqual(sent, 0)
        self.assertEqual(file.tell(), 0)
def _testCount(self):
    """Client side: send only the first `count` bytes of the test file."""
    address = self.serv.getsockname()
    # Both resources are acquired inside the ``with`` statement; opening
    # the file beforehand would leak it if the connection attempt failed.
    with socket.create_connection(
            address, timeout=support.LOOPBACK_TIMEOUT) as sock, \
         open(os_helper.TESTFN, 'rb') as file:
        count = 5000007
        meth = self.meth_from_sock(sock)
        sent = meth(file, count=count)
        self.assertEqual(sent, count)
        # The file position must have advanced by exactly `count` bytes.
        self.assertEqual(file.tell(), count)
def test_errors(self):
    """Check argument validation of the sendfile implementation under test."""
    # Only stream sockets are accepted.
    with open(os_helper.TESTFN, 'rb') as binfile, \
         socket.socket(type=socket.SOCK_DGRAM) as dgram:
        send = self.meth_from_sock(dgram)
        with self.assertRaisesRegex(ValueError, "SOCK_STREAM"):
            send(binfile)

    # Files opened in text mode are rejected.
    with open(os_helper.TESTFN, encoding="utf-8") as textfile, \
         socket.socket() as stream:
        send = self.meth_from_sock(stream)
        with self.assertRaisesRegex(ValueError, "binary mode"):
            send(textfile)

    # `count` must be a positive integer: wrong types raise TypeError,
    # non-positive integers raise ValueError.
    bad_counts = (
        (TypeError, '2'),
        (TypeError, 0.1),
        (ValueError, 0),
        (ValueError, -1),
    )
    with open(os_helper.TESTFN, 'rb') as binfile, \
         socket.socket() as stream:
        send = self.meth_from_sock(stream)
        for exc_type, bad in bad_counts:
            with self.assertRaisesRegex(exc_type, "positive integer"):
                send(binfile, count=bad)
def create_alg(self, typ, name):
    """Return an AF_ALG socket bound to algorithm *name* of type *typ*.

    Raises unittest.SkipTest when bind() fails with FileNotFoundError,
    i.e. the type / algorithm is not available.  Any other bind()
    failure is propagated unchanged.  The socket is closed on every
    failure path.
    """
    sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
    try:
        sock.bind((typ, name))
    except FileNotFoundError as e:
        # type / algorithm is not available
        sock.close()
        raise unittest.SkipTest(str(e), typ, name)
    except BaseException:
        # Do not leak the socket when bind() fails for any other reason.
        sock.close()
        raise
    else:
        return sock
@support.requires_linux_version(4, 3)
def test_aes_cbc(self):
    """Encrypt and decrypt with cbc(aes), for one block and for many."""
    secret = bytes.fromhex('06a9214036b8a15b512e03d534120006')
    init_vector = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
    plain = b"Single block msg"
    expected_cipher = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
    block_len = len(plain)
    with self.create_alg('skcipher', 'cbc(aes)') as algo:
        algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, secret)

        # Encrypt: control message first, payload via a separate sendall().
        with algo.accept()[0] as op:
            op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=init_vector,
                             flags=socket.MSG_MORE)
            op.sendall(plain)
            self.assertEqual(op.recv(block_len), expected_cipher)

        # Decrypt: payload and control data in a single call.
        with algo.accept()[0] as op:
            op.sendmsg_afalg([expected_cipher],
                             op=socket.ALG_OP_DECRYPT, iv=init_vector)
            self.assertEqual(op.recv(block_len), plain)

        # long message
        repeat = 1024
        total_len = block_len * repeat
        with algo.accept()[0] as op:
            op.sendmsg_afalg([plain] * repeat,
                             op=socket.ALG_OP_ENCRYPT, iv=init_vector)
            enc = op.recv(total_len)
        self.assertEqual(len(enc), total_len)
        self.assertEqual(enc[:block_len], expected_cipher)

        # Round-trip the long ciphertext back to the repeated plaintext.
        with algo.accept()[0] as op:
            op.sendmsg_afalg([enc],
                             op=socket.ALG_OP_DECRYPT, iv=init_vector)
            dec = op.recv(total_len)
        self.assertEqual(len(dec), total_len)
        self.assertEqual(dec, plain * repeat)
@support.requires_linux_version(4, 3)  # see test_aes_cbc
def test_drbg_pr_sha256(self):
    """Read 32 bytes from the drbg_pr_sha256 random number generator."""
    # deterministic random bit generator, prediction resistance, sha256
    wanted = 32
    with self.create_alg('rng', 'drbg_pr_sha256') as algo:
        # Additional seed material is supplied through ALG_SET_KEY.
        seed = os.urandom(32)
        algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, seed)
        with algo.accept()[0] as op:
            random_bytes = op.recv(wanted)
            self.assertEqual(len(random_bytes), 32)
class CreateServerTest(unittest.TestCase):
    """Checks of the sockets returned by socket.create_server()."""

    def test_address(self):
        """The server socket is bound to the requested host and port."""
        port = socket_helper.find_unused_port()
        with socket.create_server(("127.0.0.1", port)) as sock:
            self.assertEqual(sock.getsockname()[0], "127.0.0.1")
            self.assertEqual(sock.getsockname()[1], port)
        if socket_helper.IPV6_ENABLED:
            with socket.create_server(("::1", port),
                                      family=socket.AF_INET6) as sock:
                self.assertEqual(sock.getsockname()[0], "::1")
                self.assertEqual(sock.getsockname()[1], port)

    def test_family_and_type(self):
        """Family follows the `family` argument; type is SOCK_STREAM."""
        with socket.create_server(("127.0.0.1", 0)) as sock:
            self.assertEqual(sock.family, socket.AF_INET)
            self.assertEqual(sock.type, socket.SOCK_STREAM)
        if socket_helper.IPV6_ENABLED:
            with socket.create_server(("::1", 0), family=socket.AF_INET6) as s:
                self.assertEqual(s.family, socket.AF_INET6)
                # Check the IPv6 socket itself, not the already closed
                # IPv4 socket from the block above.
                self.assertEqual(s.type, socket.SOCK_STREAM)

    def test_reuse_port(self):
        """reuse_port=True sets SO_REUSEPORT, or raises if unsupported."""
        if not hasattr(socket, "SO_REUSEPORT"):
            with self.assertRaises(ValueError):
                socket.create_server(("localhost", 0), reuse_port=True)
        else:
            with socket.create_server(("localhost", 0)) as sock:
                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
                self.assertEqual(opt, 0)
            with socket.create_server(("localhost", 0), reuse_port=True) as sock:
                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
                self.assertNotEqual(opt, 0)

    @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
                     not hasattr(_socket, 'IPV6_V6ONLY'),
                     "IPV6_V6ONLY option not supported")
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
    def test_ipv6_only_default(self):
        """An AF_INET6 server socket has IPV6_V6ONLY set by default."""
        with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock:
            # Use a unittest assertion: a bare ``assert`` is stripped
            # when Python runs with -O, which would make this test vacuous.
            self.assertTrue(
                sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY))

    @unittest.skipIf(not socket.has_dualstack_ipv6(),
                     "dualstack_ipv6 not supported")
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
    def test_dualstack_ipv6_family(self):
        """A dual-stack server socket still reports the AF_INET6 family."""
        with socket.create_server(("::1", 0), family=socket.AF_INET6,
                                  dualstack_ipv6=True) as sock:
            self.assertEqual(sock.family, socket.AF_INET6)
6536 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6537 "dualstack_ipv6 not supported") 6538 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6539 def test_dual_stack_client_v4(self): 6540 port = socket_helper.find_unused_port() 6541 with socket.create_server(("", port), family=socket.AF_INET6, 6542 dualstack_ipv6=True) as sock: 6543 self.echo_server(sock) 6544 self.echo_client(("127.0.0.1", port), socket.AF_INET) 6545 6546 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6547 "dualstack_ipv6 not supported") 6548 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6549 def test_dual_stack_client_v6(self): 6550 port = socket_helper.find_unused_port() 6551 with socket.create_server(("", port), family=socket.AF_INET6, 6552 dualstack_ipv6=True) as sock: 6553 self.echo_server(sock) 6554 self.echo_client(("::1", port), socket.AF_INET6) 6555 6556@requireAttrs(socket, "send_fds") 6557@requireAttrs(socket, "recv_fds") 6558@requireAttrs(socket, "AF_UNIX") 6559class SendRecvFdsTests(unittest.TestCase): 6560 def testSendAndRecvFds(self): 6561 def close_pipes(pipes): 6562 for fd1, fd2 in pipes: 6563 os.close(fd1) 6564 os.close(fd2) 6565 6566 def close_fds(fds): 6567 for fd in fds: 6568 os.close(fd) 6569 6570 # send 10 file descriptors 6571 pipes = [os.pipe() for _ in range(10)] 6572 self.addCleanup(close_pipes, pipes) 6573 fds = [rfd for rfd, wfd in pipes] 6574 6575 # use a UNIX socket pair to exchange file descriptors locally 6576 sock1, sock2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) 6577 with sock1, sock2: 6578 socket.send_fds(sock1, [MSG], fds) 6579 # request more data and file descriptors than expected 6580 msg, fds2, flags, addr = socket.recv_fds(sock2, len(MSG) * 2, len(fds) * 2) 6581 self.addCleanup(close_fds, fds2) 6582 6583 self.assertEqual(msg, MSG) 6584 self.assertEqual(len(fds2), len(fds)) 6585 self.assertEqual(flags, 0) 6586 # don't test addr 6587 6588 # test that file descriptors are 
connected 6589 for index, fds in enumerate(pipes): 6590 rfd, wfd = fds 6591 os.write(wfd, str(index).encode()) 6592 6593 for index, rfd in enumerate(fds2): 6594 data = os.read(rfd, 100) 6595 self.assertEqual(data, str(index).encode()) 6596 6597 6598def setUpModule(): 6599 thread_info = threading_helper.threading_setup() 6600 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 6601 6602 6603if __name__ == "__main__": 6604 unittest.main() 6605