import unittest
from test import support

import errno
import io
import itertools
import socket
import select
import tempfile
import time
import traceback
import queue
import sys
import os
import array
import contextlib
from weakref import proxy
import signal
import math
import pickle
import struct
import random
import string
try:
    import multiprocessing
except ImportError:
    multiprocessing = False
try:
    import fcntl
except ImportError:
    fcntl = None

HOST = support.HOST
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return

try:
    import _thread as thread
    import threading
except ImportError:
    thread = None
    threading = None
try:
    import _socket
except ImportError:
    _socket = None


def _have_socket_can():
    """Check whether CAN sockets are supported on this host."""
    try:
        s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
    except (AttributeError, OSError):
        # AttributeError: the constants are missing from the socket module;
        # OSError: the socket could not be created.
        return False
    else:
        s.close()
    return True

def _have_socket_rds():
    """Check whether RDS sockets are supported on this host."""
    try:
        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    else:
        s.close()
    return True

def _have_socket_alg():
    """Check whether AF_ALG sockets are supported on this host."""
    try:
        s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        return False
    else:
        s.close()
    return True

HAVE_SOCKET_CAN = _have_socket_can()

HAVE_SOCKET_RDS = _have_socket_rds()

HAVE_SOCKET_ALG = _have_socket_alg()

# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize

class SocketTCPTest(unittest.TestCase):
    """Fixture providing a listening IPv4 TCP socket as self.serv.

    self.port is the port returned by support.bind_port().
    """

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = support.bind_port(self.serv)
        self.serv.listen()

    def tearDown(self):
        self.serv.close()
        self.serv = None

class SocketUDPTest(unittest.TestCase):
    """Fixture providing a bound IPv4 UDP socket as self.serv.

    self.port is the port returned by support.bind_port().
    """

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.port = support.bind_port(self.serv)

    def tearDown(self):
        self.serv.close()
        self.serv = None

class ThreadSafeCleanupTestCase(unittest.TestCase):
    """Subclass of unittest.TestCase with thread-safe cleanup methods.

    This subclass protects the addCleanup() and doCleanups() methods
    with a recursive lock.
    """

    # Without the threading module the plain TestCase methods are used.
    if threading:
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._cleanup_lock = threading.RLock()

        def addCleanup(self, *args, **kwargs):
            with self._cleanup_lock:
                return super().addCleanup(*args, **kwargs)

        def doCleanups(self, *args, **kwargs):
            with self._cleanup_lock:
                return super().doCleanups(*args, **kwargs)

class SocketCANTest(unittest.TestCase):

    """To be able to run this test, a `vcan0` CAN interface can be created with
    the following commands:
    # modprobe vcan
    # ip link add dev vcan0 type vcan
    # ifconfig vcan0 up
    """
    interface = 'vcan0'
    bufsize = 128

    """The CAN frame structure is defined in <linux/can.h>:

    struct can_frame {
        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
        __u8    can_dlc; /* data length code: 0 .. 8 */
        __u8    data[8] __attribute__((aligned(8)));
    };
    """
    can_frame_fmt = "=IB3x8s"
    can_frame_size = struct.calcsize(can_frame_fmt)

    """The Broadcast Management Command frame structure is defined
    in <linux/can/bcm.h>:

    struct bcm_msg_head {
        __u32 opcode;
        __u32 flags;
        __u32 count;
        struct timeval ival1, ival2;
        canid_t can_id;
        __u32 nframes;
        struct can_frame frames[0];
    }

    `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
    `struct can_frame` definition). Must use native not standard types for packing.
    """
    bcm_cmd_msg_fmt = "@3I4l2I"
    # Pad up to the next multiple of 8.  The number of pad bytes needed is
    # (-size) % 8; a plain size % 8 is only correct when the remainder is
    # 0 or 4.
    bcm_cmd_msg_fmt += "x" * (-struct.calcsize(bcm_cmd_msg_fmt) % 8)

    def setUp(self):
        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        self.addCleanup(self.s.close)
        try:
            self.s.bind((self.interface,))
        except OSError:
            self.skipTest('network interface `%s` does not exist' %
                           self.interface)


class SocketRDSTest(unittest.TestCase):

    """To be able to run this test, the `rds` kernel module must be loaded:
    # modprobe rds
    """
    bufsize = 8192

    def setUp(self):
        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.addCleanup(self.serv.close)
        try:
            self.port = support.bind_port(self.serv)
        except OSError:
            self.skipTest('unable to bind RDS socket')


class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp().
    """

    def __init__(self):
        # Swap the true setup function
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, then run the real setUp()."""
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = queue.Queue(1)
        self.server_crashed = False

        # Do some munging to start the client test.
        methodname = self.id()
        i = methodname.rfind('.')
        methodname = methodname[i+1:]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        try:
            self.__setUp()
        except BaseException:
            # Tell the client not to run its half of the test, then
            # propagate the original error unchanged.
            self.server_crashed = True
            raise
        finally:
            self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown(), wait for the client, re-raise its error."""
        self.__tearDown()
        self.done.wait()

        if self.queue.qsize():
            exc = self.queue.get()
            raise exc

    def clientRun(self, test_func):
        """Body of the client thread: set up, run test_func, tear down.

        Any exception raised on the client side is put on self.queue so
        that _tearDown() can re-raise it in the main thread.
        """
        self.server_ready.wait()
        try:
            self.clientSetUp()
        except BaseException as e:
            self.queue.put(e)
            self.clientTearDown()
            return
        finally:
            self.client_ready.set()
        if self.server_crashed:
            self.clientTearDown()
            return
        try:
            # Checked inside the try so that the error is reported through
            # the queue and clientTearDown() still runs; otherwise
            # self.done would never be set and _tearDown() would block.
            if not callable(test_func):
                raise TypeError("test_func must be a callable function")
            test_func()
        except BaseException as e:
            self.queue.put(e)
        finally:
            self.clientTearDown()

    def clientSetUp(self):
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        # Signal the main thread first, then end the client thread.
        self.done.set()
        thread.exit()

class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """SocketTCPTest with a client thread owning a TCP socket self.cli."""

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """SocketUDPTest with a client thread owning a UDP socket self.cli."""

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
    """SocketCANTest with a client thread owning a raw CAN socket self.cli."""

    def __init__(self, methodName='runTest'):
        SocketCANTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        try:
            self.cli.bind((self.interface,))
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
    """SocketRDSTest with a client thread owning an RDS socket self.cli."""

    def __init__(self, methodName='runTest'):
        SocketRDSTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        try:
            # RDS sockets must be bound explicitly to send or receive data
            self.cli.bind((HOST, 0))
            self.cli_addr = self.cli.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

class SocketConnectedTest(ThreadedTCPSocketTest):
    """Socket tests for client-server connection.

    self.cli_conn is a client socket connected to the server.  The
    setUp() method guarantees that it is connected to the server.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        conn, addr = self.serv.accept()
        self.cli_conn = conn

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        self.cli.connect((HOST, self.port))
        self.serv_conn = self.cli

    def clientTearDown(self):
        self.serv_conn.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)

class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Threaded fixture whose two ends come from socket.socketpair().

    self.serv is closed by tearDown() and self.cli by clientTearDown().
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        self.serv, self.cli = socket.socketpair()

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)


# The following classes are used by the sendmsg()/recvmsg() tests.
# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
# gives a drop-in replacement for SocketConnectedTest, but different
# address families can be used, and the attributes serv_addr and
# cli_addr will be set to the addresses of the endpoints.

class SocketTestBase(unittest.TestCase):
    """A base class for socket tests.

    Subclasses must provide methods newSocket() to return a new socket
    and bindSock(sock) to bind it to an unused address.

    Creates a socket self.serv and sets self.serv_addr to its address.
    """

    def setUp(self):
        self.serv = self.newSocket()
        self.bindServer()

    def bindServer(self):
        """Bind server socket and set self.serv_addr to its address."""
        self.bindSock(self.serv)
        self.serv_addr = self.serv.getsockname()

    def tearDown(self):
        self.serv.close()
        self.serv = None


class SocketListeningTestMixin(SocketTestBase):
    """Mixin to listen on the server socket."""

    def setUp(self):
        super().setUp()
        self.serv.listen()


class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
                              ThreadableTest):
    """Mixin to add client socket and allow client/server tests.

    Client socket is self.cli and its address is self.cli_addr.  See
    ThreadableTest for usage information.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = self.newClientSocket()
        self.bindClient()

    def newClientSocket(self):
        """Return a new socket for use as client."""
        return self.newSocket()

    def bindClient(self):
        """Bind client socket and set self.cli_addr to its address."""
        self.bindSock(self.cli)
        self.cli_addr = self.cli.getsockname()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)


class ConnectedStreamTestMixin(SocketListeningTestMixin,
                               ThreadedSocketTestMixin):
    """Mixin to allow client/server stream tests with connected client.

    Server's socket representing connection to client is self.cli_conn
    and client's connection to server is self.serv_conn.  (Based on
    SocketConnectedTest.)
    """

    def setUp(self):
        super().setUp()
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        conn, addr = self.serv.accept()
        self.cli_conn = conn

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        super().tearDown()

    def clientSetUp(self):
        super().clientSetUp()
        self.cli.connect(self.serv_addr)
        self.serv_conn = self.cli

    def clientTearDown(self):
        try:
            self.serv_conn.close()
            self.serv_conn = None
        except AttributeError:
            # serv_conn is only assigned at the end of clientSetUp(), so it
            # may be missing here.
            pass
        super().clientTearDown()


class UnixSocketTestBase(SocketTestBase):
    """Base class for Unix-domain socket tests."""

    # This class is used for file descriptor passing tests, so we
    # create the sockets in a private directory so that other users
    # can't send anything that might be problematic for a privileged
    # user running the tests.

    def setUp(self):
        self.dir_path = tempfile.mkdtemp()
        self.addCleanup(os.rmdir, self.dir_path)
        super().setUp()

    def bindSock(self, sock):
        # The name is generated inside the private directory created in
        # setUp(); the socket file is removed again on cleanup.
        path = tempfile.mktemp(dir=self.dir_path)
        support.bind_unix_socket(sock, path)
        self.addCleanup(support.unlink, path)

class UnixStreamBase(UnixSocketTestBase):
    """Base class for Unix-domain SOCK_STREAM tests."""

    def newSocket(self):
        return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)


class InetTestBase(SocketTestBase):
    """Base class for IPv4 socket tests."""

    host = HOST

    def setUp(self):
        super().setUp()
        self.port = self.serv_addr[1]

    def bindSock(self, sock):
        support.bind_port(sock, host=self.host)

class TCPTestBase(InetTestBase):
    """Base class for TCP-over-IPv4 tests."""

    def newSocket(self):
        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)

class UDPTestBase(InetTestBase):
    """Base class for UDP-over-IPv4 tests."""

    def newSocket(self):
        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

class SCTPStreamBase(InetTestBase):
    """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""

    def newSocket(self):
        return socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                             socket.IPPROTO_SCTP)


class Inet6TestBase(InetTestBase):
    """Base class for IPv6 socket tests."""

    host = support.HOSTv6

class UDP6TestBase(Inet6TestBase):
    """Base class for UDP-over-IPv6 tests."""

    def newSocket(self):
        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)


# Test-skipping decorators for use with ThreadableTest.

def skipWithClientIf(condition, reason):
    """Skip decorated test if condition is true, add client_skip decorator.

    If the decorated object is not a class, sets its attribute
    "client_skip" to a decorator which will return an empty function
    if the test is to be skipped, or the original function if it is
    not.  This can be used to avoid running the client part of a
    skipped test when using ThreadableTest.
    """
    def client_pass(*args, **kwargs):
        pass
    def skipdec(obj):
        retval = unittest.skip(reason)(obj)
        if not isinstance(obj, type):
            retval.client_skip = lambda f: client_pass
        return retval
    def noskipdec(obj):
        if not (isinstance(obj, type) or hasattr(obj, "client_skip")):
            obj.client_skip = lambda f: f
        return obj
    return skipdec if condition else noskipdec


def requireAttrs(obj, *attributes):
    """Skip decorated test if obj is missing any of the given attributes.

    Sets client_skip attribute as skipWithClientIf() does.
    """
    missing = [name for name in attributes if not hasattr(obj, name)]
    return skipWithClientIf(
        missing, "don't have " + ", ".join(missing))


def requireSocket(*args):
    """Skip decorated test if a socket cannot be created with given arguments.

    When an argument is given as a string, will use the value of that
    attribute of the socket module, or skip the test if it doesn't
    exist.  Sets client_skip attribute as skipWithClientIf() does.
    """
    err = None
    missing = [obj for obj in args if
               isinstance(obj, str) and not hasattr(socket, obj)]
    if missing:
        err = "don't have " + ", ".join(missing)
    else:
        callargs = [getattr(socket, obj) if isinstance(obj, str) else obj
                    for obj in args]
        try:
            s = socket.socket(*callargs)
        except OSError as e:
            # XXX: check errno?
            err = str(e)
        else:
            s.close()
    return skipWithClientIf(
        err is not None,
        "can't create socket({0}): {1}".format(
            ", ".join(str(o) for o in args), err))


#######################################################################
## Begin Tests

class GeneralModuleTests(unittest.TestCase):

    def test_SocketType_is_socketobject(self):
        import _socket
        self.assertIs(socket.SocketType, _socket.socket)
        # "with" closes the socket even if the assertion fails.
        with socket.socket() as s:
            self.assertIsInstance(s, socket.SocketType)

    def test_repr(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        with s:
            self.assertIn('fd=%i' % s.fileno(), repr(s))
            self.assertIn('family=%s' % socket.AF_INET, repr(s))
            self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
            self.assertIn('proto=0', repr(s))
            self.assertNotIn('raddr', repr(s))
            s.bind(('127.0.0.1', 0))
            self.assertIn('laddr', repr(s))
            self.assertIn(str(s.getsockname()), repr(s))
        self.assertIn('[closed]', repr(s))
        self.assertNotIn('laddr', repr(s))

    @unittest.skipUnless(_socket is not None, 'need _socket module')
    def test_csocket_repr(self):
        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
        try:
            expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
                        % (s.fileno(), s.family, s.type, s.proto))
            self.assertEqual(repr(s), expected)
        finally:
            s.close()
        expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
                    % (s.family, s.type, s.proto))
        self.assertEqual(repr(s), expected)

    def test_weakref(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        p = proxy(s)
        self.assertEqual(p.fileno(), s.fileno())
        s.close()
        s = None
        try:
            p.fileno()
        except ReferenceError:
            pass
        else:
            self.fail('Socket proxy still exists')

    def testSocketError(self):
        # Testing socket module exceptions
        msg = "Error raising socket exception (%s)."
        with self.assertRaises(OSError, msg=msg % 'OSError'):
            raise OSError
        with self.assertRaises(OSError, msg=msg % 'socket.herror'):
            raise socket.herror
        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
            raise socket.gaierror

    def testSendtoErrors(self):
        # Testing that sendto doesn't mask failures. See #10169.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        s.bind(('', 0))
        sockname = s.getsockname()
        # 2 args
        with self.assertRaises(TypeError) as cm:
            s.sendto('\u2620', sockname)
        self.assertEqual(str(cm.exception),
                         "a bytes-like object is required, not 'str'")
        with self.assertRaises(TypeError) as cm:
            s.sendto(5j, sockname)
        self.assertEqual(str(cm.exception),
                         "a bytes-like object is required, not 'complex'")
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', None)
        self.assertIn('not NoneType', str(cm.exception))
        # 3 args
        with self.assertRaises(TypeError) as cm:
            s.sendto('\u2620', 0, sockname)
        self.assertEqual(str(cm.exception),
                         "a bytes-like object is required, not 'str'")
        with self.assertRaises(TypeError) as cm:
            s.sendto(5j, 0, sockname)
        self.assertEqual(str(cm.exception),
                         "a bytes-like object is required, not 'complex'")
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', 0, None)
        self.assertIn('not NoneType', str(cm.exception))
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', 'bar', sockname)
        self.assertIn('an integer is required', str(cm.exception))
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', None, None)
        self.assertIn('an integer is required', str(cm.exception))
        # wrong number of args
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo')
        self.assertIn('(1 given)', str(cm.exception))
        with self.assertRaises(TypeError) as cm:
            s.sendto(b'foo', 0, sockname, 4)
        self.assertIn('(4 given)', str(cm.exception))

    def testCrucialConstants(self):
        # Testing for mission critical constants
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR

    def testHostnameRes(self):
        # Testing hostname resolution mechanisms
        hostname = socket.gethostname()
        try:
            ip = socket.gethostbyname(hostname)
        except OSError:
            # Probably name lookup wasn't set up right; skip this test
            self.skipTest('name lookup failure')
        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
        try:
            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
        except OSError:
            # Probably a similar problem as above; skip this test
            self.skipTest('name lookup failure')
        all_host_names = [hostname, hname] + aliases
        fqhn = socket.getfqdn(ip)
        if fqhn not in all_host_names:
            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))

    def test_host_resolution(self):
        for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
                     '1:1:1:1:1:1:1:1:1']:
            self.assertRaises(OSError, socket.gethostbyname, addr)
            self.assertRaises(OSError, socket.gethostbyaddr, addr)

        for addr in [support.HOST, '10.0.0.1', '255.255.255.255']:
            self.assertEqual(socket.gethostbyname(addr), addr)

        # we don't test support.HOSTv6 because there's a chance it doesn't have
        # a matching name entry (e.g. 'ip6-localhost')
        for host in [support.HOST]:
            self.assertIn(host, socket.gethostbyaddr(host)[2])

    @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
    @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
    def test_sethostname(self):
        oldhn = socket.gethostname()
        try:
            socket.sethostname('new')
        except OSError as e:
            if e.errno == errno.EPERM:
                self.skipTest("test should be run as root")
            else:
                raise
        try:
            # running test as root!
            self.assertEqual(socket.gethostname(), 'new')
            # Should work with bytes objects too
            socket.sethostname(b'bar')
            self.assertEqual(socket.gethostname(), 'bar')
        finally:
            socket.sethostname(oldhn)

    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
                         'socket.if_nameindex() not available.')
    def testInterfaceNameIndex(self):
        interfaces = socket.if_nameindex()
        for index, name in interfaces:
            self.assertIsInstance(index, int)
            self.assertIsInstance(name, str)
            # interface indices are non-zero integers
            self.assertGreater(index, 0)
            _index = socket.if_nametoindex(name)
            self.assertIsInstance(_index, int)
            self.assertEqual(index, _index)
            _name = socket.if_indextoname(index)
            self.assertIsInstance(_name, str)
            self.assertEqual(name, _name)

    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
                         'socket.if_nameindex() not available.')
    def testInvalidInterfaceNameIndex(self):
        # test nonexistent interface index/name
        self.assertRaises(OSError, socket.if_indextoname, 0)
        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
        # test with invalid values
        self.assertRaises(TypeError, socket.if_nametoindex, 0)
        self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')

    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
                         'test needs sys.getrefcount()')
    def testRefCountGetNameInfo(self):
        # Testing reference count for getnameinfo
        try:
            # On some versions, this loses a reference
            orig = sys.getrefcount(__name__)
            socket.getnameinfo(__name__,0)
        except TypeError:
            if sys.getrefcount(__name__) != orig:
                self.fail("socket.getnameinfo loses a reference")

    def testInterpreterCrash(self):
        # Making sure getnameinfo doesn't crash the interpreter
        try:
            # On some versions, this crashes the interpreter.
            socket.getnameinfo(('x', 0, 0, 0), 0)
        except OSError:
            pass

    def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            mask = (1<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, 1<<34)

    def testNtoHErrors(self):
        # Each value used to be listed twice; the repeats exercised nothing
        # new and have been dropped.
        good_values = [ 1, 2, 3 ]
        bad_values = [ -1, -2, -3 ]
        for k in good_values:
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)

    def testGetServBy(self):
        eq = self.assertEqual
        # Find one service that exists, then check all the related interfaces.
        # I've ordered this by protocols that have both a tcp and udp
        # protocol, at least for modern Linuxes.
        if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
            or sys.platform in ('linux', 'darwin')):
            # avoid the 'echo' service on this platform, as there is an
            # assumption breaking non-standard port/protocol entry
            services = ('daytime', 'qotd', 'domain')
        else:
            services = ('echo', 'daytime', 'domain')
        for service in services:
            try:
                port = socket.getservbyname(service, 'tcp')
                break
            except OSError:
                pass
        else:
            raise OSError
        # Try same call with optional protocol omitted
        port2 = socket.getservbyname(service)
        eq(port, port2)
        # Try udp, but don't barf if it doesn't exist
        try:
            udpport = socket.getservbyname(service, 'udp')
        except OSError:
            udpport = None
        else:
            eq(udpport, port)
        # Now make sure the lookup by port returns the same service name
        eq(socket.getservbyport(port2), service)
        eq(socket.getservbyport(port, 'tcp'), service)
        if udpport is not None:
            eq(socket.getservbyport(udpport, 'udp'), service)
        # Make sure getservbyport does not accept out of range ports.
        self.assertRaises(OverflowError, socket.getservbyport, -1)
        self.assertRaises(OverflowError, socket.getservbyport, 65536)

    def testDefaultTimeout(self):
        # Testing default timeout
        # The default timeout is process-wide: make sure a failing assertion
        # below cannot leave it set for the tests that run afterwards.
        self.addCleanup(socket.setdefaulttimeout, None)

        # The default timeout should initially be None
        self.assertEqual(socket.getdefaulttimeout(), None)
        with socket.socket() as s:
            self.assertEqual(s.gettimeout(), None)

        # Set the default timeout to 10, and see if it propagates
        socket.setdefaulttimeout(10)
        self.assertEqual(socket.getdefaulttimeout(), 10)
        with socket.socket() as s:
            self.assertEqual(s.gettimeout(), 10)

        # Reset the default timeout to None, and see if it propagates
        socket.setdefaulttimeout(None)
        self.assertEqual(socket.getdefaulttimeout(), None)
        with socket.socket() as s:
            self.assertEqual(s.gettimeout(), None)

        # Check that setting it to an invalid value raises ValueError
        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)

        # Check that setting it to an invalid type raises TypeError
        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")

    @unittest.skipUnless(hasattr(socket, 'inet_aton'),
                         'test needs socket.inet_aton()')
    def testIPv4_inet_aton_fourbytes(self):
        # Test that issue1008086 and issue767150 are fixed.
        # It must return 4 bytes.
        self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
        self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))

    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
                         'test needs socket.inet_pton()')
    def testIPv4toString(self):
        from socket import inet_aton as f, inet_pton, AF_INET
        g = lambda a: inet_pton(AF_INET, a)

        assertInvalid = lambda func,a: self.assertRaises(
            (OSError, ValueError), func, a
        )

        self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
        self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
        self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
        self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
        self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
        assertInvalid(f, '0.0.0.')
        assertInvalid(f, '300.0.0.0')
        assertInvalid(f, 'a.0.0.0')
        assertInvalid(f, '1.2.3.4.5')
        assertInvalid(f, '::1')

        self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
        self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
        self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
        self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
        assertInvalid(g, '0.0.0.')
        assertInvalid(g, '300.0.0.0')
        assertInvalid(g, 'a.0.0.0')
        assertInvalid(g, '1.2.3.4.5')
        assertInvalid(g, '::1')

    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
                         'test needs socket.inet_pton()')
    def testIPv6toString(self):
        try:
            from socket import inet_pton, AF_INET6, has_ipv6
            if not has_ipv6:
                self.skipTest('IPv6 not available')
        except ImportError:
            self.skipTest('could not import needed symbols from socket')

        if sys.platform == "win32":
            try:
                inet_pton(AF_INET6, '::')
            except OSError as e:
                if e.winerror == 10022:
                    self.skipTest('IPv6 might not be supported')

        f = lambda a: inet_pton(AF_INET6, a)
        assertInvalid = lambda a: self.assertRaises(
            (OSError, ValueError), f, a
        )

        self.assertEqual(b'\x00' * 16, f('::'))
        self.assertEqual(b'\x00' * 16, f('0::0'))
        self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
        self.assertEqual(
            b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
        )
        self.assertEqual(
            b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
            f('ad42:abc::127:0:254:2')
        )
        self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
        assertInvalid('0x20::')
        assertInvalid(':::')
        assertInvalid('::0::')
        assertInvalid('1::abc::')
        assertInvalid('1::abc::def')
        assertInvalid('1:2:3:4:5:6:')
        assertInvalid('1:2:3:4:5:6')
        assertInvalid('1:2:3:4:5:6:7:8:')
        assertInvalid('1:2:3:4:5:6:7:8:0')

        self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
            f('::254.42.23.64')
        )
        self.assertEqual(
            b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
            f('42::a29b:254.42.23.64')
        )
        self.assertEqual(
            b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
            f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
        )
        assertInvalid('255.254.253.252')
        assertInvalid('1::260.2.3.0')
        assertInvalid('1::0.be.e.0')
        assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
        assertInvalid('::1.2.3.4:0')
        assertInvalid('0.100.200.0:3:4:5:6:7:8')

    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
                         'test needs socket.inet_ntop()')
    def testStringToIPv4(self):
        from socket import inet_ntoa as f, inet_ntop, AF_INET
        g = lambda a: inet_ntop(AF_INET, a)
        assertInvalid = lambda func,a: self.assertRaises(
            (OSError, ValueError), func, a
        )

        self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
        self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
        self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
        self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
        assertInvalid(f, b'\x00' * 3)
        assertInvalid(f, b'\x00' * 5)
        assertInvalid(f, b'\x00' * 16)
        self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))

        self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
        self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
        self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
        assertInvalid(g, b'\x00' * 3)
        assertInvalid(g, b'\x00' * 5)
        assertInvalid(g, b'\x00' * 16)
        self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))

    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
                         'test needs socket.inet_ntop()')
    def testStringToIPv6(self):
        try:
            from socket import inet_ntop, AF_INET6, has_ipv6
            if not has_ipv6:
                self.skipTest('IPv6 not available')
        except ImportError:
            self.skipTest('could not import needed symbols from socket')

        if sys.platform == "win32":
            try:
                inet_ntop(AF_INET6, b'\x00' * 16)
            except OSError as e:
                if e.winerror == 10022:
                    self.skipTest('IPv6 might not be supported')

        f = lambda a: inet_ntop(AF_INET6, a)
        assertInvalid = lambda a: self.assertRaises(
            (OSError, ValueError), f, a
        )

        self.assertEqual('::', f(b'\x00' * 16))
        self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
        self.assertEqual(
            'aef:b01:506:1001:ffff:9997:55:170',
            f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
        )
        self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))

        assertInvalid(b'\x12' * 15)
        assertInvalid(b'\x12' * 17)
        assertInvalid(b'\x12' * 4)

    # XXX The following don't test module-level functionality...
1142 1143 def testSockName(self): 1144 # Testing getsockname() 1145 port = support.find_unused_port() 1146 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1147 self.addCleanup(sock.close) 1148 sock.bind(("0.0.0.0", port)) 1149 name = sock.getsockname() 1150 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate 1151 # it reasonable to get the host's addr in addition to 0.0.0.0. 1152 # At least for eCos. This is required for the S/390 to pass. 1153 try: 1154 my_ip_addr = socket.gethostbyname(socket.gethostname()) 1155 except OSError: 1156 # Probably name lookup wasn't set up right; skip this test 1157 self.skipTest('name lookup failure') 1158 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) 1159 self.assertEqual(name[1], port) 1160 1161 def testGetSockOpt(self): 1162 # Testing getsockopt() 1163 # We know a socket should start without reuse==0 1164 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1165 self.addCleanup(sock.close) 1166 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1167 self.assertFalse(reuse != 0, "initial mode is reuse") 1168 1169 def testSetSockOpt(self): 1170 # Testing setsockopt() 1171 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1172 self.addCleanup(sock.close) 1173 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1174 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1175 self.assertFalse(reuse == 0, "failed to set reuse mode") 1176 1177 def testSendAfterClose(self): 1178 # testing send() after close() with timeout 1179 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1180 sock.settimeout(1) 1181 sock.close() 1182 self.assertRaises(OSError, sock.send, b"spam") 1183 1184 def testCloseException(self): 1185 sock = socket.socket() 1186 socket.socket(fileno=sock.fileno()).close() 1187 try: 1188 sock.close() 1189 except OSError as err: 1190 # Winsock apparently raises ENOTSOCK 1191 self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK)) 1192 else: 
1193 self.fail("close() should raise EBADF/ENOTSOCK") 1194 1195 def testNewAttributes(self): 1196 # testing .family, .type and .protocol 1197 1198 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1199 self.assertEqual(sock.family, socket.AF_INET) 1200 if hasattr(socket, 'SOCK_CLOEXEC'): 1201 self.assertIn(sock.type, 1202 (socket.SOCK_STREAM | socket.SOCK_CLOEXEC, 1203 socket.SOCK_STREAM)) 1204 else: 1205 self.assertEqual(sock.type, socket.SOCK_STREAM) 1206 self.assertEqual(sock.proto, 0) 1207 sock.close() 1208 1209 def test_getsockaddrarg(self): 1210 sock = socket.socket() 1211 self.addCleanup(sock.close) 1212 port = support.find_unused_port() 1213 big_port = port + 65536 1214 neg_port = port - 65536 1215 self.assertRaises(OverflowError, sock.bind, (HOST, big_port)) 1216 self.assertRaises(OverflowError, sock.bind, (HOST, neg_port)) 1217 # Since find_unused_port() is inherently subject to race conditions, we 1218 # call it a couple times if necessary. 1219 for i in itertools.count(): 1220 port = support.find_unused_port() 1221 try: 1222 sock.bind((HOST, port)) 1223 except OSError as e: 1224 if e.errno != errno.EADDRINUSE or i == 5: 1225 raise 1226 else: 1227 break 1228 1229 @unittest.skipUnless(os.name == "nt", "Windows specific") 1230 def test_sock_ioctl(self): 1231 self.assertTrue(hasattr(socket.socket, 'ioctl')) 1232 self.assertTrue(hasattr(socket, 'SIO_RCVALL')) 1233 self.assertTrue(hasattr(socket, 'RCVALL_ON')) 1234 self.assertTrue(hasattr(socket, 'RCVALL_OFF')) 1235 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) 1236 s = socket.socket() 1237 self.addCleanup(s.close) 1238 self.assertRaises(ValueError, s.ioctl, -1, None) 1239 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) 1240 1241 @unittest.skipUnless(os.name == "nt", "Windows specific") 1242 @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'), 1243 'Loopback fast path support required for this test') 1244 def test_sio_loopback_fast_path(self): 1245 s = socket.socket() 1246 
self.addCleanup(s.close) 1247 try: 1248 s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True) 1249 except OSError as exc: 1250 WSAEOPNOTSUPP = 10045 1251 if exc.winerror == WSAEOPNOTSUPP: 1252 self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but " 1253 "doesn't implemented in this Windows version") 1254 raise 1255 self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None) 1256 1257 def testGetaddrinfo(self): 1258 try: 1259 socket.getaddrinfo('localhost', 80) 1260 except socket.gaierror as err: 1261 if err.errno == socket.EAI_SERVICE: 1262 # see http://bugs.python.org/issue1282647 1263 self.skipTest("buggy libc version") 1264 raise 1265 # len of every sequence is supposed to be == 5 1266 for info in socket.getaddrinfo(HOST, None): 1267 self.assertEqual(len(info), 5) 1268 # host can be a domain name, a string representation of an 1269 # IPv4/v6 address or None 1270 socket.getaddrinfo('localhost', 80) 1271 socket.getaddrinfo('127.0.0.1', 80) 1272 socket.getaddrinfo(None, 80) 1273 if support.IPV6_ENABLED: 1274 socket.getaddrinfo('::1', 80) 1275 # port can be a string service name such as "http", a numeric 1276 # port number or None 1277 socket.getaddrinfo(HOST, "http") 1278 socket.getaddrinfo(HOST, 80) 1279 socket.getaddrinfo(HOST, None) 1280 # test family and socktype filters 1281 infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM) 1282 for family, type, _, _, _ in infos: 1283 self.assertEqual(family, socket.AF_INET) 1284 self.assertEqual(str(family), 'AddressFamily.AF_INET') 1285 self.assertEqual(type, socket.SOCK_STREAM) 1286 self.assertEqual(str(type), 'SocketKind.SOCK_STREAM') 1287 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1288 for _, socktype, _, _, _ in infos: 1289 self.assertEqual(socktype, socket.SOCK_STREAM) 1290 # test proto and flags arguments 1291 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1292 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1293 # a server willing to support both IPv4 
and IPv6 will 1294 # usually do this 1295 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1296 socket.AI_PASSIVE) 1297 # test keyword arguments 1298 a = socket.getaddrinfo(HOST, None) 1299 b = socket.getaddrinfo(host=HOST, port=None) 1300 self.assertEqual(a, b) 1301 a = socket.getaddrinfo(HOST, None, socket.AF_INET) 1302 b = socket.getaddrinfo(HOST, None, family=socket.AF_INET) 1303 self.assertEqual(a, b) 1304 a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1305 b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM) 1306 self.assertEqual(a, b) 1307 a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1308 b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP) 1309 self.assertEqual(a, b) 1310 a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1311 b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE) 1312 self.assertEqual(a, b) 1313 a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1314 socket.AI_PASSIVE) 1315 b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC, 1316 type=socket.SOCK_STREAM, proto=0, 1317 flags=socket.AI_PASSIVE) 1318 self.assertEqual(a, b) 1319 # Issue #6697. 1320 self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800') 1321 1322 # Issue 17269: test workaround for OS X platform bug segfault 1323 if hasattr(socket, 'AI_NUMERICSERV'): 1324 try: 1325 # The arguments here are undefined and the call may succeed 1326 # or fail. All we care here is that it doesn't segfault. 
1327 socket.getaddrinfo("localhost", None, 0, 0, 0, 1328 socket.AI_NUMERICSERV) 1329 except socket.gaierror: 1330 pass 1331 1332 def test_getnameinfo(self): 1333 # only IP addresses are allowed 1334 self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0) 1335 1336 @unittest.skipUnless(support.is_resource_enabled('network'), 1337 'network is not enabled') 1338 def test_idna(self): 1339 # Check for internet access before running test 1340 # (issue #12804, issue #25138). 1341 with support.transient_internet('python.org'): 1342 socket.gethostbyname('python.org') 1343 1344 # these should all be successful 1345 domain = 'испытание.pythontest.net' 1346 socket.gethostbyname(domain) 1347 socket.gethostbyname_ex(domain) 1348 socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM) 1349 # this may not work if the forward lookup choses the IPv6 address, as that doesn't 1350 # have a reverse entry yet 1351 # socket.gethostbyaddr('испытание.python.org') 1352 1353 def check_sendall_interrupted(self, with_timeout): 1354 # socketpair() is not strictly required, but it makes things easier. 1355 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): 1356 self.skipTest("signal.alarm and socket.socketpair required for this test") 1357 # Our signal handlers clobber the C errno by calling a math function 1358 # with an invalid domain value. 
1359 def ok_handler(*args): 1360 self.assertRaises(ValueError, math.acosh, 0) 1361 def raising_handler(*args): 1362 self.assertRaises(ValueError, math.acosh, 0) 1363 1 // 0 1364 c, s = socket.socketpair() 1365 old_alarm = signal.signal(signal.SIGALRM, raising_handler) 1366 try: 1367 if with_timeout: 1368 # Just above the one second minimum for signal.alarm 1369 c.settimeout(1.5) 1370 with self.assertRaises(ZeroDivisionError): 1371 signal.alarm(1) 1372 c.sendall(b"x" * support.SOCK_MAX_SIZE) 1373 if with_timeout: 1374 signal.signal(signal.SIGALRM, ok_handler) 1375 signal.alarm(1) 1376 self.assertRaises(socket.timeout, c.sendall, 1377 b"x" * support.SOCK_MAX_SIZE) 1378 finally: 1379 signal.alarm(0) 1380 signal.signal(signal.SIGALRM, old_alarm) 1381 c.close() 1382 s.close() 1383 1384 def test_sendall_interrupted(self): 1385 self.check_sendall_interrupted(False) 1386 1387 def test_sendall_interrupted_with_timeout(self): 1388 self.check_sendall_interrupted(True) 1389 1390 def test_dealloc_warn(self): 1391 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1392 r = repr(sock) 1393 with self.assertWarns(ResourceWarning) as cm: 1394 sock = None 1395 support.gc_collect() 1396 self.assertIn(r, str(cm.warning.args[0])) 1397 # An open socket file object gets dereferenced after the socket 1398 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1399 f = sock.makefile('rb') 1400 r = repr(sock) 1401 sock = None 1402 support.gc_collect() 1403 with self.assertWarns(ResourceWarning): 1404 f = None 1405 support.gc_collect() 1406 1407 def test_name_closed_socketio(self): 1408 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1409 fp = sock.makefile("rb") 1410 fp.close() 1411 self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>") 1412 1413 def test_unusable_closed_socketio(self): 1414 with socket.socket() as sock: 1415 fp = sock.makefile("rb", buffering=0) 1416 self.assertTrue(fp.readable()) 1417 self.assertFalse(fp.writable()) 1418 
self.assertFalse(fp.seekable()) 1419 fp.close() 1420 self.assertRaises(ValueError, fp.readable) 1421 self.assertRaises(ValueError, fp.writable) 1422 self.assertRaises(ValueError, fp.seekable) 1423 1424 def test_makefile_mode(self): 1425 for mode in 'r', 'rb', 'rw', 'w', 'wb': 1426 with self.subTest(mode=mode): 1427 with socket.socket() as sock: 1428 with sock.makefile(mode) as fp: 1429 self.assertEqual(fp.mode, mode) 1430 1431 def test_makefile_invalid_mode(self): 1432 for mode in 'rt', 'x', '+', 'a': 1433 with self.subTest(mode=mode): 1434 with socket.socket() as sock: 1435 with self.assertRaisesRegex(ValueError, 'invalid mode'): 1436 sock.makefile(mode) 1437 1438 def test_pickle(self): 1439 sock = socket.socket() 1440 with sock: 1441 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1442 self.assertRaises(TypeError, pickle.dumps, sock, protocol) 1443 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1444 family = pickle.loads(pickle.dumps(socket.AF_INET, protocol)) 1445 self.assertEqual(family, socket.AF_INET) 1446 type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol)) 1447 self.assertEqual(type, socket.SOCK_STREAM) 1448 1449 def test_listen_backlog(self): 1450 for backlog in 0, -1: 1451 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1452 srv.bind((HOST, 0)) 1453 srv.listen(backlog) 1454 1455 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1456 srv.bind((HOST, 0)) 1457 srv.listen() 1458 1459 @support.cpython_only 1460 def test_listen_backlog_overflow(self): 1461 # Issue 15989 1462 import _testcapi 1463 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1464 srv.bind((HOST, 0)) 1465 self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) 1466 srv.close() 1467 1468 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 1469 def test_flowinfo(self): 1470 self.assertRaises(OverflowError, socket.getnameinfo, 1471 (support.HOSTv6, 0, 0xffffffff), 0) 1472 with socket.socket(socket.AF_INET6, 
socket.SOCK_STREAM) as s: 1473 self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10)) 1474 1475 def test_str_for_enums(self): 1476 # Make sure that the AF_* and SOCK_* constants have enum-like string 1477 # reprs. 1478 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 1479 self.assertEqual(str(s.family), 'AddressFamily.AF_INET') 1480 self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM') 1481 1482 @unittest.skipIf(os.name == 'nt', 'Will not work on Windows') 1483 def test_uknown_socket_family_repr(self): 1484 # Test that when created with a family that's not one of the known 1485 # AF_*/SOCK_* constants, socket.family just returns the number. 1486 # 1487 # To do this we fool socket.socket into believing it already has an 1488 # open fd because on this path it doesn't actually verify the family and 1489 # type and populates the socket object. 1490 # 1491 # On Windows this trick won't work, so the test is skipped. 1492 fd, path = tempfile.mkstemp() 1493 self.addCleanup(os.unlink, path) 1494 with socket.socket(family=42424, type=13331, fileno=fd) as s: 1495 self.assertEqual(s.family, 42424) 1496 self.assertEqual(s.type, 13331) 1497 1498 @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()') 1499 def test__sendfile_use_sendfile(self): 1500 class File: 1501 def __init__(self, fd): 1502 self.fd = fd 1503 1504 def fileno(self): 1505 return self.fd 1506 with socket.socket() as sock: 1507 fd = os.open(os.curdir, os.O_RDONLY) 1508 os.close(fd) 1509 with self.assertRaises(socket._GiveupOnSendfile): 1510 sock._sendfile_use_sendfile(File(fd)) 1511 with self.assertRaises(OverflowError): 1512 sock._sendfile_use_sendfile(File(2**1000)) 1513 with self.assertRaises(TypeError): 1514 sock._sendfile_use_sendfile(File(None)) 1515 1516 1517@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 1518class BasicCANTest(unittest.TestCase): 1519 1520 def testCrucialConstants(self): 1521 socket.AF_CAN 1522 socket.PF_CAN 1523 
socket.CAN_RAW 1524 1525 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1526 'socket.CAN_BCM required for this test.') 1527 def testBCMConstants(self): 1528 socket.CAN_BCM 1529 1530 # opcodes 1531 socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task 1532 socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task 1533 socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task 1534 socket.CAN_BCM_TX_SEND # send one CAN frame 1535 socket.CAN_BCM_RX_SETUP # create RX content filter subscription 1536 socket.CAN_BCM_RX_DELETE # remove RX content filter subscription 1537 socket.CAN_BCM_RX_READ # read properties of RX content filter subscription 1538 socket.CAN_BCM_TX_STATUS # reply to TX_READ request 1539 socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) 1540 socket.CAN_BCM_RX_STATUS # reply to RX_READ request 1541 socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent 1542 socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) 1543 1544 def testCreateSocket(self): 1545 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1546 pass 1547 1548 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1549 'socket.CAN_BCM required for this test.') 1550 def testCreateBCMSocket(self): 1551 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: 1552 pass 1553 1554 def testBindAny(self): 1555 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1556 s.bind(('', )) 1557 1558 def testTooLongInterfaceName(self): 1559 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 1560 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1561 self.assertRaisesRegex(OSError, 'interface name too long', 1562 s.bind, ('x' * 1024,)) 1563 1564 @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), 1565 'socket.CAN_RAW_LOOPBACK required for this test.') 1566 def testLoopback(self): 1567 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 
1568 for loopback in (0, 1): 1569 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, 1570 loopback) 1571 self.assertEqual(loopback, 1572 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK)) 1573 1574 @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"), 1575 'socket.CAN_RAW_FILTER required for this test.') 1576 def testFilter(self): 1577 can_id, can_mask = 0x200, 0x700 1578 can_filter = struct.pack("=II", can_id, can_mask) 1579 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1580 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) 1581 self.assertEqual(can_filter, 1582 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)) 1583 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter)) 1584 1585 1586@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 1587@unittest.skipUnless(thread, 'Threading required for this test.') 1588class CANTest(ThreadedCANSocketTest): 1589 1590 def __init__(self, methodName='runTest'): 1591 ThreadedCANSocketTest.__init__(self, methodName=methodName) 1592 1593 @classmethod 1594 def build_can_frame(cls, can_id, data): 1595 """Build a CAN frame.""" 1596 can_dlc = len(data) 1597 data = data.ljust(8, b'\x00') 1598 return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data) 1599 1600 @classmethod 1601 def dissect_can_frame(cls, frame): 1602 """Dissect a CAN frame.""" 1603 can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame) 1604 return (can_id, can_dlc, data[:can_dlc]) 1605 1606 def testSendFrame(self): 1607 cf, addr = self.s.recvfrom(self.bufsize) 1608 self.assertEqual(self.cf, cf) 1609 self.assertEqual(addr[0], self.interface) 1610 self.assertEqual(addr[1], socket.AF_CAN) 1611 1612 def _testSendFrame(self): 1613 self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05') 1614 self.cli.send(self.cf) 1615 1616 def testSendMaxFrame(self): 1617 cf, addr = self.s.recvfrom(self.bufsize) 1618 self.assertEqual(self.cf, cf) 1619 1620 
def _testSendMaxFrame(self): 1621 self.cf = self.build_can_frame(0x00, b'\x07' * 8) 1622 self.cli.send(self.cf) 1623 1624 def testSendMultiFrames(self): 1625 cf, addr = self.s.recvfrom(self.bufsize) 1626 self.assertEqual(self.cf1, cf) 1627 1628 cf, addr = self.s.recvfrom(self.bufsize) 1629 self.assertEqual(self.cf2, cf) 1630 1631 def _testSendMultiFrames(self): 1632 self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11') 1633 self.cli.send(self.cf1) 1634 1635 self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') 1636 self.cli.send(self.cf2) 1637 1638 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1639 'socket.CAN_BCM required for this test.') 1640 def _testBCM(self): 1641 cf, addr = self.cli.recvfrom(self.bufsize) 1642 self.assertEqual(self.cf, cf) 1643 can_id, can_dlc, data = self.dissect_can_frame(cf) 1644 self.assertEqual(self.can_id, can_id) 1645 self.assertEqual(self.data, data) 1646 1647 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1648 'socket.CAN_BCM required for this test.') 1649 def testBCM(self): 1650 bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) 1651 self.addCleanup(bcm.close) 1652 bcm.connect((self.interface,)) 1653 self.can_id = 0x123 1654 self.data = bytes([0xc0, 0xff, 0xee]) 1655 self.cf = self.build_can_frame(self.can_id, self.data) 1656 opcode = socket.CAN_BCM_TX_SEND 1657 flags = 0 1658 count = 0 1659 ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 1660 bcm_can_id = 0x0222 1661 nframes = 1 1662 assert len(self.cf) == 16 1663 header = struct.pack(self.bcm_cmd_msg_fmt, 1664 opcode, 1665 flags, 1666 count, 1667 ival1_seconds, 1668 ival1_usec, 1669 ival2_seconds, 1670 ival2_usec, 1671 bcm_can_id, 1672 nframes, 1673 ) 1674 header_plus_frame = header + self.cf 1675 bytes_sent = bcm.send(header_plus_frame) 1676 self.assertEqual(bytes_sent, len(header_plus_frame)) 1677 1678 1679@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 1680class BasicRDSTest(unittest.TestCase): 1681 1682 def 
testCrucialConstants(self): 1683 socket.AF_RDS 1684 socket.PF_RDS 1685 1686 def testCreateSocket(self): 1687 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 1688 pass 1689 1690 def testSocketBufferSize(self): 1691 bufsize = 16384 1692 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 1693 s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize) 1694 s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize) 1695 1696 1697@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 1698@unittest.skipUnless(thread, 'Threading required for this test.') 1699class RDSTest(ThreadedRDSSocketTest): 1700 1701 def __init__(self, methodName='runTest'): 1702 ThreadedRDSSocketTest.__init__(self, methodName=methodName) 1703 1704 def setUp(self): 1705 super().setUp() 1706 self.evt = threading.Event() 1707 1708 def testSendAndRecv(self): 1709 data, addr = self.serv.recvfrom(self.bufsize) 1710 self.assertEqual(self.data, data) 1711 self.assertEqual(self.cli_addr, addr) 1712 1713 def _testSendAndRecv(self): 1714 self.data = b'spam' 1715 self.cli.sendto(self.data, 0, (HOST, self.port)) 1716 1717 def testPeek(self): 1718 data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK) 1719 self.assertEqual(self.data, data) 1720 data, addr = self.serv.recvfrom(self.bufsize) 1721 self.assertEqual(self.data, data) 1722 1723 def _testPeek(self): 1724 self.data = b'spam' 1725 self.cli.sendto(self.data, 0, (HOST, self.port)) 1726 1727 @requireAttrs(socket.socket, 'recvmsg') 1728 def testSendAndRecvMsg(self): 1729 data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize) 1730 self.assertEqual(self.data, data) 1731 1732 @requireAttrs(socket.socket, 'sendmsg') 1733 def _testSendAndRecvMsg(self): 1734 self.data = b'hello ' * 10 1735 self.cli.sendmsg([self.data], (), 0, (HOST, self.port)) 1736 1737 def testSendAndRecvMulti(self): 1738 data, addr = self.serv.recvfrom(self.bufsize) 1739 self.assertEqual(self.data1, data) 1740 1741 data, addr 
= self.serv.recvfrom(self.bufsize) 1742 self.assertEqual(self.data2, data) 1743 1744 def _testSendAndRecvMulti(self): 1745 self.data1 = b'bacon' 1746 self.cli.sendto(self.data1, 0, (HOST, self.port)) 1747 1748 self.data2 = b'egg' 1749 self.cli.sendto(self.data2, 0, (HOST, self.port)) 1750 1751 def testSelect(self): 1752 r, w, x = select.select([self.serv], [], [], 3.0) 1753 self.assertIn(self.serv, r) 1754 data, addr = self.serv.recvfrom(self.bufsize) 1755 self.assertEqual(self.data, data) 1756 1757 def _testSelect(self): 1758 self.data = b'select' 1759 self.cli.sendto(self.data, 0, (HOST, self.port)) 1760 1761 def testCongestion(self): 1762 # wait until the sender is done 1763 self.evt.wait() 1764 1765 def _testCongestion(self): 1766 # test the behavior in case of congestion 1767 self.data = b'fill' 1768 self.cli.setblocking(False) 1769 try: 1770 # try to lower the receiver's socket buffer size 1771 self.cli.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16384) 1772 except OSError: 1773 pass 1774 with self.assertRaises(OSError) as cm: 1775 try: 1776 # fill the receiver's socket buffer 1777 while True: 1778 self.cli.sendto(self.data, 0, (HOST, self.port)) 1779 finally: 1780 # signal the receiver we're done 1781 self.evt.set() 1782 # sendto() should have failed with ENOBUFS 1783 self.assertEqual(cm.exception.errno, errno.ENOBUFS) 1784 # and we should have received a congestion notification through poll 1785 r, w, x = select.select([self.serv], [], [], 3.0) 1786 self.assertIn(self.serv, r) 1787 1788 1789@unittest.skipUnless(thread, 'Threading required for this test.') 1790class BasicTCPTest(SocketConnectedTest): 1791 1792 def __init__(self, methodName='runTest'): 1793 SocketConnectedTest.__init__(self, methodName=methodName) 1794 1795 def testRecv(self): 1796 # Testing large receive over TCP 1797 msg = self.cli_conn.recv(1024) 1798 self.assertEqual(msg, MSG) 1799 1800 def _testRecv(self): 1801 self.serv_conn.send(MSG) 1802 1803 def testOverFlowRecv(self): 1804 # 
Testing receive in chunks over TCP 1805 seg1 = self.cli_conn.recv(len(MSG) - 3) 1806 seg2 = self.cli_conn.recv(1024) 1807 msg = seg1 + seg2 1808 self.assertEqual(msg, MSG) 1809 1810 def _testOverFlowRecv(self): 1811 self.serv_conn.send(MSG) 1812 1813 def testRecvFrom(self): 1814 # Testing large recvfrom() over TCP 1815 msg, addr = self.cli_conn.recvfrom(1024) 1816 self.assertEqual(msg, MSG) 1817 1818 def _testRecvFrom(self): 1819 self.serv_conn.send(MSG) 1820 1821 def testOverFlowRecvFrom(self): 1822 # Testing recvfrom() in chunks over TCP 1823 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) 1824 seg2, addr = self.cli_conn.recvfrom(1024) 1825 msg = seg1 + seg2 1826 self.assertEqual(msg, MSG) 1827 1828 def _testOverFlowRecvFrom(self): 1829 self.serv_conn.send(MSG) 1830 1831 def testSendAll(self): 1832 # Testing sendall() with a 2048 byte string over TCP 1833 msg = b'' 1834 while 1: 1835 read = self.cli_conn.recv(1024) 1836 if not read: 1837 break 1838 msg += read 1839 self.assertEqual(msg, b'f' * 2048) 1840 1841 def _testSendAll(self): 1842 big_chunk = b'f' * 2048 1843 self.serv_conn.sendall(big_chunk) 1844 1845 def testFromFd(self): 1846 # Testing fromfd() 1847 fd = self.cli_conn.fileno() 1848 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 1849 self.addCleanup(sock.close) 1850 self.assertIsInstance(sock, socket.socket) 1851 msg = sock.recv(1024) 1852 self.assertEqual(msg, MSG) 1853 1854 def _testFromFd(self): 1855 self.serv_conn.send(MSG) 1856 1857 def testDup(self): 1858 # Testing dup() 1859 sock = self.cli_conn.dup() 1860 self.addCleanup(sock.close) 1861 msg = sock.recv(1024) 1862 self.assertEqual(msg, MSG) 1863 1864 def _testDup(self): 1865 self.serv_conn.send(MSG) 1866 1867 def testShutdown(self): 1868 # Testing shutdown() 1869 msg = self.cli_conn.recv(1024) 1870 self.assertEqual(msg, MSG) 1871 # wait for _testShutdown to finish: on OS X, when the server 1872 # closes the connection the client also becomes disconnected, 1873 # and the client's 
shutdown call will fail. (Issue #4397.) 1874 self.done.wait() 1875 1876 def _testShutdown(self): 1877 self.serv_conn.send(MSG) 1878 self.serv_conn.shutdown(2) 1879 1880 testShutdown_overflow = support.cpython_only(testShutdown) 1881 1882 @support.cpython_only 1883 def _testShutdown_overflow(self): 1884 import _testcapi 1885 self.serv_conn.send(MSG) 1886 # Issue 15989 1887 self.assertRaises(OverflowError, self.serv_conn.shutdown, 1888 _testcapi.INT_MAX + 1) 1889 self.assertRaises(OverflowError, self.serv_conn.shutdown, 1890 2 + (_testcapi.UINT_MAX + 1)) 1891 self.serv_conn.shutdown(2) 1892 1893 def testDetach(self): 1894 # Testing detach() 1895 fileno = self.cli_conn.fileno() 1896 f = self.cli_conn.detach() 1897 self.assertEqual(f, fileno) 1898 # cli_conn cannot be used anymore... 1899 self.assertTrue(self.cli_conn._closed) 1900 self.assertRaises(OSError, self.cli_conn.recv, 1024) 1901 self.cli_conn.close() 1902 # ...but we can create another socket using the (still open) 1903 # file descriptor 1904 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f) 1905 self.addCleanup(sock.close) 1906 msg = sock.recv(1024) 1907 self.assertEqual(msg, MSG) 1908 1909 def _testDetach(self): 1910 self.serv_conn.send(MSG) 1911 1912@unittest.skipUnless(thread, 'Threading required for this test.') 1913class BasicUDPTest(ThreadedUDPSocketTest): 1914 1915 def __init__(self, methodName='runTest'): 1916 ThreadedUDPSocketTest.__init__(self, methodName=methodName) 1917 1918 def testSendtoAndRecv(self): 1919 # Testing sendto() and Recv() over UDP 1920 msg = self.serv.recv(len(MSG)) 1921 self.assertEqual(msg, MSG) 1922 1923 def _testSendtoAndRecv(self): 1924 self.cli.sendto(MSG, 0, (HOST, self.port)) 1925 1926 def testRecvFrom(self): 1927 # Testing recvfrom() over UDP 1928 msg, addr = self.serv.recvfrom(len(MSG)) 1929 self.assertEqual(msg, MSG) 1930 1931 def _testRecvFrom(self): 1932 self.cli.sendto(MSG, 0, (HOST, self.port)) 1933 1934 def testRecvFromNegative(self): 1935 # 
Negative lengths passed to recvfrom should give ValueError. 1936 self.assertRaises(ValueError, self.serv.recvfrom, -1) 1937 1938 def _testRecvFromNegative(self): 1939 self.cli.sendto(MSG, 0, (HOST, self.port)) 1940 1941# Tests for the sendmsg()/recvmsg() interface. Where possible, the 1942# same test code is used with different families and types of socket 1943# (e.g. stream, datagram), and tests using recvmsg() are repeated 1944# using recvmsg_into(). 1945# 1946# The generic test classes such as SendmsgTests and 1947# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be 1948# supplied with sockets cli_sock and serv_sock representing the 1949# client's and the server's end of the connection respectively, and 1950# attributes cli_addr and serv_addr holding their (numeric where 1951# appropriate) addresses. 1952# 1953# The final concrete test classes combine these with subclasses of 1954# SocketTestBase which set up client and server sockets of a specific 1955# type, and with subclasses of SendrecvmsgBase such as 1956# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these 1957# sockets to cli_sock and serv_sock and override the methods and 1958# attributes of SendrecvmsgBase to fill in destination addresses if 1959# needed when sending, check for specific flags in msg_flags, etc. 1960# 1961# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using 1962# recvmsg_into(). 1963 1964# XXX: like the other datagram (UDP) tests in this module, the code 1965# here assumes that datagram delivery on the local machine will be 1966# reliable. 1967 1968class SendrecvmsgBase(ThreadSafeCleanupTestCase): 1969 # Base class for sendmsg()/recvmsg() tests. 1970 1971 # Time in seconds to wait before considering a test failed, or 1972 # None for no timeout. Not all tests actually set a timeout. 
1973 fail_timeout = 3.0 1974 1975 def setUp(self): 1976 self.misc_event = threading.Event() 1977 super().setUp() 1978 1979 def sendToServer(self, msg): 1980 # Send msg to the server. 1981 return self.cli_sock.send(msg) 1982 1983 # Tuple of alternative default arguments for sendmsg() when called 1984 # via sendmsgToServer() (e.g. to include a destination address). 1985 sendmsg_to_server_defaults = () 1986 1987 def sendmsgToServer(self, *args): 1988 # Call sendmsg() on self.cli_sock with the given arguments, 1989 # filling in any arguments which are not supplied with the 1990 # corresponding items of self.sendmsg_to_server_defaults, if 1991 # any. 1992 return self.cli_sock.sendmsg( 1993 *(args + self.sendmsg_to_server_defaults[len(args):])) 1994 1995 def doRecvmsg(self, sock, bufsize, *args): 1996 # Call recvmsg() on sock with given arguments and return its 1997 # result. Should be used for tests which can use either 1998 # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides 1999 # this method with one which emulates it using recvmsg_into(), 2000 # thus allowing the same test to be used for both methods. 2001 result = sock.recvmsg(bufsize, *args) 2002 self.registerRecvmsgResult(result) 2003 return result 2004 2005 def registerRecvmsgResult(self, result): 2006 # Called by doRecvmsg() with the return value of recvmsg() or 2007 # recvmsg_into(). Can be overridden to arrange cleanup based 2008 # on the returned ancillary data, for instance. 2009 pass 2010 2011 def checkRecvmsgAddress(self, addr1, addr2): 2012 # Called to compare the received address with the address of 2013 # the peer. 2014 self.assertEqual(addr1, addr2) 2015 2016 # Flags that are normally unset in msg_flags 2017 msg_flags_common_unset = 0 2018 for name in ("MSG_CTRUNC", "MSG_OOB"): 2019 msg_flags_common_unset |= getattr(socket, name, 0) 2020 2021 # Flags that are normally set 2022 msg_flags_common_set = 0 2023 2024 # Flags set when a complete record has been received (e.g. 
MSG_EOR 2025 # for SCTP) 2026 msg_flags_eor_indicator = 0 2027 2028 # Flags set when a complete record has not been received 2029 # (e.g. MSG_TRUNC for datagram sockets) 2030 msg_flags_non_eor_indicator = 0 2031 2032 def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0): 2033 # Method to check the value of msg_flags returned by recvmsg[_into](). 2034 # 2035 # Checks that all bits in msg_flags_common_set attribute are 2036 # set in "flags" and all bits in msg_flags_common_unset are 2037 # unset. 2038 # 2039 # The "eor" argument specifies whether the flags should 2040 # indicate that a full record (or datagram) has been received. 2041 # If "eor" is None, no checks are done; otherwise, checks 2042 # that: 2043 # 2044 # * if "eor" is true, all bits in msg_flags_eor_indicator are 2045 # set and all bits in msg_flags_non_eor_indicator are unset 2046 # 2047 # * if "eor" is false, all bits in msg_flags_non_eor_indicator 2048 # are set and all bits in msg_flags_eor_indicator are unset 2049 # 2050 # If "checkset" and/or "checkunset" are supplied, they require 2051 # the given bits to be set or unset respectively, overriding 2052 # what the attributes require for those bits. 2053 # 2054 # If any bits are set in "ignore", they will not be checked, 2055 # regardless of the other inputs. 2056 # 2057 # Will raise Exception if the inputs require a bit to be both 2058 # set and unset, and it is not ignored. 
2059 2060 defaultset = self.msg_flags_common_set 2061 defaultunset = self.msg_flags_common_unset 2062 2063 if eor: 2064 defaultset |= self.msg_flags_eor_indicator 2065 defaultunset |= self.msg_flags_non_eor_indicator 2066 elif eor is not None: 2067 defaultset |= self.msg_flags_non_eor_indicator 2068 defaultunset |= self.msg_flags_eor_indicator 2069 2070 # Function arguments override defaults 2071 defaultset &= ~checkunset 2072 defaultunset &= ~checkset 2073 2074 # Merge arguments with remaining defaults, and check for conflicts 2075 checkset |= defaultset 2076 checkunset |= defaultunset 2077 inboth = checkset & checkunset & ~ignore 2078 if inboth: 2079 raise Exception("contradictory set, unset requirements for flags " 2080 "{0:#x}".format(inboth)) 2081 2082 # Compare with given msg_flags value 2083 mask = (checkset | checkunset) & ~ignore 2084 self.assertEqual(flags & mask, checkset & mask) 2085 2086 2087class RecvmsgIntoMixin(SendrecvmsgBase): 2088 # Mixin to implement doRecvmsg() using recvmsg_into(). 2089 2090 def doRecvmsg(self, sock, bufsize, *args): 2091 buf = bytearray(bufsize) 2092 result = sock.recvmsg_into([buf], *args) 2093 self.registerRecvmsgResult(result) 2094 self.assertGreaterEqual(result[0], 0) 2095 self.assertLessEqual(result[0], bufsize) 2096 return (bytes(buf[:result[0]]),) + result[1:] 2097 2098 2099class SendrecvmsgDgramFlagsBase(SendrecvmsgBase): 2100 # Defines flags to be checked in msg_flags for datagram sockets. 2101 2102 @property 2103 def msg_flags_non_eor_indicator(self): 2104 return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC 2105 2106 2107class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase): 2108 # Defines flags to be checked in msg_flags for SCTP sockets. 2109 2110 @property 2111 def msg_flags_eor_indicator(self): 2112 return super().msg_flags_eor_indicator | socket.MSG_EOR 2113 2114 2115class SendrecvmsgConnectionlessBase(SendrecvmsgBase): 2116 # Base class for tests on connectionless-mode sockets. 
Users must 2117 # supply sockets on attributes cli and serv to be mapped to 2118 # cli_sock and serv_sock respectively. 2119 2120 @property 2121 def serv_sock(self): 2122 return self.serv 2123 2124 @property 2125 def cli_sock(self): 2126 return self.cli 2127 2128 @property 2129 def sendmsg_to_server_defaults(self): 2130 return ([], [], 0, self.serv_addr) 2131 2132 def sendToServer(self, msg): 2133 return self.cli_sock.sendto(msg, self.serv_addr) 2134 2135 2136class SendrecvmsgConnectedBase(SendrecvmsgBase): 2137 # Base class for tests on connected sockets. Users must supply 2138 # sockets on attributes serv_conn and cli_conn (representing the 2139 # connections *to* the server and the client), to be mapped to 2140 # cli_sock and serv_sock respectively. 2141 2142 @property 2143 def serv_sock(self): 2144 return self.cli_conn 2145 2146 @property 2147 def cli_sock(self): 2148 return self.serv_conn 2149 2150 def checkRecvmsgAddress(self, addr1, addr2): 2151 # Address is currently "unspecified" for a connected socket, 2152 # so we don't examine it 2153 pass 2154 2155 2156class SendrecvmsgServerTimeoutBase(SendrecvmsgBase): 2157 # Base class to set a timeout on server's socket. 2158 2159 def setUp(self): 2160 super().setUp() 2161 self.serv_sock.settimeout(self.fail_timeout) 2162 2163 2164class SendmsgTests(SendrecvmsgServerTimeoutBase): 2165 # Tests for sendmsg() which can use any socket type and do not 2166 # involve recvmsg() or recvmsg_into(). 2167 2168 def testSendmsg(self): 2169 # Send a simple message with sendmsg(). 2170 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2171 2172 def _testSendmsg(self): 2173 self.assertEqual(self.sendmsgToServer([MSG]), len(MSG)) 2174 2175 def testSendmsgDataGenerator(self): 2176 # Send from buffer obtained from a generator (not a sequence). 
2177 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2178 2179 def _testSendmsgDataGenerator(self): 2180 self.assertEqual(self.sendmsgToServer((o for o in [MSG])), 2181 len(MSG)) 2182 2183 def testSendmsgAncillaryGenerator(self): 2184 # Gather (empty) ancillary data from a generator. 2185 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2186 2187 def _testSendmsgAncillaryGenerator(self): 2188 self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])), 2189 len(MSG)) 2190 2191 def testSendmsgArray(self): 2192 # Send data from an array instead of the usual bytes object. 2193 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2194 2195 def _testSendmsgArray(self): 2196 self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]), 2197 len(MSG)) 2198 2199 def testSendmsgGather(self): 2200 # Send message data from more than one buffer (gather write). 2201 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2202 2203 def _testSendmsgGather(self): 2204 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 2205 2206 def testSendmsgBadArgs(self): 2207 # Check that sendmsg() rejects invalid arguments. 2208 self.assertEqual(self.serv_sock.recv(1000), b"done") 2209 2210 def _testSendmsgBadArgs(self): 2211 self.assertRaises(TypeError, self.cli_sock.sendmsg) 2212 self.assertRaises(TypeError, self.sendmsgToServer, 2213 b"not in an iterable") 2214 self.assertRaises(TypeError, self.sendmsgToServer, 2215 object()) 2216 self.assertRaises(TypeError, self.sendmsgToServer, 2217 [object()]) 2218 self.assertRaises(TypeError, self.sendmsgToServer, 2219 [MSG, object()]) 2220 self.assertRaises(TypeError, self.sendmsgToServer, 2221 [MSG], object()) 2222 self.assertRaises(TypeError, self.sendmsgToServer, 2223 [MSG], [], object()) 2224 self.assertRaises(TypeError, self.sendmsgToServer, 2225 [MSG], [], 0, object()) 2226 self.sendToServer(b"done") 2227 2228 def testSendmsgBadCmsg(self): 2229 # Check that invalid ancillary data items are rejected. 
2230 self.assertEqual(self.serv_sock.recv(1000), b"done") 2231 2232 def _testSendmsgBadCmsg(self): 2233 self.assertRaises(TypeError, self.sendmsgToServer, 2234 [MSG], [object()]) 2235 self.assertRaises(TypeError, self.sendmsgToServer, 2236 [MSG], [(object(), 0, b"data")]) 2237 self.assertRaises(TypeError, self.sendmsgToServer, 2238 [MSG], [(0, object(), b"data")]) 2239 self.assertRaises(TypeError, self.sendmsgToServer, 2240 [MSG], [(0, 0, object())]) 2241 self.assertRaises(TypeError, self.sendmsgToServer, 2242 [MSG], [(0, 0)]) 2243 self.assertRaises(TypeError, self.sendmsgToServer, 2244 [MSG], [(0, 0, b"data", 42)]) 2245 self.sendToServer(b"done") 2246 2247 @requireAttrs(socket, "CMSG_SPACE") 2248 def testSendmsgBadMultiCmsg(self): 2249 # Check that invalid ancillary data items are rejected when 2250 # more than one item is present. 2251 self.assertEqual(self.serv_sock.recv(1000), b"done") 2252 2253 @testSendmsgBadMultiCmsg.client_skip 2254 def _testSendmsgBadMultiCmsg(self): 2255 self.assertRaises(TypeError, self.sendmsgToServer, 2256 [MSG], [0, 0, b""]) 2257 self.assertRaises(TypeError, self.sendmsgToServer, 2258 [MSG], [(0, 0, b""), object()]) 2259 self.sendToServer(b"done") 2260 2261 def testSendmsgExcessCmsgReject(self): 2262 # Check that sendmsg() rejects excess ancillary data items 2263 # when the number that can be sent is limited. 2264 self.assertEqual(self.serv_sock.recv(1000), b"done") 2265 2266 def _testSendmsgExcessCmsgReject(self): 2267 if not hasattr(socket, "CMSG_SPACE"): 2268 # Can only send one item 2269 with self.assertRaises(OSError) as cm: 2270 self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) 2271 self.assertIsNone(cm.exception.errno) 2272 self.sendToServer(b"done") 2273 2274 def testSendmsgAfterClose(self): 2275 # Check that sendmsg() fails on a closed socket. 
2276 pass 2277 2278 def _testSendmsgAfterClose(self): 2279 self.cli_sock.close() 2280 self.assertRaises(OSError, self.sendmsgToServer, [MSG]) 2281 2282 2283class SendmsgStreamTests(SendmsgTests): 2284 # Tests for sendmsg() which require a stream socket and do not 2285 # involve recvmsg() or recvmsg_into(). 2286 2287 def testSendmsgExplicitNoneAddr(self): 2288 # Check that peer address can be specified as None. 2289 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2290 2291 def _testSendmsgExplicitNoneAddr(self): 2292 self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG)) 2293 2294 def testSendmsgTimeout(self): 2295 # Check that timeout works with sendmsg(). 2296 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2297 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2298 2299 def _testSendmsgTimeout(self): 2300 try: 2301 self.cli_sock.settimeout(0.03) 2302 with self.assertRaises(socket.timeout): 2303 while True: 2304 self.sendmsgToServer([b"a"*512]) 2305 finally: 2306 self.misc_event.set() 2307 2308 # XXX: would be nice to have more tests for sendmsg flags argument. 2309 2310 # Linux supports MSG_DONTWAIT when sending, but in general, it 2311 # only works when receiving. Could add other platforms if they 2312 # support it too. 2313 @skipWithClientIf(sys.platform not in {"linux"}, 2314 "MSG_DONTWAIT not known to work on this platform when " 2315 "sending") 2316 def testSendmsgDontWait(self): 2317 # Check that MSG_DONTWAIT in flags causes non-blocking behaviour. 
2318 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2319 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2320 2321 @testSendmsgDontWait.client_skip 2322 def _testSendmsgDontWait(self): 2323 try: 2324 with self.assertRaises(OSError) as cm: 2325 while True: 2326 self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT) 2327 self.assertIn(cm.exception.errno, 2328 (errno.EAGAIN, errno.EWOULDBLOCK)) 2329 finally: 2330 self.misc_event.set() 2331 2332 2333class SendmsgConnectionlessTests(SendmsgTests): 2334 # Tests for sendmsg() which require a connectionless-mode 2335 # (e.g. datagram) socket, and do not involve recvmsg() or 2336 # recvmsg_into(). 2337 2338 def testSendmsgNoDestAddr(self): 2339 # Check that sendmsg() fails when no destination address is 2340 # given for unconnected socket. 2341 pass 2342 2343 def _testSendmsgNoDestAddr(self): 2344 self.assertRaises(OSError, self.cli_sock.sendmsg, 2345 [MSG]) 2346 self.assertRaises(OSError, self.cli_sock.sendmsg, 2347 [MSG], [], 0, None) 2348 2349 2350class RecvmsgGenericTests(SendrecvmsgBase): 2351 # Tests for recvmsg() which can also be emulated using 2352 # recvmsg_into(), and can use any socket type. 2353 2354 def testRecvmsg(self): 2355 # Receive a simple message with recvmsg[_into](). 2356 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 2357 self.assertEqual(msg, MSG) 2358 self.checkRecvmsgAddress(addr, self.cli_addr) 2359 self.assertEqual(ancdata, []) 2360 self.checkFlags(flags, eor=True) 2361 2362 def _testRecvmsg(self): 2363 self.sendToServer(MSG) 2364 2365 def testRecvmsgExplicitDefaults(self): 2366 # Test recvmsg[_into]() with default arguments provided explicitly. 
2367 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2368 len(MSG), 0, 0) 2369 self.assertEqual(msg, MSG) 2370 self.checkRecvmsgAddress(addr, self.cli_addr) 2371 self.assertEqual(ancdata, []) 2372 self.checkFlags(flags, eor=True) 2373 2374 def _testRecvmsgExplicitDefaults(self): 2375 self.sendToServer(MSG) 2376 2377 def testRecvmsgShorter(self): 2378 # Receive a message smaller than buffer. 2379 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2380 len(MSG) + 42) 2381 self.assertEqual(msg, MSG) 2382 self.checkRecvmsgAddress(addr, self.cli_addr) 2383 self.assertEqual(ancdata, []) 2384 self.checkFlags(flags, eor=True) 2385 2386 def _testRecvmsgShorter(self): 2387 self.sendToServer(MSG) 2388 2389 # FreeBSD < 8 doesn't always set the MSG_TRUNC flag when a truncated 2390 # datagram is received (issue #13001). 2391 @support.requires_freebsd_version(8) 2392 def testRecvmsgTrunc(self): 2393 # Receive part of message, check for truncation indicators. 2394 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2395 len(MSG) - 3) 2396 self.assertEqual(msg, MSG[:-3]) 2397 self.checkRecvmsgAddress(addr, self.cli_addr) 2398 self.assertEqual(ancdata, []) 2399 self.checkFlags(flags, eor=False) 2400 2401 @support.requires_freebsd_version(8) 2402 def _testRecvmsgTrunc(self): 2403 self.sendToServer(MSG) 2404 2405 def testRecvmsgShortAncillaryBuf(self): 2406 # Test ancillary data buffer too small to hold any ancillary data. 2407 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2408 len(MSG), 1) 2409 self.assertEqual(msg, MSG) 2410 self.checkRecvmsgAddress(addr, self.cli_addr) 2411 self.assertEqual(ancdata, []) 2412 self.checkFlags(flags, eor=True) 2413 2414 def _testRecvmsgShortAncillaryBuf(self): 2415 self.sendToServer(MSG) 2416 2417 def testRecvmsgLongAncillaryBuf(self): 2418 # Test large ancillary data buffer. 
2419 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2420 len(MSG), 10240) 2421 self.assertEqual(msg, MSG) 2422 self.checkRecvmsgAddress(addr, self.cli_addr) 2423 self.assertEqual(ancdata, []) 2424 self.checkFlags(flags, eor=True) 2425 2426 def _testRecvmsgLongAncillaryBuf(self): 2427 self.sendToServer(MSG) 2428 2429 def testRecvmsgAfterClose(self): 2430 # Check that recvmsg[_into]() fails on a closed socket. 2431 self.serv_sock.close() 2432 self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024) 2433 2434 def _testRecvmsgAfterClose(self): 2435 pass 2436 2437 def testRecvmsgTimeout(self): 2438 # Check that timeout works. 2439 try: 2440 self.serv_sock.settimeout(0.03) 2441 self.assertRaises(socket.timeout, 2442 self.doRecvmsg, self.serv_sock, len(MSG)) 2443 finally: 2444 self.misc_event.set() 2445 2446 def _testRecvmsgTimeout(self): 2447 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2448 2449 @requireAttrs(socket, "MSG_PEEK") 2450 def testRecvmsgPeek(self): 2451 # Check that MSG_PEEK in flags enables examination of pending 2452 # data without consuming it. 2453 2454 # Receive part of data with MSG_PEEK. 2455 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2456 len(MSG) - 3, 0, 2457 socket.MSG_PEEK) 2458 self.assertEqual(msg, MSG[:-3]) 2459 self.checkRecvmsgAddress(addr, self.cli_addr) 2460 self.assertEqual(ancdata, []) 2461 # Ignoring MSG_TRUNC here (so this test is the same for stream 2462 # and datagram sockets). Some wording in POSIX seems to 2463 # suggest that it needn't be set when peeking, but that may 2464 # just be a slip. 2465 self.checkFlags(flags, eor=False, 2466 ignore=getattr(socket, "MSG_TRUNC", 0)) 2467 2468 # Receive all data with MSG_PEEK. 
2469 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2470 len(MSG), 0, 2471 socket.MSG_PEEK) 2472 self.assertEqual(msg, MSG) 2473 self.checkRecvmsgAddress(addr, self.cli_addr) 2474 self.assertEqual(ancdata, []) 2475 self.checkFlags(flags, eor=True) 2476 2477 # Check that the same data can still be received normally. 2478 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 2479 self.assertEqual(msg, MSG) 2480 self.checkRecvmsgAddress(addr, self.cli_addr) 2481 self.assertEqual(ancdata, []) 2482 self.checkFlags(flags, eor=True) 2483 2484 @testRecvmsgPeek.client_skip 2485 def _testRecvmsgPeek(self): 2486 self.sendToServer(MSG) 2487 2488 @requireAttrs(socket.socket, "sendmsg") 2489 def testRecvmsgFromSendmsg(self): 2490 # Test receiving with recvmsg[_into]() when message is sent 2491 # using sendmsg(). 2492 self.serv_sock.settimeout(self.fail_timeout) 2493 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 2494 self.assertEqual(msg, MSG) 2495 self.checkRecvmsgAddress(addr, self.cli_addr) 2496 self.assertEqual(ancdata, []) 2497 self.checkFlags(flags, eor=True) 2498 2499 @testRecvmsgFromSendmsg.client_skip 2500 def _testRecvmsgFromSendmsg(self): 2501 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 2502 2503 2504class RecvmsgGenericStreamTests(RecvmsgGenericTests): 2505 # Tests which require a stream socket and can use either recvmsg() 2506 # or recvmsg_into(). 2507 2508 def testRecvmsgEOF(self): 2509 # Receive end-of-stream indicator (b"", peer socket closed). 2510 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 2511 self.assertEqual(msg, b"") 2512 self.checkRecvmsgAddress(addr, self.cli_addr) 2513 self.assertEqual(ancdata, []) 2514 self.checkFlags(flags, eor=None) # Might not have end-of-record marker 2515 2516 def _testRecvmsgEOF(self): 2517 self.cli_sock.close() 2518 2519 def testRecvmsgOverflow(self): 2520 # Receive a message in more than one chunk. 
2521 seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2522 len(MSG) - 3) 2523 self.checkRecvmsgAddress(addr, self.cli_addr) 2524 self.assertEqual(ancdata, []) 2525 self.checkFlags(flags, eor=False) 2526 2527 seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 2528 self.checkRecvmsgAddress(addr, self.cli_addr) 2529 self.assertEqual(ancdata, []) 2530 self.checkFlags(flags, eor=True) 2531 2532 msg = seg1 + seg2 2533 self.assertEqual(msg, MSG) 2534 2535 def _testRecvmsgOverflow(self): 2536 self.sendToServer(MSG) 2537 2538 2539class RecvmsgTests(RecvmsgGenericTests): 2540 # Tests for recvmsg() which can use any socket type. 2541 2542 def testRecvmsgBadArgs(self): 2543 # Check that recvmsg() rejects invalid arguments. 2544 self.assertRaises(TypeError, self.serv_sock.recvmsg) 2545 self.assertRaises(ValueError, self.serv_sock.recvmsg, 2546 -1, 0, 0) 2547 self.assertRaises(ValueError, self.serv_sock.recvmsg, 2548 len(MSG), -1, 0) 2549 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2550 [bytearray(10)], 0, 0) 2551 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2552 object(), 0, 0) 2553 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2554 len(MSG), object(), 0) 2555 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2556 len(MSG), 0, object()) 2557 2558 msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0) 2559 self.assertEqual(msg, MSG) 2560 self.checkRecvmsgAddress(addr, self.cli_addr) 2561 self.assertEqual(ancdata, []) 2562 self.checkFlags(flags, eor=True) 2563 2564 def _testRecvmsgBadArgs(self): 2565 self.sendToServer(MSG) 2566 2567 2568class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests): 2569 # Tests for recvmsg_into() which can use any socket type. 2570 2571 def testRecvmsgIntoBadArgs(self): 2572 # Check that recvmsg_into() rejects invalid arguments. 
2573 buf = bytearray(len(MSG)) 2574 self.assertRaises(TypeError, self.serv_sock.recvmsg_into) 2575 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2576 len(MSG), 0, 0) 2577 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2578 buf, 0, 0) 2579 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2580 [object()], 0, 0) 2581 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2582 [b"I'm not writable"], 0, 0) 2583 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2584 [buf, object()], 0, 0) 2585 self.assertRaises(ValueError, self.serv_sock.recvmsg_into, 2586 [buf], -1, 0) 2587 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2588 [buf], object(), 0) 2589 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2590 [buf], 0, object()) 2591 2592 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0) 2593 self.assertEqual(nbytes, len(MSG)) 2594 self.assertEqual(buf, bytearray(MSG)) 2595 self.checkRecvmsgAddress(addr, self.cli_addr) 2596 self.assertEqual(ancdata, []) 2597 self.checkFlags(flags, eor=True) 2598 2599 def _testRecvmsgIntoBadArgs(self): 2600 self.sendToServer(MSG) 2601 2602 def testRecvmsgIntoGenerator(self): 2603 # Receive into buffer obtained from a generator (not a sequence). 2604 buf = bytearray(len(MSG)) 2605 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 2606 (o for o in [buf])) 2607 self.assertEqual(nbytes, len(MSG)) 2608 self.assertEqual(buf, bytearray(MSG)) 2609 self.checkRecvmsgAddress(addr, self.cli_addr) 2610 self.assertEqual(ancdata, []) 2611 self.checkFlags(flags, eor=True) 2612 2613 def _testRecvmsgIntoGenerator(self): 2614 self.sendToServer(MSG) 2615 2616 def testRecvmsgIntoArray(self): 2617 # Receive into an array rather than the usual bytearray. 
2618 buf = array.array("B", [0] * len(MSG)) 2619 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf]) 2620 self.assertEqual(nbytes, len(MSG)) 2621 self.assertEqual(buf.tobytes(), MSG) 2622 self.checkRecvmsgAddress(addr, self.cli_addr) 2623 self.assertEqual(ancdata, []) 2624 self.checkFlags(flags, eor=True) 2625 2626 def _testRecvmsgIntoArray(self): 2627 self.sendToServer(MSG) 2628 2629 def testRecvmsgIntoScatter(self): 2630 # Receive into multiple buffers (scatter write). 2631 b1 = bytearray(b"----") 2632 b2 = bytearray(b"0123456789") 2633 b3 = bytearray(b"--------------") 2634 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 2635 [b1, memoryview(b2)[2:9], b3]) 2636 self.assertEqual(nbytes, len(b"Mary had a little lamb")) 2637 self.assertEqual(b1, bytearray(b"Mary")) 2638 self.assertEqual(b2, bytearray(b"01 had a 9")) 2639 self.assertEqual(b3, bytearray(b"little lamb---")) 2640 self.checkRecvmsgAddress(addr, self.cli_addr) 2641 self.assertEqual(ancdata, []) 2642 self.checkFlags(flags, eor=True) 2643 2644 def _testRecvmsgIntoScatter(self): 2645 self.sendToServer(b"Mary had a little lamb") 2646 2647 2648class CmsgMacroTests(unittest.TestCase): 2649 # Test the functions CMSG_LEN() and CMSG_SPACE(). Tests 2650 # assumptions used by sendmsg() and recvmsg[_into](), which share 2651 # code with these functions. 2652 2653 # Match the definition in socketmodule.c 2654 try: 2655 import _testcapi 2656 except ImportError: 2657 socklen_t_limit = 0x7fffffff 2658 else: 2659 socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX) 2660 2661 @requireAttrs(socket, "CMSG_LEN") 2662 def testCMSG_LEN(self): 2663 # Test CMSG_LEN() with various valid and invalid values, 2664 # checking the assumptions used by recvmsg() and sendmsg(). 
2665 toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1 2666 values = list(range(257)) + list(range(toobig - 257, toobig)) 2667 2668 # struct cmsghdr has at least three members, two of which are ints 2669 self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2) 2670 for n in values: 2671 ret = socket.CMSG_LEN(n) 2672 # This is how recvmsg() calculates the data size 2673 self.assertEqual(ret - socket.CMSG_LEN(0), n) 2674 self.assertLessEqual(ret, self.socklen_t_limit) 2675 2676 self.assertRaises(OverflowError, socket.CMSG_LEN, -1) 2677 # sendmsg() shares code with these functions, and requires 2678 # that it reject values over the limit. 2679 self.assertRaises(OverflowError, socket.CMSG_LEN, toobig) 2680 self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize) 2681 2682 @requireAttrs(socket, "CMSG_SPACE") 2683 def testCMSG_SPACE(self): 2684 # Test CMSG_SPACE() with various valid and invalid values, 2685 # checking the assumptions used by sendmsg(). 2686 toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1 2687 values = list(range(257)) + list(range(toobig - 257, toobig)) 2688 2689 last = socket.CMSG_SPACE(0) 2690 # struct cmsghdr has at least three members, two of which are ints 2691 self.assertGreater(last, array.array("i").itemsize * 2) 2692 for n in values: 2693 ret = socket.CMSG_SPACE(n) 2694 self.assertGreaterEqual(ret, last) 2695 self.assertGreaterEqual(ret, socket.CMSG_LEN(n)) 2696 self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0)) 2697 self.assertLessEqual(ret, self.socklen_t_limit) 2698 last = ret 2699 2700 self.assertRaises(OverflowError, socket.CMSG_SPACE, -1) 2701 # sendmsg() shares code with these functions, and requires 2702 # that it reject values over the limit. 2703 self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig) 2704 self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize) 2705 2706 2707class SCMRightsTest(SendrecvmsgServerTimeoutBase): 2708 # Tests for file descriptor passing on Unix-domain sockets. 
2709 2710 # Invalid file descriptor value that's unlikely to evaluate to a 2711 # real FD even if one of its bytes is replaced with a different 2712 # value (which shouldn't actually happen). 2713 badfd = -0x5555 2714 2715 def newFDs(self, n): 2716 # Return a list of n file descriptors for newly-created files 2717 # containing their list indices as ASCII numbers. 2718 fds = [] 2719 for i in range(n): 2720 fd, path = tempfile.mkstemp() 2721 self.addCleanup(os.unlink, path) 2722 self.addCleanup(os.close, fd) 2723 os.write(fd, str(i).encode()) 2724 fds.append(fd) 2725 return fds 2726 2727 def checkFDs(self, fds): 2728 # Check that the file descriptors in the given list contain 2729 # their correct list indices as ASCII numbers. 2730 for n, fd in enumerate(fds): 2731 os.lseek(fd, 0, os.SEEK_SET) 2732 self.assertEqual(os.read(fd, 1024), str(n).encode()) 2733 2734 def registerRecvmsgResult(self, result): 2735 self.addCleanup(self.closeRecvmsgFDs, result) 2736 2737 def closeRecvmsgFDs(self, recvmsg_result): 2738 # Close all file descriptors specified in the ancillary data 2739 # of the given return value from recvmsg() or recvmsg_into(). 2740 for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]: 2741 if (cmsg_level == socket.SOL_SOCKET and 2742 cmsg_type == socket.SCM_RIGHTS): 2743 fds = array.array("i") 2744 fds.frombytes(cmsg_data[: 2745 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 2746 for fd in fds: 2747 os.close(fd) 2748 2749 def createAndSendFDs(self, n): 2750 # Send n new file descriptors created by newFDs() to the 2751 # server, with the constant MSG as the non-ancillary data. 
2752 self.assertEqual( 2753 self.sendmsgToServer([MSG], 2754 [(socket.SOL_SOCKET, 2755 socket.SCM_RIGHTS, 2756 array.array("i", self.newFDs(n)))]), 2757 len(MSG)) 2758 2759 def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0): 2760 # Check that constant MSG was received with numfds file 2761 # descriptors in a maximum of maxcmsgs control messages (which 2762 # must contain only complete integers). By default, check 2763 # that MSG_CTRUNC is unset, but ignore any flags in 2764 # ignoreflags. 2765 msg, ancdata, flags, addr = result 2766 self.assertEqual(msg, MSG) 2767 self.checkRecvmsgAddress(addr, self.cli_addr) 2768 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 2769 ignore=ignoreflags) 2770 2771 self.assertIsInstance(ancdata, list) 2772 self.assertLessEqual(len(ancdata), maxcmsgs) 2773 fds = array.array("i") 2774 for item in ancdata: 2775 self.assertIsInstance(item, tuple) 2776 cmsg_level, cmsg_type, cmsg_data = item 2777 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 2778 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 2779 self.assertIsInstance(cmsg_data, bytes) 2780 self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0) 2781 fds.frombytes(cmsg_data) 2782 2783 self.assertEqual(len(fds), numfds) 2784 self.checkFDs(fds) 2785 2786 def testFDPassSimple(self): 2787 # Pass a single FD (array read from bytes object). 2788 self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock, 2789 len(MSG), 10240)) 2790 2791 def _testFDPassSimple(self): 2792 self.assertEqual( 2793 self.sendmsgToServer( 2794 [MSG], 2795 [(socket.SOL_SOCKET, 2796 socket.SCM_RIGHTS, 2797 array.array("i", self.newFDs(1)).tobytes())]), 2798 len(MSG)) 2799 2800 def testMultipleFDPass(self): 2801 # Pass multiple FDs in a single array. 
        self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock,
                                               len(MSG), 10240))

    def _testMultipleFDPass(self):
        # Send MSG together with four new FDs in a single array.
        self.createAndSendFDs(4)

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassCMSG_SPACE(self):
        # Test using CMSG_SPACE() to calculate ancillary buffer size.
        self.checkRecvmsgFDs(
            4, self.doRecvmsg(self.serv_sock, len(MSG),
                              socket.CMSG_SPACE(4 * SIZEOF_INT)))

    @testFDPassCMSG_SPACE.client_skip
    def _testFDPassCMSG_SPACE(self):
        # Send MSG together with four new FDs in a single array.
        self.createAndSendFDs(4)

    def testFDPassCMSG_LEN(self):
        # Test using CMSG_LEN() to calculate ancillary buffer size.
        self.checkRecvmsgFDs(1,
                             self.doRecvmsg(self.serv_sock, len(MSG),
                                            socket.CMSG_LEN(4 * SIZEOF_INT)),
                             # RFC 3542 says implementations may set
                             # MSG_CTRUNC if there isn't enough space
                             # for trailing padding.
                             ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassCMSG_LEN(self):
        # Send MSG together with a single new FD.
        self.createAndSendFDs(1)

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparate(self):
        # Pass two FDs in two separate arrays.  Arrays may be combined
        # into a single control message by the OS.
        self.checkRecvmsgFDs(2,
                             self.doRecvmsg(self.serv_sock, len(MSG), 10240),
                             maxcmsgs=2)

    @testFDPassSeparate.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    def _testFDPassSeparate(self):
        # Send MSG with each of the two FDs in its own SCM_RIGHTS item.
        fd0, fd1 = self.newFDs(2)
        self.assertEqual(
            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
                                          socket.SCM_RIGHTS,
                                          array.array("i", [fd0])),
                                         (socket.SOL_SOCKET,
                                          socket.SCM_RIGHTS,
                                          array.array("i", [fd1]))]),
            len(MSG))

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparateMinSpace(self):
        # Pass two FDs in two separate arrays, receiving them into the
        # minimum space for two arrays.
        # The buffer is CMSG_SPACE() for one single-int item plus
        # CMSG_LEN() for another; MSG_CTRUNC is ignored in the result.
        self.checkRecvmsgFDs(2,
                             self.doRecvmsg(self.serv_sock, len(MSG),
                                            socket.CMSG_SPACE(SIZEOF_INT) +
                                            socket.CMSG_LEN(SIZEOF_INT)),
                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)

    @testFDPassSeparateMinSpace.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    def _testFDPassSeparateMinSpace(self):
        # Send MSG with each of the two FDs in its own SCM_RIGHTS item.
        fd0, fd1 = self.newFDs(2)
        self.assertEqual(
            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
                                          socket.SCM_RIGHTS,
                                          array.array("i", [fd0])),
                                         (socket.SOL_SOCKET,
                                          socket.SCM_RIGHTS,
                                          array.array("i", [fd1]))]),
            len(MSG))

    def sendAncillaryIfPossible(self, msg, ancdata):
        # Try to send msg and ancdata to server, but if the system
        # call fails, just send msg with no ancillary data.
        try:
            nbytes = self.sendmsgToServer([msg], ancdata)
        except OSError as e:
            # Check that it was the system call that failed
            self.assertIsInstance(e.errno, int)
            nbytes = self.sendmsgToServer([msg])
        # Whichever send was used, all of msg must have been sent.
        self.assertEqual(nbytes, len(msg))

    @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
    def testFDPassEmpty(self):
        # Try to pass an empty FD array.  Can receive either no array
        # or an empty array.
        self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock,
                                               len(MSG), 10240),
                             ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassEmpty(self):
        # Send an SCM_RIGHTS item carrying zero bytes of data.
        self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET,
                                            socket.SCM_RIGHTS,
                                            b"")])

    def testFDPassPartialInt(self):
        # Try to pass a truncated FD array.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), 10240)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
        # At most one item may arrive, and it must not contain a
        # complete integer.
        self.assertLessEqual(len(ancdata), 1)
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
            self.assertLess(len(cmsg_data), SIZEOF_INT)

    def _testFDPassPartialInt(self):
        # Drop the last byte so that the item holds less than one
        # whole int; badfd is used so no real FD is involved.
        self.sendAncillaryIfPossible(
            MSG,
            [(socket.SOL_SOCKET,
              socket.SCM_RIGHTS,
              array.array("i", [self.badfd]).tobytes()[:-1])])

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassPartialIntInMiddle(self):
        # Try to pass two FD arrays, the first of which is truncated.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), 10240)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
        self.assertLessEqual(len(ancdata), 2)
        fds = array.array("i")
        # Arrays may have been combined in a single control message
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
            # Decode only the whole integers in each item.
            fds.frombytes(cmsg_data[:
                    len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
        self.assertLessEqual(len(fds), 2)
        self.checkFDs(fds)

    @testFDPassPartialIntInMiddle.client_skip
    def _testFDPassPartialIntInMiddle(self):
        # The first item holds fd0 followed by badfd minus its final
        # byte; the second item holds fd1 intact.
        fd0, fd1 = self.newFDs(2)
        self.sendAncillaryIfPossible(
            MSG,
            [(socket.SOL_SOCKET,
              socket.SCM_RIGHTS,
              array.array("i", [fd0, self.badfd]).tobytes()[:-1]),
             (socket.SOL_SOCKET,
              socket.SCM_RIGHTS,
              array.array("i", [fd1]))])

    def checkTruncatedHeader(self, result, ignoreflags=0):
        # Check that no ancillary data items are returned when data is
        # truncated inside the cmsghdr structure.
        # result is a (msg, ancdata, flags, addr) tuple; MSG_CTRUNC
        # must be set unless it is included in ignoreflags.
        msg, ancdata, flags, addr = result
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

    def testCmsgTruncNoBufSize(self):
        # Check that no ancillary data is received when no buffer size
        # is specified.
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)),
                                  # BSD seems to set MSG_CTRUNC only
                                  # if an item has been partially
                                  # received.
                                  ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTruncNoBufSize(self):
        # Send MSG with one FD for the receiver to truncate.
        self.createAndSendFDs(1)

    def testCmsgTrunc0(self):
        # Check that no ancillary data is received when buffer size is 0.
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0),
                                  ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTrunc0(self):
        # Send MSG with one FD for the receiver to truncate.
        self.createAndSendFDs(1)

    # Check that no ancillary data is returned for various non-zero
    # (but still too small) buffer sizes.

    def testCmsgTrunc1(self):
        # Ancillary buffer of a single byte.
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1))

    def _testCmsgTrunc1(self):
        # Send MSG with one FD for the receiver to truncate.
        self.createAndSendFDs(1)

    def testCmsgTrunc2Int(self):
        # The cmsghdr structure has at least three members, two of
        # which are ints, so we still shouldn't see any ancillary
        # data.
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
                                                 SIZEOF_INT * 2))

    def _testCmsgTrunc2Int(self):
        # Send MSG with one FD for the receiver to truncate.
        self.createAndSendFDs(1)

    def testCmsgTruncLen0Minus1(self):
        # Ancillary buffer one byte smaller than CMSG_LEN(0).
        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
                                                 socket.CMSG_LEN(0) - 1))

    def _testCmsgTruncLen0Minus1(self):
        # Send MSG with one FD for the receiver to truncate.
        self.createAndSendFDs(1)

    # The following tests try to truncate the control message in the
    # middle of the FD array.

    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
        # Check that file descriptor data is truncated to between
        # mindata and maxdata bytes when received with buffer size
        # ancbuf, and that any complete file descriptor numbers are
        # valid.
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbuf)
        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

        # When no minimum is required, receiving no item at all is an
        # acceptable outcome.
        if mindata == 0 and ancdata == []:
            return
        self.assertEqual(len(ancdata), 1)
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        self.assertEqual(cmsg_level, socket.SOL_SOCKET)
        self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
        self.assertGreaterEqual(len(cmsg_data), mindata)
        self.assertLessEqual(len(cmsg_data), maxdata)
        fds = array.array("i")
        # Decode only the whole integers; a trailing partial one is
        # ignored.
        fds.frombytes(cmsg_data[:
                len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
        self.checkFDs(fds)

    def testCmsgTruncLen0(self):
        # Buffer with room for the item header only.
        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0)

    def _testCmsgTruncLen0(self):
        self.createAndSendFDs(1)

    def testCmsgTruncLen0Plus1(self):
        # Buffer with room for the header plus one byte of data.
        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1)

    def _testCmsgTruncLen0Plus1(self):
        self.createAndSendFDs(2)

    def testCmsgTruncLen1(self):
        # Buffer with room for one int of data, while two FDs are sent.
        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT),
                                 maxdata=SIZEOF_INT)

    def _testCmsgTruncLen1(self):
        self.createAndSendFDs(2)

    def testCmsgTruncLen2Minus1(self):
        # Buffer one byte short of holding two ints of data.
        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1,
                                 maxdata=(2 * SIZEOF_INT) - 1)

    def _testCmsgTruncLen2Minus1(self):
        self.createAndSendFDs(2)


class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
    # Test sendmsg() and recvmsg[_into]() using the ancillary data
    # features of the RFC 3542 Advanced Sockets API for IPv6.
    # Currently we can only handle certain data items (e.g.
    # traffic class, hop limit, MTU discovery and fragmentation settings)
    # without resorting to unportable means such as the struct module,
    # but the tests here are aimed at testing the ancillary data
    # handling in sendmsg() and recvmsg() rather than the IPv6 API
    # itself.

    # Test value to use when setting hop limit of packet
    hop_limit = 2

    # Test value to use when setting traffic class of packet.
    # -1 means "use kernel default".
    traffic_class = -1

    def ancillaryMapping(self, ancdata):
        # Given ancillary data list ancdata, return a mapping from
        # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
        # Check that no (level, type) pair appears more than once.
        d = {}
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            # Assert before storing: a repeated key would otherwise
            # silently replace the earlier data.
            self.assertNotIn((cmsg_level, cmsg_type), d)
            d[(cmsg_level, cmsg_type)] = cmsg_data
        return d

    def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
        # Receive hop limit into ancbufsize bytes of ancillary data
        # space.  Check that data is MSG, ancillary data is not
        # truncated (but ignore any flags in ignoreflags), and hop
        # limit is between 0 and maxhop inclusive.
        # Ask to receive the hop limit, then set misc_event, which the
        # sending side waits on before it sends.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbufsize)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertEqual(len(ancdata), 1)
        self.assertIsInstance(ancdata[0], tuple)
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
        self.assertIsInstance(cmsg_data, bytes)
        self.assertEqual(len(cmsg_data), SIZEOF_INT)
        # Decode the single native int carried by the item.
        a = array.array("i")
        a.frombytes(cmsg_data)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], maxhop)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testRecvHopLimit(self):
        # Test receiving the packet hop limit as ancillary data.
        self.checkHopLimit(ancbufsize=10240)

    @testRecvHopLimit.client_skip
    def _testRecvHopLimit(self):
        # Need to wait until server has asked to receive ancillary
        # data, as implementations are not required to buffer it
        # otherwise.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testRecvHopLimitCMSG_SPACE(self):
        # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
        self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))

    @testRecvHopLimitCMSG_SPACE.client_skip
    def _testRecvHopLimitCMSG_SPACE(self):
        # Wait for misc_event (set by checkHopLimit() once the socket
        # option is enabled) before sending.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    # Could test receiving into buffer sized using CMSG_LEN, but RFC
    # 3542 says portable applications must provide space for trailing
    # padding.  Implementations may set MSG_CTRUNC if there isn't
    # enough space for the padding.

    @requireAttrs(socket.socket, "sendmsg")
    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSetHopLimit(self):
        # Test setting hop limit on outgoing packet and receiving it
        # at the other end.
        self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)

    @testSetHopLimit.client_skip
    def _testSetHopLimit(self):
        # Wait for misc_event, then send MSG with hop_limit attached
        # as an IPV6_HOPLIMIT ancillary item.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.assertEqual(
            self.sendmsgToServer([MSG],
                [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                  array.array("i", [self.hop_limit]))]),
            len(MSG))

    def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
                                     ignoreflags=0):
        # Receive traffic class and hop limit into ancbufsize bytes of
        # ancillary data space.  Check that data is MSG, ancillary
        # data is not truncated (but ignore any flags in ignoreflags),
        # and traffic class and hop limit are in range (hop limit no
        # more than maxhop).
        # Ask to receive both items, then set misc_event, which the
        # sending side waits on before it sends.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbufsize)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)
        self.assertEqual(len(ancdata), 2)
        # Index the items by (level, type) so that the order in which
        # they were received does not matter.
        ancmap = self.ancillaryMapping(ancdata)

        # Traffic class: one native int in the range 0..255.
        tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
        self.assertEqual(len(tcdata), SIZEOF_INT)
        a = array.array("i")
        a.frombytes(tcdata)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], 255)

        # Hop limit: one native int in the range 0..maxhop.
        hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
        self.assertEqual(len(hldata), SIZEOF_INT)
        a = array.array("i")
        a.frombytes(hldata)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], maxhop)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testRecvTrafficClassAndHopLimit(self):
        # Test receiving traffic class and hop limit as ancillary data.
        self.checkTrafficClassAndHopLimit(ancbufsize=10240)

    @testRecvTrafficClassAndHopLimit.client_skip
    def _testRecvTrafficClassAndHopLimit(self):
        # Wait for misc_event before sending plain MSG.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
        # Test receiving traffic class and hop limit, using
        # CMSG_SPACE() to calculate buffer size.
        # Room for exactly two single-int items, padding included.
        self.checkTrafficClassAndHopLimit(
            ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)

    @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
    def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
        # Wait for misc_event before sending plain MSG.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket.socket, "sendmsg")
    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSetTrafficClassAndHopLimit(self):
        # Test setting traffic class and hop limit on outgoing packet,
        # and receiving them at the other end.
        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
                                          maxhop=self.hop_limit)

    @testSetTrafficClassAndHopLimit.client_skip
    def _testSetTrafficClassAndHopLimit(self):
        # Wait for misc_event, then send MSG with both traffic_class
        # and hop_limit attached as ancillary items.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.assertEqual(
            self.sendmsgToServer([MSG],
                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
                  array.array("i", [self.traffic_class])),
                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                  array.array("i", [self.hop_limit]))]),
            len(MSG))

    @requireAttrs(socket.socket, "sendmsg")
    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testOddCmsgSize(self):
        # Try to send ancillary data with first item one byte too
        # long.  Fall back to sending with correct size if this fails,
        # and check that second item was handled correctly.
        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
                                          maxhop=self.hop_limit)

    @testOddCmsgSize.client_skip
    def _testOddCmsgSize(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        try:
            # First attempt: traffic class item padded with one extra
            # byte.
            nbytes = self.sendmsgToServer(
                [MSG],
                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
                  array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                  array.array("i", [self.hop_limit]))])
        except OSError as e:
            # Fall back to correctly-sized items.
            self.assertIsInstance(e.errno, int)
            nbytes = self.sendmsgToServer(
                [MSG],
                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
                  array.array("i", [self.traffic_class])),
                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                  array.array("i", [self.hop_limit]))])
            # NOTE(review): the nesting of this assertion could not be
            # recovered with certainty; it is presumed to belong to
            # the except block -- confirm against the original file.
            self.assertEqual(nbytes, len(MSG))

    # Tests for proper handling of truncated ancillary data

    def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
        # Receive hop limit into ancbufsize bytes of ancillary data
        # space, which should be too small to contain the ancillary
        # data header (if ancbufsize is None, pass no second argument
        # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
        # (unless included in ignoreflags), and no ancillary data is
        # returned.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.misc_event.set()
        # Omit the buffer-size argument entirely when ancbufsize is
        # None.
        args = () if ancbufsize is None else (ancbufsize,)
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), *args)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testCmsgTruncNoBufSize(self):
        # Check that no ancillary data is received when no ancillary
        # buffer size is provided.
        self.checkHopLimitTruncatedHeader(ancbufsize=None,
                                          # BSD seems to set
                                          # MSG_CTRUNC only if an item
                                          # has been partially
                                          # received.
                                          ignoreflags=socket.MSG_CTRUNC)

    @testCmsgTruncNoBufSize.client_skip
    def _testCmsgTruncNoBufSize(self):
        # Wait for misc_event before sending plain MSG.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTrunc0(self):
        # Check that no ancillary data is received when ancillary
        # buffer size is zero.
        self.checkHopLimitTruncatedHeader(ancbufsize=0,
                                          ignoreflags=socket.MSG_CTRUNC)

    @testSingleCmsgTrunc0.client_skip
    def _testSingleCmsgTrunc0(self):
        # Wait for misc_event before sending plain MSG.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    # Check that no ancillary data is returned for various non-zero
    # (but still too small) buffer sizes.

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTrunc1(self):
        # Ancillary buffer of a single byte.
        self.checkHopLimitTruncatedHeader(ancbufsize=1)

    @testSingleCmsgTrunc1.client_skip
    def _testSingleCmsgTrunc1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTrunc2Int(self):
        # Ancillary buffer the size of two ints.
        self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)

    @testSingleCmsgTrunc2Int.client_skip
    def _testSingleCmsgTrunc2Int(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTruncLen0Minus1(self):
        # Ancillary buffer one byte smaller than CMSG_LEN(0).
        self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)

    @testSingleCmsgTruncLen0Minus1.client_skip
    def _testSingleCmsgTruncLen0Minus1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTruncInData(self):
        # Test truncation of a control message inside its associated
        # data.  The message may be returned with its data truncated,
        # or not returned at all.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.misc_event.set()
        # The buffer is one byte too small to hold the item's int.
        msg, ancdata, flags, addr = self.doRecvmsg(
            self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

        self.assertLessEqual(len(ancdata), 1)
        if ancdata:
            # If the item was returned, its data must be incomplete.
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
            self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
            self.assertLess(len(cmsg_data), SIZEOF_INT)

    @testSingleCmsgTruncInData.client_skip
    def _testSingleCmsgTruncInData(self):
        # Wait for misc_event before sending plain MSG.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
        # Receive traffic class and hop limit into ancbufsize bytes of
        # ancillary data space, which should be large enough to
        # contain the first item, but too small to contain the header
        # of the second.  Check that data is MSG, MSG_CTRUNC is set
        # (unless included in ignoreflags), and only one ancillary
        # data item is returned.
        # Ask to receive both items, then set misc_event, which the
        # sending side waits on before it sends.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbufsize)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertEqual(len(ancdata), 1)
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        # Either of the two requested items may be the one returned.
        self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
        self.assertEqual(len(cmsg_data), SIZEOF_INT)
        a = array.array("i")
        a.frombytes(cmsg_data)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], 255)

    # Try the above test with various buffer sizes.

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTrunc0(self):
        # Exactly CMSG_SPACE() for one item; no room for a second.
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
                                        ignoreflags=socket.MSG_CTRUNC)

    @testSecondCmsgTrunc0.client_skip
    def _testSecondCmsgTrunc0(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTrunc1(self):
        # One item plus a single extra byte.
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)

    @testSecondCmsgTrunc1.client_skip
    def _testSecondCmsgTrunc1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTrunc2Int(self):
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
                                        2 * SIZEOF_INT)

    @testSecondCmsgTrunc2Int.client_skip
    def _testSecondCmsgTrunc2Int(self):
        # Wait for misc_event before sending plain MSG.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTruncLen0Minus1(self):
        # One item plus one byte less than CMSG_LEN(0).
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
                                        socket.CMSG_LEN(0) - 1)

    @testSecondCmsgTruncLen0Minus1.client_skip
    def _testSecondCmsgTruncLen0Minus1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecomdCmsgTruncInData(self):
        # Test truncation of the second of two control messages inside
        # its associated data.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        # Room for one whole item, plus a second item that is one
        # byte short of holding its int.
        msg, ancdata, flags, addr = self.doRecvmsg(
            self.serv_sock, len(MSG),
            socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

        # Types still expected; set.remove() raises KeyError if an
        # item's type is unexpected or repeated.
        cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}

        # The first item must be complete.
        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        cmsg_types.remove(cmsg_type)
        self.assertEqual(len(cmsg_data), SIZEOF_INT)
        a = array.array("i")
        a.frombytes(cmsg_data)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], 255)

        # The second item, if returned at all, must be truncated.
        if ancdata:
            cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3490 cmsg_types.remove(cmsg_type) 3491 self.assertLess(len(cmsg_data), SIZEOF_INT) 3492 3493 self.assertEqual(ancdata, []) 3494 3495 @testSecomdCmsgTruncInData.client_skip 3496 def _testSecomdCmsgTruncInData(self): 3497 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3498 self.sendToServer(MSG) 3499 3500 3501# Derive concrete test classes for different socket types. 3502 3503class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase, 3504 SendrecvmsgConnectionlessBase, 3505 ThreadedSocketTestMixin, UDPTestBase): 3506 pass 3507 3508@requireAttrs(socket.socket, "sendmsg") 3509@unittest.skipUnless(thread, 'Threading required for this test.') 3510class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase): 3511 pass 3512 3513@requireAttrs(socket.socket, "recvmsg") 3514@unittest.skipUnless(thread, 'Threading required for this test.') 3515class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase): 3516 pass 3517 3518@requireAttrs(socket.socket, "recvmsg_into") 3519@unittest.skipUnless(thread, 'Threading required for this test.') 3520class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase): 3521 pass 3522 3523 3524class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase, 3525 SendrecvmsgConnectionlessBase, 3526 ThreadedSocketTestMixin, UDP6TestBase): 3527 3528 def checkRecvmsgAddress(self, addr1, addr2): 3529 # Called to compare the received address with the address of 3530 # the peer, ignoring scope ID 3531 self.assertEqual(addr1[:-1], addr2[:-1]) 3532 3533@requireAttrs(socket.socket, "sendmsg") 3534@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3535@requireSocket("AF_INET6", "SOCK_DGRAM") 3536@unittest.skipUnless(thread, 'Threading required for this test.') 3537class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase): 3538 pass 3539 3540@requireAttrs(socket.socket, "recvmsg") 3541@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for 
this test.') 3542@requireSocket("AF_INET6", "SOCK_DGRAM") 3543@unittest.skipUnless(thread, 'Threading required for this test.') 3544class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase): 3545 pass 3546 3547@requireAttrs(socket.socket, "recvmsg_into") 3548@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3549@requireSocket("AF_INET6", "SOCK_DGRAM") 3550@unittest.skipUnless(thread, 'Threading required for this test.') 3551class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase): 3552 pass 3553 3554@requireAttrs(socket.socket, "recvmsg") 3555@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3556@requireAttrs(socket, "IPPROTO_IPV6") 3557@requireSocket("AF_INET6", "SOCK_DGRAM") 3558@unittest.skipUnless(thread, 'Threading required for this test.') 3559class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, 3560 SendrecvmsgUDP6TestBase): 3561 pass 3562 3563@requireAttrs(socket.socket, "recvmsg_into") 3564@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3565@requireAttrs(socket, "IPPROTO_IPV6") 3566@requireSocket("AF_INET6", "SOCK_DGRAM") 3567@unittest.skipUnless(thread, 'Threading required for this test.') 3568class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin, 3569 RFC3542AncillaryTest, 3570 SendrecvmsgUDP6TestBase): 3571 pass 3572 3573 3574class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase, 3575 ConnectedStreamTestMixin, TCPTestBase): 3576 pass 3577 3578@requireAttrs(socket.socket, "sendmsg") 3579@unittest.skipUnless(thread, 'Threading required for this test.') 3580class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase): 3581 pass 3582 3583@requireAttrs(socket.socket, "recvmsg") 3584@unittest.skipUnless(thread, 'Threading required for this test.') 3585class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests, 3586 SendrecvmsgTCPTestBase): 3587 pass 3588 3589@requireAttrs(socket.socket, "recvmsg_into") 3590@unittest.skipUnless(thread, 'Threading 
required for this test.') 3591class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 3592 SendrecvmsgTCPTestBase): 3593 pass 3594 3595 3596class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase, 3597 SendrecvmsgConnectedBase, 3598 ConnectedStreamTestMixin, SCTPStreamBase): 3599 pass 3600 3601@requireAttrs(socket.socket, "sendmsg") 3602@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 3603@unittest.skipUnless(thread, 'Threading required for this test.') 3604class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase): 3605 pass 3606 3607@requireAttrs(socket.socket, "recvmsg") 3608@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 3609@unittest.skipUnless(thread, 'Threading required for this test.') 3610class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 3611 SendrecvmsgSCTPStreamTestBase): 3612 3613 def testRecvmsgEOF(self): 3614 try: 3615 super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF() 3616 except OSError as e: 3617 if e.errno != errno.ENOTCONN: 3618 raise 3619 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 3620 3621@requireAttrs(socket.socket, "recvmsg_into") 3622@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 3623@unittest.skipUnless(thread, 'Threading required for this test.') 3624class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 3625 SendrecvmsgSCTPStreamTestBase): 3626 3627 def testRecvmsgEOF(self): 3628 try: 3629 super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF() 3630 except OSError as e: 3631 if e.errno != errno.ENOTCONN: 3632 raise 3633 self.skipTest("sporadic ENOTCONN (kernel issue?) 
class InterruptedTimeoutBase(unittest.TestCase):
    # Base class for interrupted send/receive tests.  setUp() installs a
    # SIGALRM handler that raises ZeroDivisionError; registered cleanups
    # cancel any scheduled alarm and put the previous handler back.

    # Timeout for socket operations
    timeout = 4.0

    def setUp(self):
        super().setUp()

        def raise_zero_division(signum, frame):
            return 1 / 0

        previous_handler = signal.signal(signal.SIGALRM, raise_zero_division)
        # Cleanups run last-in-first-out, so the alarm is cancelled
        # before the previous handler is reinstated.
        self.addCleanup(signal.signal, signal.SIGALRM, previous_handler)
        self.addCleanup(self.setAlarm, 0)

    # setAlarm(seconds) schedules delivery of SIGALRM after the given
    # number of seconds, or cancels it when passed zero; alarm_time is
    # the matching delay to use.  alarm() is the fallback when
    # setitimer() is missing.
    if not hasattr(signal, "setitimer"):
        # Old systems may deliver the alarm up to one second early
        alarm_time = 2

        def setAlarm(self, seconds):
            signal.alarm(seconds)
    else:
        alarm_time = 0.05

        def setAlarm(self, seconds):
            signal.setitimer(signal.ITIMER_REAL, seconds)
# Require siginterrupt() in order to ensure that system calls are
# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
@unittest.skipUnless(thread, 'Threading required for this test.')
class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
                                 ThreadSafeCleanupTestCase,
                                 SocketListeningTestMixin, TCPTestBase):
    # Test interrupting the interruptible send*() methods with signals
    # when a timeout is set.

    def setUp(self):
        super().setUp()
        self.serv_conn = self.newSocket()
        self.addCleanup(self.serv_conn.close)
        # Use a thread to complete the connection, but wait for it to
        # terminate before running the test, so that there is only one
        # thread to accept the signal.
        cli_thread = threading.Thread(target=self.doConnect)
        cli_thread.start()
        self.cli_conn, addr = self.serv.accept()
        self.addCleanup(self.cli_conn.close)
        cli_thread.join()
        self.serv_conn.settimeout(self.timeout)

    def doConnect(self):
        # Runs in the helper thread started by setUp().
        self.serv_conn.connect(self.serv_addr)

    def checkInterruptedSend(self, func, *args, **kwargs):
        # Call func(*args, **kwargs) in a loop, re-arming the alarm
        # before every call, and check that the loop is broken by the
        # ZeroDivisionError raised from the SIGALRM handler that
        # InterruptedTimeoutBase.setUp() installs.
        try:
            with self.assertRaises(ZeroDivisionError):
                while True:
                    self.setAlarm(self.alarm_time)
                    func(*args, **kwargs)
        finally:
            # Cancel the alarm right away: if the loop ended with some
            # other exception, a still-pending SIGALRM must not fire in
            # unrelated code that runs before the cleanups.
            self.setAlarm(0)

    # Issue #12958: The following tests have problems on OS X prior to 10.7
    @support.requires_mac_ver(10, 7)
    def testInterruptedSendTimeout(self):
        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)

    @support.requires_mac_ver(10, 7)
    def testInterruptedSendtoTimeout(self):
        # Passing an actual address here as Python's wrapper for
        # sendto() doesn't allow passing a zero-length one; POSIX
        # requires that the address is ignored since the socket is
        # connection-mode, however.
        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
                                  self.serv_addr)

    @support.requires_mac_ver(10, 7)
    @requireAttrs(socket.socket, "sendmsg")
    def testInterruptedSendmsgTimeout(self):
        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
    # Checks the default attributes of both sockets of the pair and
    # exchanges MSG in each direction (the _test* methods are the
    # client-side halves).

    def __init__(self, methodName='runTest'):
        super().__init__(methodName=methodName)

    def _check_defaults(self, sock):
        # The expected family is AF_UNIX where the platform defines it,
        # AF_INET otherwise.
        expected_family = getattr(socket, 'AF_UNIX', socket.AF_INET)
        self.assertIsInstance(sock, socket.socket)
        self.assertEqual(sock.family, expected_family)
        self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)

    def _testDefaults(self):
        self._check_defaults(self.cli)

    def testDefaults(self):
        self._check_defaults(self.serv)

    def testRecv(self):
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        self.serv.send(MSG)

    def _testSend(self):
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
_testcapi.UINT_MAX >= _testcapi.ULONG_MAX: 3892 self.skipTest('needs UINT_MAX < ULONG_MAX') 3893 self.serv.setblocking(False) 3894 self.assertEqual(self.serv.gettimeout(), 0.0) 3895 self.serv.setblocking(_testcapi.UINT_MAX + 1) 3896 self.assertIsNone(self.serv.gettimeout()) 3897 3898 _testSetBlocking_overflow = support.cpython_only(_testSetBlocking) 3899 3900 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 3901 'test needs socket.SOCK_NONBLOCK') 3902 @support.requires_linux_version(2, 6, 28) 3903 def testInitNonBlocking(self): 3904 # reinit server socket 3905 self.serv.close() 3906 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM | 3907 socket.SOCK_NONBLOCK) 3908 self.port = support.bind_port(self.serv) 3909 self.serv.listen() 3910 # actual testing 3911 start = time.time() 3912 try: 3913 self.serv.accept() 3914 except OSError: 3915 pass 3916 end = time.time() 3917 self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.") 3918 3919 def _testInitNonBlocking(self): 3920 pass 3921 3922 def testInheritFlags(self): 3923 # Issue #7995: when calling accept() on a listening socket with a 3924 # timeout, the resulting socket should not be non-blocking. 
3925 self.serv.settimeout(10) 3926 try: 3927 conn, addr = self.serv.accept() 3928 message = conn.recv(len(MSG)) 3929 finally: 3930 conn.close() 3931 self.serv.settimeout(None) 3932 3933 def _testInheritFlags(self): 3934 time.sleep(0.1) 3935 self.cli.connect((HOST, self.port)) 3936 time.sleep(0.5) 3937 self.cli.send(MSG) 3938 3939 def testAccept(self): 3940 # Testing non-blocking accept 3941 self.serv.setblocking(0) 3942 try: 3943 conn, addr = self.serv.accept() 3944 except OSError: 3945 pass 3946 else: 3947 self.fail("Error trying to do non-blocking accept.") 3948 read, write, err = select.select([self.serv], [], []) 3949 if self.serv in read: 3950 conn, addr = self.serv.accept() 3951 self.assertIsNone(conn.gettimeout()) 3952 conn.close() 3953 else: 3954 self.fail("Error trying to do accept after select.") 3955 3956 def _testAccept(self): 3957 time.sleep(0.1) 3958 self.cli.connect((HOST, self.port)) 3959 3960 def testConnect(self): 3961 # Testing non-blocking connect 3962 conn, addr = self.serv.accept() 3963 conn.close() 3964 3965 def _testConnect(self): 3966 self.cli.settimeout(10) 3967 self.cli.connect((HOST, self.port)) 3968 3969 def testRecv(self): 3970 # Testing non-blocking recv 3971 conn, addr = self.serv.accept() 3972 conn.setblocking(0) 3973 try: 3974 msg = conn.recv(len(MSG)) 3975 except OSError: 3976 pass 3977 else: 3978 self.fail("Error trying to do non-blocking recv.") 3979 read, write, err = select.select([conn], [], []) 3980 if conn in read: 3981 msg = conn.recv(len(MSG)) 3982 conn.close() 3983 self.assertEqual(msg, MSG) 3984 else: 3985 self.fail("Error during select call to non-blocking socket.") 3986 3987 def _testRecv(self): 3988 self.cli.connect((HOST, self.port)) 3989 time.sleep(0.1) 3990 self.cli.send(MSG) 3991 3992@unittest.skipUnless(thread, 'Threading required for this test.') 3993class FileObjectClassTestCase(SocketConnectedTest): 3994 """Unit tests for the object returned by socket.makefile() 3995 3996 self.read_file is the io object 
returned by makefile() on 3997 the client connection. You can read from this file to 3998 get output from the server. 3999 4000 self.write_file is the io object returned by makefile() on the 4001 server connection. You can write to this file to send output 4002 to the client. 4003 """ 4004 4005 bufsize = -1 # Use default buffer size 4006 encoding = 'utf-8' 4007 errors = 'strict' 4008 newline = None 4009 4010 read_mode = 'rb' 4011 read_msg = MSG 4012 write_mode = 'wb' 4013 write_msg = MSG 4014 4015 def __init__(self, methodName='runTest'): 4016 SocketConnectedTest.__init__(self, methodName=methodName) 4017 4018 def setUp(self): 4019 self.evt1, self.evt2, self.serv_finished, self.cli_finished = [ 4020 threading.Event() for i in range(4)] 4021 SocketConnectedTest.setUp(self) 4022 self.read_file = self.cli_conn.makefile( 4023 self.read_mode, self.bufsize, 4024 encoding = self.encoding, 4025 errors = self.errors, 4026 newline = self.newline) 4027 4028 def tearDown(self): 4029 self.serv_finished.set() 4030 self.read_file.close() 4031 self.assertTrue(self.read_file.closed) 4032 self.read_file = None 4033 SocketConnectedTest.tearDown(self) 4034 4035 def clientSetUp(self): 4036 SocketConnectedTest.clientSetUp(self) 4037 self.write_file = self.serv_conn.makefile( 4038 self.write_mode, self.bufsize, 4039 encoding = self.encoding, 4040 errors = self.errors, 4041 newline = self.newline) 4042 4043 def clientTearDown(self): 4044 self.cli_finished.set() 4045 self.write_file.close() 4046 self.assertTrue(self.write_file.closed) 4047 self.write_file = None 4048 SocketConnectedTest.clientTearDown(self) 4049 4050 def testReadAfterTimeout(self): 4051 # Issue #7322: A file object must disallow further reads 4052 # after a timeout has occurred. 
4053 self.cli_conn.settimeout(1) 4054 self.read_file.read(3) 4055 # First read raises a timeout 4056 self.assertRaises(socket.timeout, self.read_file.read, 1) 4057 # Second read is disallowed 4058 with self.assertRaises(OSError) as ctx: 4059 self.read_file.read(1) 4060 self.assertIn("cannot read from timed out object", str(ctx.exception)) 4061 4062 def _testReadAfterTimeout(self): 4063 self.write_file.write(self.write_msg[0:3]) 4064 self.write_file.flush() 4065 self.serv_finished.wait() 4066 4067 def testSmallRead(self): 4068 # Performing small file read test 4069 first_seg = self.read_file.read(len(self.read_msg)-3) 4070 second_seg = self.read_file.read(3) 4071 msg = first_seg + second_seg 4072 self.assertEqual(msg, self.read_msg) 4073 4074 def _testSmallRead(self): 4075 self.write_file.write(self.write_msg) 4076 self.write_file.flush() 4077 4078 def testFullRead(self): 4079 # read until EOF 4080 msg = self.read_file.read() 4081 self.assertEqual(msg, self.read_msg) 4082 4083 def _testFullRead(self): 4084 self.write_file.write(self.write_msg) 4085 self.write_file.close() 4086 4087 def testUnbufferedRead(self): 4088 # Performing unbuffered file read test 4089 buf = type(self.read_msg)() 4090 while 1: 4091 char = self.read_file.read(1) 4092 if not char: 4093 break 4094 buf += char 4095 self.assertEqual(buf, self.read_msg) 4096 4097 def _testUnbufferedRead(self): 4098 self.write_file.write(self.write_msg) 4099 self.write_file.flush() 4100 4101 def testReadline(self): 4102 # Performing file readline test 4103 line = self.read_file.readline() 4104 self.assertEqual(line, self.read_msg) 4105 4106 def _testReadline(self): 4107 self.write_file.write(self.write_msg) 4108 self.write_file.flush() 4109 4110 def testCloseAfterMakefile(self): 4111 # The file returned by makefile should keep the socket open. 
4112 self.cli_conn.close() 4113 # read until EOF 4114 msg = self.read_file.read() 4115 self.assertEqual(msg, self.read_msg) 4116 4117 def _testCloseAfterMakefile(self): 4118 self.write_file.write(self.write_msg) 4119 self.write_file.flush() 4120 4121 def testMakefileAfterMakefileClose(self): 4122 self.read_file.close() 4123 msg = self.cli_conn.recv(len(MSG)) 4124 if isinstance(self.read_msg, str): 4125 msg = msg.decode() 4126 self.assertEqual(msg, self.read_msg) 4127 4128 def _testMakefileAfterMakefileClose(self): 4129 self.write_file.write(self.write_msg) 4130 self.write_file.flush() 4131 4132 def testClosedAttr(self): 4133 self.assertTrue(not self.read_file.closed) 4134 4135 def _testClosedAttr(self): 4136 self.assertTrue(not self.write_file.closed) 4137 4138 def testAttributes(self): 4139 self.assertEqual(self.read_file.mode, self.read_mode) 4140 self.assertEqual(self.read_file.name, self.cli_conn.fileno()) 4141 4142 def _testAttributes(self): 4143 self.assertEqual(self.write_file.mode, self.write_mode) 4144 self.assertEqual(self.write_file.name, self.serv_conn.fileno()) 4145 4146 def testRealClose(self): 4147 self.read_file.close() 4148 self.assertRaises(ValueError, self.read_file.fileno) 4149 self.cli_conn.close() 4150 self.assertRaises(OSError, self.cli_conn.getsockname) 4151 4152 def _testRealClose(self): 4153 pass 4154 4155 4156class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): 4157 4158 """Repeat the tests from FileObjectClassTestCase with bufsize==0. 4159 4160 In this case (and in this case only), it should be possible to 4161 create a file object, read a line from it, create another file 4162 object, read another line from it, without loss of data in the 4163 first file object's buffer. 
Note that http.client relies on this 4164 when reading multiple requests from the same socket.""" 4165 4166 bufsize = 0 # Use unbuffered mode 4167 4168 def testUnbufferedReadline(self): 4169 # Read a line, create a new file object, read another line with it 4170 line = self.read_file.readline() # first line 4171 self.assertEqual(line, b"A. " + self.write_msg) # first line 4172 self.read_file = self.cli_conn.makefile('rb', 0) 4173 line = self.read_file.readline() # second line 4174 self.assertEqual(line, b"B. " + self.write_msg) # second line 4175 4176 def _testUnbufferedReadline(self): 4177 self.write_file.write(b"A. " + self.write_msg) 4178 self.write_file.write(b"B. " + self.write_msg) 4179 self.write_file.flush() 4180 4181 def testMakefileClose(self): 4182 # The file returned by makefile should keep the socket open... 4183 self.cli_conn.close() 4184 msg = self.cli_conn.recv(1024) 4185 self.assertEqual(msg, self.read_msg) 4186 # ...until the file is itself closed 4187 self.read_file.close() 4188 self.assertRaises(OSError, self.cli_conn.recv, 1024) 4189 4190 def _testMakefileClose(self): 4191 self.write_file.write(self.write_msg) 4192 self.write_file.flush() 4193 4194 def testMakefileCloseSocketDestroy(self): 4195 refcount_before = sys.getrefcount(self.cli_conn) 4196 self.read_file.close() 4197 refcount_after = sys.getrefcount(self.cli_conn) 4198 self.assertEqual(refcount_before - 1, refcount_after) 4199 4200 def _testMakefileCloseSocketDestroy(self): 4201 pass 4202 4203 # Non-blocking ops 4204 # NOTE: to set `read_file` as non-blocking, we must call 4205 # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp). 
    def _testSmallReadNonBlocking(self):
        # Client-side half of testSmallReadNonBlocking.
        # Wait (up to 1 second) for evt1, which the server side sets
        # after its initial reads on the non-blocking file returned None.
        self.evt1.wait(1.0)
        self.write_file.write(self.write_msg)
        self.write_file.flush()
        # Tell the server side that the message has been written.
        self.evt2.set()
        # Avoid closing the socket before the server test has finished,
        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
        self.serv_finished.wait(5.0)
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==1."""

    bufsize = 1 # Default-buffered for reading; line-buffered for writing


class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    bufsize = 2 # Exercise the buffering code


class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() with a text-mode reader ('r') and a
    binary writer ('wb')."""

    read_mode = 'r'
    read_msg = MSG.decode('utf-8')
    write_mode = 'wb'
    write_msg = MSG
    newline = ''


class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() with a binary reader ('rb') and a
    text-mode writer ('w')."""

    read_mode = 'rb'
    read_msg = MSG
    write_mode = 'w'
    write_msg = MSG.decode('utf-8')
    newline = ''


class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() with both the reader ('r') and the
    writer ('w') in text mode."""

    read_mode = 'r'
    read_msg = MSG.decode('utf-8')
    write_mode = 'w'
    write_msg = MSG.decode('utf-8')
    newline = ''


class NetworkConnectionTest(object):
    """Mixin whose clientSetUp() connects with socket.create_connection()."""

    def clientSetUp(self):
        # We're inherited below by BasicTCPTest2, which also inherits
        # BasicTCPTest, which defines self.port referenced below.
        self.cli = socket.create_connection((HOST, self.port))
        # The same socket object is also exposed as serv_conn.
        self.serv_conn = self.cli
4315 """ 4316 4317class NetworkConnectionNoServer(unittest.TestCase): 4318 4319 class MockSocket(socket.socket): 4320 def connect(self, *args): 4321 raise socket.timeout('timed out') 4322 4323 @contextlib.contextmanager 4324 def mocked_socket_module(self): 4325 """Return a socket which times out on connect""" 4326 old_socket = socket.socket 4327 socket.socket = self.MockSocket 4328 try: 4329 yield 4330 finally: 4331 socket.socket = old_socket 4332 4333 def test_connect(self): 4334 port = support.find_unused_port() 4335 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4336 self.addCleanup(cli.close) 4337 with self.assertRaises(OSError) as cm: 4338 cli.connect((HOST, port)) 4339 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) 4340 4341 def test_create_connection(self): 4342 # Issue #9792: errors raised by create_connection() should have 4343 # a proper errno attribute. 4344 port = support.find_unused_port() 4345 with self.assertRaises(OSError) as cm: 4346 socket.create_connection((HOST, port)) 4347 4348 # Issue #16257: create_connection() calls getaddrinfo() against 4349 # 'localhost'. This may result in an IPV6 addr being returned 4350 # as well as an IPV4 one: 4351 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) 4352 # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), 4353 # (26, 2, 0, '', ('::1', 41230, 0, 0))] 4354 # 4355 # create_connection() enumerates through all the addresses returned 4356 # and if it doesn't successfully bind to any of them, it propagates 4357 # the last exception it encountered. 4358 # 4359 # On Solaris, ENETUNREACH is returned in this circumstance instead 4360 # of ECONNREFUSED. So, if that errno exists, add it to our list of 4361 # expected errnos. 
4362 expected_errnos = [ errno.ECONNREFUSED, ] 4363 if hasattr(errno, 'ENETUNREACH'): 4364 expected_errnos.append(errno.ENETUNREACH) 4365 4366 self.assertIn(cm.exception.errno, expected_errnos) 4367 4368 def test_create_connection_timeout(self): 4369 # Issue #9792: create_connection() should not recast timeout errors 4370 # as generic socket errors. 4371 with self.mocked_socket_module(): 4372 with self.assertRaises(socket.timeout): 4373 socket.create_connection((HOST, 1234)) 4374 4375 4376@unittest.skipUnless(thread, 'Threading required for this test.') 4377class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): 4378 4379 def __init__(self, methodName='runTest'): 4380 SocketTCPTest.__init__(self, methodName=methodName) 4381 ThreadableTest.__init__(self) 4382 4383 def clientSetUp(self): 4384 self.source_port = support.find_unused_port() 4385 4386 def clientTearDown(self): 4387 self.cli.close() 4388 self.cli = None 4389 ThreadableTest.clientTearDown(self) 4390 4391 def _justAccept(self): 4392 conn, addr = self.serv.accept() 4393 conn.close() 4394 4395 testFamily = _justAccept 4396 def _testFamily(self): 4397 self.cli = socket.create_connection((HOST, self.port), timeout=30) 4398 self.addCleanup(self.cli.close) 4399 self.assertEqual(self.cli.family, 2) 4400 4401 testSourceAddress = _justAccept 4402 def _testSourceAddress(self): 4403 self.cli = socket.create_connection((HOST, self.port), timeout=30, 4404 source_address=('', self.source_port)) 4405 self.addCleanup(self.cli.close) 4406 self.assertEqual(self.cli.getsockname()[1], self.source_port) 4407 # The port number being used is sufficient to show that the bind() 4408 # call happened. 
4409 4410 testTimeoutDefault = _justAccept 4411 def _testTimeoutDefault(self): 4412 # passing no explicit timeout uses socket's global default 4413 self.assertTrue(socket.getdefaulttimeout() is None) 4414 socket.setdefaulttimeout(42) 4415 try: 4416 self.cli = socket.create_connection((HOST, self.port)) 4417 self.addCleanup(self.cli.close) 4418 finally: 4419 socket.setdefaulttimeout(None) 4420 self.assertEqual(self.cli.gettimeout(), 42) 4421 4422 testTimeoutNone = _justAccept 4423 def _testTimeoutNone(self): 4424 # None timeout means the same as sock.settimeout(None) 4425 self.assertTrue(socket.getdefaulttimeout() is None) 4426 socket.setdefaulttimeout(30) 4427 try: 4428 self.cli = socket.create_connection((HOST, self.port), timeout=None) 4429 self.addCleanup(self.cli.close) 4430 finally: 4431 socket.setdefaulttimeout(None) 4432 self.assertEqual(self.cli.gettimeout(), None) 4433 4434 testTimeoutValueNamed = _justAccept 4435 def _testTimeoutValueNamed(self): 4436 self.cli = socket.create_connection((HOST, self.port), timeout=30) 4437 self.assertEqual(self.cli.gettimeout(), 30) 4438 4439 testTimeoutValueNonamed = _justAccept 4440 def _testTimeoutValueNonamed(self): 4441 self.cli = socket.create_connection((HOST, self.port), 30) 4442 self.addCleanup(self.cli.close) 4443 self.assertEqual(self.cli.gettimeout(), 30) 4444 4445@unittest.skipUnless(thread, 'Threading required for this test.') 4446class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): 4447 4448 def __init__(self, methodName='runTest'): 4449 SocketTCPTest.__init__(self, methodName=methodName) 4450 ThreadableTest.__init__(self) 4451 4452 def clientSetUp(self): 4453 pass 4454 4455 def clientTearDown(self): 4456 self.cli.close() 4457 self.cli = None 4458 ThreadableTest.clientTearDown(self) 4459 4460 def testInsideTimeout(self): 4461 conn, addr = self.serv.accept() 4462 self.addCleanup(conn.close) 4463 time.sleep(3) 4464 conn.send(b"done!") 4465 testOutsideTimeout = testInsideTimeout 4466 4467 def 
_testInsideTimeout(self): 4468 self.cli = sock = socket.create_connection((HOST, self.port)) 4469 data = sock.recv(5) 4470 self.assertEqual(data, b"done!") 4471 4472 def _testOutsideTimeout(self): 4473 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) 4474 self.assertRaises(socket.timeout, lambda: sock.recv(5)) 4475 4476 4477class TCPTimeoutTest(SocketTCPTest): 4478 4479 def testTCPTimeout(self): 4480 def raise_timeout(*args, **kwargs): 4481 self.serv.settimeout(1.0) 4482 self.serv.accept() 4483 self.assertRaises(socket.timeout, raise_timeout, 4484 "Error generating a timeout exception (TCP)") 4485 4486 def testTimeoutZero(self): 4487 ok = False 4488 try: 4489 self.serv.settimeout(0.0) 4490 foo = self.serv.accept() 4491 except socket.timeout: 4492 self.fail("caught timeout instead of error (TCP)") 4493 except OSError: 4494 ok = True 4495 except: 4496 self.fail("caught unexpected exception (TCP)") 4497 if not ok: 4498 self.fail("accept() returned success when we did not expect it") 4499 4500 @unittest.skipUnless(hasattr(signal, 'alarm'), 4501 'test needs signal.alarm()') 4502 def testInterruptedTimeout(self): 4503 # XXX I don't know how to do this test on MSWindows or any other 4504 # plaform that doesn't support signal.alarm() or os.kill(), though 4505 # the bug should have existed on all platforms. 
class UDPTimeoutTest(SocketUDPTest):
    # Timeout behaviour of recv() on the UDP server socket.

    def testUDPTimeout(self):
        # recv() with a one second timeout must raise socket.timeout.
        # The message goes through msg= so that it is actually shown on
        # failure.
        with self.assertRaises(
                socket.timeout,
                msg="Error generating a timeout exception (UDP)"):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)

    def testTimeoutZero(self):
        # With a timeout of zero, recv() must fail with a plain OSError
        # rather than socket.timeout.
        self.serv.settimeout(0.0)
        try:
            self.serv.recv(1024)
        except socket.timeout:
            # socket.timeout is an OSError subclass, so it has to be
            # checked before the generic OSError clause.
            self.fail("caught timeout instead of error (UDP)")
        except OSError:
            pass
        except Exception:
            # Deliberately not a bare except: KeyboardInterrupt and
            # SystemExit must not be reported as test failures.
            self.fail("caught unexpected exception (UDP)")
        else:
            self.fail("recv() returned success when we did not expect it")
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
    """AF_UNIX addresses that start with a NUL byte (abstract names)."""

    # Longest address, including the leading NUL, that bind() is expected
    # to accept in these tests.
    UNIX_PATH_MAX = 108

    def testLinuxAbstractNamespace(self):
        # Listen on an abstract name, connect to it and check that both
        # ends report that exact name.
        name = b"\x00python-test-hello\x00\xff"
        listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        with listener:
            listener.bind(name)
            listener.listen()
            client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            with client:
                client.connect(listener.getsockname())
                accepted = listener.accept()[0]
                with accepted:
                    self.assertEqual(listener.getsockname(), name)
                    self.assertEqual(client.getpeername(), name)

    def testMaxName(self):
        # A name of exactly UNIX_PATH_MAX bytes must round-trip.
        name = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind(name)
            self.assertEqual(sock.getsockname(), name)

    def testNameOverflow(self):
        # One character more than UNIX_PATH_MAX must be rejected.
        name = "\x00" + "h" * self.UNIX_PATH_MAX
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            with self.assertRaises(OSError):
                sock.bind(name)

    def testStrName(self):
        # Check that an abstract name can be passed as a string.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind("\x00python\x00test\x00")
            self.assertEqual(sock.getsockname(), b"\x00python\x00test\x00")

    def testBytearrayName(self):
        # Check that an abstract name can be passed as a bytearray.
        name = bytearray(b"\x00python\x00test\x00")
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind(name)
            self.assertEqual(sock.getsockname(), b"\x00python\x00test\x00")
@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
    """
    Test the buffer versions of socket.recv() and socket.send().

    Every ``testX`` method reads from self.cli_conn into a caller-supplied
    buffer; its ``_testX`` partner sends MSG through self.serv_conn.
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    # recv_into()

    def testRecvIntoArray(self):
        size = len(MSG)
        target = array.array("B", [0] * size)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, size)
        self.assertEqual(target.tobytes()[:size], MSG)

    def _testRecvIntoArray(self):
        self.serv_conn.send(bytes(MSG))

    def testRecvIntoBytearray(self):
        size = len(MSG)
        target = bytearray(1024)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, size)
        self.assertEqual(target[:size], MSG)

    _testRecvIntoBytearray = _testRecvIntoArray

    def testRecvIntoMemoryview(self):
        size = len(MSG)
        target = bytearray(1024)
        received = self.cli_conn.recv_into(memoryview(target))
        self.assertEqual(received, size)
        self.assertEqual(target[:size], MSG)

    _testRecvIntoMemoryview = _testRecvIntoArray

    # recvfrom_into()

    def testRecvFromIntoArray(self):
        size = len(MSG)
        target = array.array("B", [0] * size)
        received, addr = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, size)
        self.assertEqual(target.tobytes()[:size], MSG)

    def _testRecvFromIntoArray(self):
        self.serv_conn.send(bytes(MSG))

    def testRecvFromIntoBytearray(self):
        size = len(MSG)
        target = bytearray(1024)
        received, addr = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, size)
        self.assertEqual(target[:size], MSG)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        size = len(MSG)
        target = bytearray(1024)
        received, addr = self.cli_conn.recvfrom_into(memoryview(target))
        self.assertEqual(received, size)
        self.assertEqual(target[:size], MSG)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray

    def testRecvFromIntoSmallBuffer(self):
        # See issue #20246.
        # Asking for more bytes than the buffer can hold must be refused.
        target = bytearray(8)
        with self.assertRaises(ValueError):
            self.cli_conn.recvfrom_into(target, 1024)

    def _testRecvFromIntoSmallBuffer(self):
        self.serv_conn.send(MSG)

    def testRecvFromIntoEmptyBuffer(self):
        # An empty buffer must be accepted, with and without an explicit
        # size of zero.
        target = bytearray()
        self.cli_conn.recvfrom_into(target)
        self.cli_conn.recvfrom_into(target, 0)

    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase):
    """Datagram exchange between two AF_TIPC / SOCK_RDM sockets."""

    def testRDM(self):
        # The server binds to the name sequence TIPC_LOWER..TIPC_UPPER;
        # the client addresses the instance in the middle of that range.
        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        for endpoint in (srv, cli):
            self.addCleanup(endpoint.close)

        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                  TIPC_LOWER, TIPC_UPPER))

        instance = TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2)
        cli.sendto(MSG, (socket.TIPC_ADDR_NAME, TIPC_STYPE, instance, 0))

        payload, sender = srv.recvfrom(1024)

        # The datagram must arrive intact and carry the client's address.
        self.assertEqual(cli.getsockname(), sender)
        self.assertEqual(payload, MSG)
@unittest.skipUnless(thread, 'Threading required for this test.')
class ContextManagersTest(ThreadedTCPSocketTest):
    """Sockets used in a ``with`` statement must be closed on exit."""

    def _testSocketClass(self):
        # NOTE(review): this class defines no testSocketClass partner for
        # this method -- confirm that it is actually run.
        # base test
        sock = socket.socket()
        with sock:
            self.assertFalse(sock._closed)
        self.assertTrue(sock._closed)
        # close inside with block
        sock = socket.socket()
        with sock:
            sock.close()
        self.assertTrue(sock._closed)
        # exception inside with block
        sock = socket.socket()
        with sock:
            with self.assertRaises(OSError):
                sock.sendall(b'foo')
        self.assertTrue(sock._closed)

    def testCreateConnectionBase(self):
        # Accept one connection and echo back whatever arrives first.
        peer, _ = self.serv.accept()
        self.addCleanup(peer.close)
        peer.sendall(peer.recv(1024))

    def _testCreateConnectionBase(self):
        with socket.create_connection(self.serv.getsockname()) as sock:
            self.assertFalse(sock._closed)
            sock.sendall(b'foo')
            self.assertEqual(sock.recv(1024), b'foo')
        self.assertTrue(sock._closed)

    def testCreateConnectionClose(self):
        # Same echo step as above; here the peer closes without sending.
        peer, _ = self.serv.accept()
        self.addCleanup(peer.close)
        peer.sendall(peer.recv(1024))

    def _testCreateConnectionClose(self):
        # Closing inside the block must leave the socket closed and
        # unusable once the block has been left.
        with socket.create_connection(self.serv.getsockname()) as sock:
            sock.close()
        self.assertTrue(sock._closed)
        with self.assertRaises(OSError):
            sock.sendall(b'foo')
"SOCK_CLOEXEC not defined") 4906 @support.requires_linux_version(2, 6, 28) 4907 def test_SOCK_CLOEXEC(self): 4908 with socket.socket(socket.AF_INET, 4909 socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s: 4910 self.assertTrue(s.type & socket.SOCK_CLOEXEC) 4911 self.assertFalse(s.get_inheritable()) 4912 4913 def test_default_inheritable(self): 4914 sock = socket.socket() 4915 with sock: 4916 self.assertEqual(sock.get_inheritable(), False) 4917 4918 def test_dup(self): 4919 sock = socket.socket() 4920 with sock: 4921 newsock = sock.dup() 4922 sock.close() 4923 with newsock: 4924 self.assertEqual(newsock.get_inheritable(), False) 4925 4926 def test_set_inheritable(self): 4927 sock = socket.socket() 4928 with sock: 4929 sock.set_inheritable(True) 4930 self.assertEqual(sock.get_inheritable(), True) 4931 4932 sock.set_inheritable(False) 4933 self.assertEqual(sock.get_inheritable(), False) 4934 4935 @unittest.skipIf(fcntl is None, "need fcntl") 4936 def test_get_inheritable_cloexec(self): 4937 sock = socket.socket() 4938 with sock: 4939 fd = sock.fileno() 4940 self.assertEqual(sock.get_inheritable(), False) 4941 4942 # clear FD_CLOEXEC flag 4943 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 4944 flags &= ~fcntl.FD_CLOEXEC 4945 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 4946 4947 self.assertEqual(sock.get_inheritable(), True) 4948 4949 @unittest.skipIf(fcntl is None, "need fcntl") 4950 def test_set_inheritable_cloexec(self): 4951 sock = socket.socket() 4952 with sock: 4953 fd = sock.fileno() 4954 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 4955 fcntl.FD_CLOEXEC) 4956 4957 sock.set_inheritable(True) 4958 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 4959 0) 4960 4961 4962 @unittest.skipUnless(hasattr(socket, "socketpair"), 4963 "need socket.socketpair()") 4964 def test_socketpair(self): 4965 s1, s2 = socket.socketpair() 4966 self.addCleanup(s1.close) 4967 self.addCleanup(s2.close) 4968 self.assertEqual(s1.get_inheritable(), False) 4969 
@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
                     "SOCK_NONBLOCK not defined")
class NonblockConstantTest(unittest.TestCase):
    """Check that SOCK_NONBLOCK in socket.type tracks the timeout mode."""

    def checkNonblock(self, s, nonblock=True, timeout=0.0):
        """Assert that socket *s* is in the expected blocking mode.

        With *nonblock* true, SOCK_NONBLOCK must be set in s.type and
        s.gettimeout() must equal *timeout*.  Otherwise the flag must be
        clear and the timeout must be None; *timeout* is then ignored.
        """
        if nonblock:
            self.assertTrue(s.type & socket.SOCK_NONBLOCK)
            self.assertEqual(s.gettimeout(), timeout)
        else:
            self.assertFalse(s.type & socket.SOCK_NONBLOCK)
            self.assertEqual(s.gettimeout(), None)

    @support.requires_linux_version(2, 6, 28)
    def test_SOCK_NONBLOCK(self):
        # a lot of it seems silly and redundant, but I wanted to test that
        # changing back and forth worked ok
        with socket.socket(socket.AF_INET,
                           socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
            self.checkNonblock(s)
            s.setblocking(1)
            self.checkNonblock(s, False)
            s.setblocking(0)
            self.checkNonblock(s)
            s.settimeout(None)
            self.checkNonblock(s, False)
            s.settimeout(2.0)
            self.checkNonblock(s, timeout=2.0)
            s.setblocking(1)
            self.checkNonblock(s, False)
        # defaulttimeout
        t = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.0)
            with socket.socket() as s:
                self.checkNonblock(s)
            socket.setdefaulttimeout(None)
            with socket.socket() as s:
                self.checkNonblock(s, False)
            socket.setdefaulttimeout(2.0)
            with socket.socket() as s:
                self.checkNonblock(s, timeout=2.0)
            socket.setdefaulttimeout(None)
            with socket.socket() as s:
                self.checkNonblock(s, False)
        finally:
            # The default timeout is process-wide state: put it back even
            # when one of the checks above fails, so that a failure here
            # cannot change the behaviour of sockets created afterwards.
            socket.setdefaulttimeout(t)
def testShare(self):
    """Serve the listening socket from another process.

    The child process runs remoteProcessServer(), which reads two items
    from the queue: the shared-socket data and the message to send.
    This side connects to the original address and checks that exactly
    that message comes back before the peer closes the connection.
    """
    # Transfer the listening server socket to another process
    # and service it from there.

    # Create process:
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
    p.start()

    # Get the shared socket data
    shared = self.serv.share(p.pid)

    # Pass the shared socket to the other process
    addr = self.serv.getsockname()
    self.serv.close()
    q.put(shared)

    # The data that the server will send us
    message = b"slapmahfro"
    q.put(message)

    # Connect and listen for the data.  The with block closes the client
    # socket even when recv() raises.
    m = []
    with socket.create_connection(addr) as s:
        while True:
            piece = s.recv(100)
            if not piece:
                # Empty read: the peer closed the connection.
                break
            m.append(piece)
    received = b"".join(m)
    self.assertEqual(received, message)
    p.join()
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendfileUsingSendTest(ThreadedTCPSocketTest):
    """
    Test the send() implementation of socket.sendfile().

    Each scenario is a pair of methods.  ``_testX`` connects to self.serv
    and pushes a file through the method returned by meth_from_sock();
    ``testX`` accepts that connection and checks what arrived.
    """

    FILESIZE = (10 * 1024 * 1024)  # 10MB
    BUFSIZE = 8192
    FILEDATA = b""  # contents of the test file; filled in by setUpClass()
    TIMEOUT = 2     # seconds; used for accepted and timed client sockets

    @classmethod
    def setUpClass(cls):
        """Create the FILESIZE-byte test file and cache its contents."""
        def chunks(total, step):
            # Yield write sizes that add up to exactly *total*.
            assert total >= step
            while total > step:
                yield step
                total -= step
            if total:
                yield total

        chunk = b"".join([random.choice(string.ascii_letters).encode()
                          for i in range(cls.BUFSIZE)])
        with open(support.TESTFN, 'wb') as f:
            for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
                # Trim to the requested size so the file is exactly
                # FILESIZE bytes even when FILESIZE is not a multiple of
                # BUFSIZE.
                f.write(chunk[:csize])
        with open(support.TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
            assert len(cls.FILEDATA) == cls.FILESIZE

    @classmethod
    def tearDownClass(cls):
        """Remove the file created by setUpClass()."""
        support.unlink(support.TESTFN)

    def accept_conn(self):
        """Accept one connection on self.serv and return it.

        Both the listening socket and the accepted one get TIMEOUT as
        their timeout; the accepted socket is closed on test cleanup.
        """
        self.serv.settimeout(self.TIMEOUT)
        conn, addr = self.serv.accept()
        conn.settimeout(self.TIMEOUT)
        self.addCleanup(conn.close)
        return conn

    def recv_data(self, conn):
        """Read from *conn* until the peer closes; return all the bytes."""
        received = []
        while True:
            chunk = conn.recv(self.BUFSIZE)
            if not chunk:
                break
            received.append(chunk)
        return b''.join(received)

    def meth_from_sock(self, sock):
        # Depending on the mixin class being run return either send()
        # or sendfile() method implementation.
        return sock._sendfile_use_send

    # regular file

    def _testRegularFile(self):
        address = self.serv.getsockname()
        # The file is opened inside the with statement so that it cannot
        # leak when the connection attempt fails.
        with socket.create_connection(address) as sock, \
                open(support.TESTFN, 'rb') as file:
            meth = self.meth_from_sock(sock)
            sent = meth(file)
            self.assertEqual(sent, self.FILESIZE)
            self.assertEqual(file.tell(), self.FILESIZE)

    def testRegularFile(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # non regular file

    def _testNonRegularFile(self):
        address = self.serv.getsockname()
        with socket.create_connection(address) as sock, \
                io.BytesIO(self.FILEDATA) as file:
            sent = sock.sendfile(file)
            self.assertEqual(sent, self.FILESIZE)
            self.assertEqual(file.tell(), self.FILESIZE)
            # The sendfile()-based implementation must refuse this
            # in-memory file.
            self.assertRaises(socket._GiveupOnSendfile,
                              sock._sendfile_use_sendfile, file)

    def testNonRegularFile(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # empty file

    def _testEmptyFileSend(self):
        address = self.serv.getsockname()
        filename = support.TESTFN + "2"
        # Create the empty file and schedule its removal.
        with open(filename, 'wb'):
            self.addCleanup(support.unlink, filename)
        with socket.create_connection(address) as sock, \
                open(filename, 'rb') as file:
            meth = self.meth_from_sock(sock)
            sent = meth(file)
            self.assertEqual(sent, 0)
            self.assertEqual(file.tell(), 0)

    def testEmptyFileSend(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(data, b"")

    # offset

    def _testOffset(self):
        address = self.serv.getsockname()
        with socket.create_connection(address) as sock, \
                open(support.TESTFN, 'rb') as file:
            meth = self.meth_from_sock(sock)
            sent = meth(file, offset=5000)
            self.assertEqual(sent, self.FILESIZE - 5000)
            self.assertEqual(file.tell(), self.FILESIZE)

    def testOffset(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE - 5000)
        self.assertEqual(data, self.FILEDATA[5000:])

    # count

    def _testCount(self):
        address = self.serv.getsockname()
        with socket.create_connection(address,
                                      timeout=self.TIMEOUT) as sock, \
                open(support.TESTFN, 'rb') as file:
            count = 5000007
            meth = self.meth_from_sock(sock)
            sent = meth(file, count=count)
            self.assertEqual(sent, count)
            self.assertEqual(file.tell(), count)

    def testCount(self):
        count = 5000007
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[:count])

    # count small

    def _testCountSmall(self):
        address = self.serv.getsockname()
        with socket.create_connection(address,
                                      timeout=self.TIMEOUT) as sock, \
                open(support.TESTFN, 'rb') as file:
            count = 1
            meth = self.meth_from_sock(sock)
            sent = meth(file, count=count)
            self.assertEqual(sent, count)
            self.assertEqual(file.tell(), count)

    def testCountSmall(self):
        count = 1
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[:count])

    # count + offset

    def _testCountWithOffset(self):
        address = self.serv.getsockname()
        with socket.create_connection(address,
                                      timeout=self.TIMEOUT) as sock, \
                open(support.TESTFN, 'rb') as file:
            count = 100007
            meth = self.meth_from_sock(sock)
            sent = meth(file, offset=2007, count=count)
            self.assertEqual(sent, count)
            self.assertEqual(file.tell(), count + 2007)

    def testCountWithOffset(self):
        count = 100007
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[2007:count+2007])

    # non blocking sockets are not supposed to work

    def _testNonBlocking(self):
        address = self.serv.getsockname()
        with socket.create_connection(address) as sock, \
                open(support.TESTFN, 'rb') as file:
            sock.setblocking(False)
            meth = self.meth_from_sock(sock)
            self.assertRaises(ValueError, meth, file)
            self.assertRaises(ValueError, sock.sendfile, file)

    def testNonBlocking(self):
        conn = self.accept_conn()
        if conn.recv(8192):
            self.fail('was not supposed to receive any data')

    # timeout (non-triggered)

    def _testWithTimeout(self):
        address = self.serv.getsockname()
        with socket.create_connection(address,
                                      timeout=self.TIMEOUT) as sock, \
                open(support.TESTFN, 'rb') as file:
            meth = self.meth_from_sock(sock)
            sent = meth(file)
            self.assertEqual(sent, self.FILESIZE)

    def testWithTimeout(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # timeout (triggered)

    def _testWithTimeoutTriggeredSend(self):
        address = self.serv.getsockname()
        with socket.create_connection(address, timeout=0.01) as sock, \
                open(support.TESTFN, 'rb') as file:
            meth = self.meth_from_sock(sock)
            self.assertRaises(socket.timeout, meth, file)

    def testWithTimeoutTriggeredSend(self):
        # Only a single recv() is issued here; the rest of the file is
        # never read by this side.
        conn = self.accept_conn()
        conn.recv(88192)

    # errors

    def _test_errors(self):
        pass

    def test_errors(self):
        # Wrong socket type.
        with open(support.TESTFN, 'rb') as file:
            with socket.socket(type=socket.SOCK_DGRAM) as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(
                    ValueError, "SOCK_STREAM", meth, file)
        # File opened in text mode.
        with open(support.TESTFN, 'rt') as file:
            with socket.socket() as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(
                    ValueError, "binary mode", meth, file)
        # Invalid count values.
        with open(support.TESTFN, 'rb') as file:
            with socket.socket() as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(TypeError, "positive integer",
                                       meth, file, count='2')
                self.assertRaisesRegex(TypeError, "positive integer",
                                       meth, file, count=0.1)
                self.assertRaisesRegex(ValueError, "positive integer",
                                       meth, file, count=0)
                self.assertRaisesRegex(ValueError, "positive integer",
                                       meth, file, count=-1)
# Although it should work with 3.19 and newer the test blocks on
# Ubuntu 15.10 with Kernel 4.2.0-19.
@support.requires_linux_version(4, 3)
def test_aes_cbc(self):
    """Encrypt and decrypt with the 'cbc(aes)' skcipher over AF_ALG.

    A single 16-byte block is checked against a fixed ciphertext in both
    directions; a message of 1024 repeated blocks is then encrypted and
    decrypted again as a round trip.
    """
    key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
    iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
    msg = b"Single block msg"
    ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
    msglen = len(msg)
    with self.create_alg('skcipher', 'cbc(aes)') as algo:
        algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)

        # Encrypt: operation and IV go first, flagged MSG_MORE, and the
        # data follows in a separate call.
        op, _ = algo.accept()
        with op:
            op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
                             flags=socket.MSG_MORE)
            op.sendall(msg)
            self.assertEqual(op.recv(msglen), ciphertext)

        # Decrypt: operation, IV and data in one call.
        op, _ = algo.accept()
        with op:
            op.sendmsg_afalg([ciphertext],
                             op=socket.ALG_OP_DECRYPT, iv=iv)
            self.assertEqual(op.recv(msglen), msg)

        # long message
        multiplier = 1024
        longmsg = [msg] * multiplier
        op, _ = algo.accept()
        with op:
            op.sendmsg_afalg(longmsg,
                             op=socket.ALG_OP_ENCRYPT, iv=iv)
            enc = op.recv(msglen * multiplier)
        self.assertEqual(len(enc), msglen * multiplier)
        # Key, IV and first plaintext block are the same as in the
        # single-block case, so the first ciphertext block must match it.
        # assertEqual is required here: assertTrue would take the
        # expected value as its message and pass for any non-empty bytes.
        self.assertEqual(enc[:msglen], ciphertext)

        op, _ = algo.accept()
        with op:
            op.sendmsg_afalg([enc],
                             op=socket.ALG_OP_DECRYPT, iv=iv)
            dec = op.recv(msglen * multiplier)
        self.assertEqual(len(dec), msglen * multiplier)
        self.assertEqual(dec, msg * multiplier)
test_aes_cbc 5473 def test_aead_aes_gcm(self): 5474 key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c') 5475 iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2') 5476 plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069') 5477 assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f') 5478 expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354') 5479 expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd') 5480 5481 taglen = len(expected_tag) 5482 assoclen = len(assoc) 5483 5484 with self.create_alg('aead', 'gcm(aes)') as algo: 5485 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 5486 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE, 5487 None, taglen) 5488 5489 # send assoc, plain and tag buffer in separate steps 5490 op, _ = algo.accept() 5491 with op: 5492 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 5493 assoclen=assoclen, flags=socket.MSG_MORE) 5494 op.sendall(assoc, socket.MSG_MORE) 5495 op.sendall(plain, socket.MSG_MORE) 5496 op.sendall(b'\x00' * taglen) 5497 res = op.recv(assoclen + len(plain) + taglen) 5498 self.assertEqual(expected_ct, res[assoclen:-taglen]) 5499 self.assertEqual(expected_tag, res[-taglen:]) 5500 5501 # now with msg 5502 op, _ = algo.accept() 5503 with op: 5504 msg = assoc + plain + b'\x00' * taglen 5505 op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv, 5506 assoclen=assoclen) 5507 res = op.recv(assoclen + len(plain) + taglen) 5508 self.assertEqual(expected_ct, res[assoclen:-taglen]) 5509 self.assertEqual(expected_tag, res[-taglen:]) 5510 5511 # create anc data manually 5512 pack_uint32 = struct.Struct('I').pack 5513 op, _ = algo.accept() 5514 with op: 5515 msg = assoc + plain + b'\x00' * taglen 5516 op.sendmsg( 5517 [msg], 5518 ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)], 5519 [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv], 5520 [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)], 5521 ) 5522 ) 5523 res = op.recv(len(msg)) 5524 
def test_main():
    """Run every test case class of this module through test.support."""
    tests = [
        GeneralModuleTests,
        BasicTCPTest,
        TCPCloserTest,
        TCPTimeoutTest,
        TestExceptions,
        BufferIOTest,
        BasicTCPTest2,
        BasicUDPTest,
        UDPTimeoutTest,
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        UnicodeReadFileObjectClassTestCase,
        UnicodeWriteFileObjectClassTestCase,
        UnicodeReadWriteFileObjectClassTestCase,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
        ContextManagersTest,
        InheritanceTest,
        NonblockConstantTest,
        BasicSocketPairTest,
        TestUnixDomain,
        TestLinuxAbstractNamespace,
        TIPCTest,
        TIPCThreadableTest,
        BasicCANTest,
        CANTest,
        BasicRDSTest,
        RDSTest,
        LinuxKernelCryptoAPI,
        CmsgMacroTests,
        SendmsgUDPTest,
        RecvmsgUDPTest,
        RecvmsgIntoUDPTest,
        SendmsgUDP6Test,
        RecvmsgUDP6Test,
        RecvmsgRFC3542AncillaryUDP6Test,
        RecvmsgIntoRFC3542AncillaryUDP6Test,
        RecvmsgIntoUDP6Test,
        SendmsgTCPTest,
        RecvmsgTCPTest,
        RecvmsgIntoTCPTest,
        SendmsgSCTPStreamTest,
        RecvmsgSCTPStreamTest,
        RecvmsgIntoSCTPStreamTest,
        SendmsgUnixStreamTest,
        RecvmsgUnixStreamTest,
        RecvmsgIntoUnixStreamTest,
        RecvmsgSCMRightsStreamTest,
        RecvmsgIntoSCMRightsStreamTest,
        # These are slow when setitimer() is not available
        InterruptedRecvTimeoutTest,
        InterruptedSendTimeoutTest,
        TestSocketSharing,
        SendfileUsingSendTest,
        SendfileUsingSendfileTest,
    ]

    # The value returned by threading_setup() is handed back to
    # threading_cleanup() once the run has finished.
    saved_threads = support.threading_setup()
    support.run_unittest(*tests)
    support.threading_cleanup(*saved_threads)