• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3
4import errno
5import io
6import itertools
7import socket
8import select
9import tempfile
10import time
11import traceback
12import queue
13import sys
14import os
15import platform
16import array
17import contextlib
18from weakref import proxy
19import signal
20import math
21import pickle
22import struct
23import random
24import shutil
25import string
26import _thread as thread
27import threading
28try:
29    import multiprocessing
30except ImportError:
31    multiprocessing = False
32try:
33    import fcntl
34except ImportError:
35    fcntl = None
36
37HOST = support.HOST
38# test unicode string and carriage return
39MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')
40MAIN_TIMEOUT = 60.0
41
42VSOCKPORT = 1234
43AIX = platform.system() == "AIX"
44
45try:
46    import _socket
47except ImportError:
48    _socket = None
49
50def get_cid():
51    if fcntl is None:
52        return None
53    if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'):
54        return None
55    try:
56        with open("/dev/vsock", "rb") as f:
57            r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, "    ")
58    except OSError:
59        return None
60    else:
61        return struct.unpack("I", r)[0]
62
63def _have_socket_can():
64    """Check whether CAN sockets are supported on this host."""
65    try:
66        s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
67    except (AttributeError, OSError):
68        return False
69    else:
70        s.close()
71    return True
72
73def _have_socket_can_isotp():
74    """Check whether CAN ISOTP sockets are supported on this host."""
75    try:
76        s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
77    except (AttributeError, OSError):
78        return False
79    else:
80        s.close()
81    return True
82
83def _have_socket_rds():
84    """Check whether RDS sockets are supported on this host."""
85    try:
86        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
87    except (AttributeError, OSError):
88        return False
89    else:
90        s.close()
91    return True
92
93def _have_socket_alg():
94    """Check whether AF_ALG sockets are supported on this host."""
95    try:
96        s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
97    except (AttributeError, OSError):
98        return False
99    else:
100        s.close()
101    return True
102
103def _have_socket_qipcrtr():
104    """Check whether AF_QIPCRTR sockets are supported on this host."""
105    try:
106        s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0)
107    except (AttributeError, OSError):
108        return False
109    else:
110        s.close()
111    return True
112
113def _have_socket_vsock():
114    """Check whether AF_VSOCK sockets are supported on this host."""
115    ret = get_cid() is not None
116    return ret
117
118
119@contextlib.contextmanager
120def socket_setdefaulttimeout(timeout):
121    old_timeout = socket.getdefaulttimeout()
122    try:
123        socket.setdefaulttimeout(timeout)
124        yield
125    finally:
126        socket.setdefaulttimeout(old_timeout)
127
128
129HAVE_SOCKET_CAN = _have_socket_can()
130
131HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()
132
133HAVE_SOCKET_RDS = _have_socket_rds()
134
135HAVE_SOCKET_ALG = _have_socket_alg()
136
137HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr()
138
139HAVE_SOCKET_VSOCK = _have_socket_vsock()
140
141# Size in bytes of the int type
142SIZEOF_INT = array.array("i").itemsize
143
144class SocketTCPTest(unittest.TestCase):
145
146    def setUp(self):
147        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
148        self.port = support.bind_port(self.serv)
149        self.serv.listen()
150
151    def tearDown(self):
152        self.serv.close()
153        self.serv = None
154
155class SocketUDPTest(unittest.TestCase):
156
157    def setUp(self):
158        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
159        self.port = support.bind_port(self.serv)
160
161    def tearDown(self):
162        self.serv.close()
163        self.serv = None
164
165class ThreadSafeCleanupTestCase(unittest.TestCase):
166    """Subclass of unittest.TestCase with thread-safe cleanup methods.
167
168    This subclass protects the addCleanup() and doCleanups() methods
169    with a recursive lock.
170    """
171
172    def __init__(self, *args, **kwargs):
173        super().__init__(*args, **kwargs)
174        self._cleanup_lock = threading.RLock()
175
176    def addCleanup(self, *args, **kwargs):
177        with self._cleanup_lock:
178            return super().addCleanup(*args, **kwargs)
179
180    def doCleanups(self, *args, **kwargs):
181        with self._cleanup_lock:
182            return super().doCleanups(*args, **kwargs)
183
184class SocketCANTest(unittest.TestCase):
185
186    """To be able to run this test, a `vcan0` CAN interface can be created with
187    the following commands:
188    # modprobe vcan
189    # ip link add dev vcan0 type vcan
190    # ifconfig vcan0 up
191    """
192    interface = 'vcan0'
193    bufsize = 128
194
195    """The CAN frame structure is defined in <linux/can.h>:
196
197    struct can_frame {
198        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
199        __u8    can_dlc; /* data length code: 0 .. 8 */
200        __u8    data[8] __attribute__((aligned(8)));
201    };
202    """
203    can_frame_fmt = "=IB3x8s"
204    can_frame_size = struct.calcsize(can_frame_fmt)
205
206    """The Broadcast Management Command frame structure is defined
207    in <linux/can/bcm.h>:
208
209    struct bcm_msg_head {
210        __u32 opcode;
211        __u32 flags;
212        __u32 count;
213        struct timeval ival1, ival2;
214        canid_t can_id;
215        __u32 nframes;
216        struct can_frame frames[0];
217    }
218
219    `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
220    `struct can_frame` definition). Must use native not standard types for packing.
221    """
222    bcm_cmd_msg_fmt = "@3I4l2I"
223    bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)
224
225    def setUp(self):
226        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
227        self.addCleanup(self.s.close)
228        try:
229            self.s.bind((self.interface,))
230        except OSError:
231            self.skipTest('network interface `%s` does not exist' %
232                           self.interface)
233
234
235class SocketRDSTest(unittest.TestCase):
236
237    """To be able to run this test, the `rds` kernel module must be loaded:
238    # modprobe rds
239    """
240    bufsize = 8192
241
242    def setUp(self):
243        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
244        self.addCleanup(self.serv.close)
245        try:
246            self.port = support.bind_port(self.serv)
247        except OSError:
248            self.skipTest('unable to bind RDS socket')
249
250
251class ThreadableTest:
252    """Threadable Test class
253
254    The ThreadableTest class makes it easy to create a threaded
255    client/server pair from an existing unit test. To create a
256    new threaded class from an existing unit test, use multiple
257    inheritance:
258
259        class NewClass (OldClass, ThreadableTest):
260            pass
261
262    This class defines two new fixture functions with obvious
263    purposes for overriding:
264
265        clientSetUp ()
266        clientTearDown ()
267
268    Any new test functions within the class must then define
269    tests in pairs, where the test name is preceded with a
270    '_' to indicate the client portion of the test. Ex:
271
272        def testFoo(self):
273            # Server portion
274
275        def _testFoo(self):
276            # Client portion
277
278    Any exceptions raised by the clients during their tests
279    are caught and transferred to the main thread to alert
280    the testing framework.
281
282    Note, the server setup function cannot call any blocking
283    functions that rely on the client thread during setup,
284    unless serverExplicitReady() is called just before
285    the blocking call (such as in setting up a client/server
286    connection and performing the accept() in setUp().
287    """
288
289    def __init__(self):
290        # Swap the true setup function
291        self.__setUp = self.setUp
292        self.__tearDown = self.tearDown
293        self.setUp = self._setUp
294        self.tearDown = self._tearDown
295
296    def serverExplicitReady(self):
297        """This method allows the server to explicitly indicate that
298        it wants the client thread to proceed. This is useful if the
299        server is about to execute a blocking routine that is
300        dependent upon the client thread during its setup routine."""
301        self.server_ready.set()
302
303    def _setUp(self):
304        self.wait_threads = support.wait_threads_exit()
305        self.wait_threads.__enter__()
306
307        self.server_ready = threading.Event()
308        self.client_ready = threading.Event()
309        self.done = threading.Event()
310        self.queue = queue.Queue(1)
311        self.server_crashed = False
312
313        # Do some munging to start the client test.
314        methodname = self.id()
315        i = methodname.rfind('.')
316        methodname = methodname[i+1:]
317        test_method = getattr(self, '_' + methodname)
318        self.client_thread = thread.start_new_thread(
319            self.clientRun, (test_method,))
320
321        try:
322            self.__setUp()
323        except:
324            self.server_crashed = True
325            raise
326        finally:
327            self.server_ready.set()
328        self.client_ready.wait()
329
330    def _tearDown(self):
331        self.__tearDown()
332        self.done.wait()
333        self.wait_threads.__exit__(None, None, None)
334
335        if self.queue.qsize():
336            exc = self.queue.get()
337            raise exc
338
339    def clientRun(self, test_func):
340        self.server_ready.wait()
341        try:
342            self.clientSetUp()
343        except BaseException as e:
344            self.queue.put(e)
345            self.clientTearDown()
346            return
347        finally:
348            self.client_ready.set()
349        if self.server_crashed:
350            self.clientTearDown()
351            return
352        if not hasattr(test_func, '__call__'):
353            raise TypeError("test_func must be a callable function")
354        try:
355            test_func()
356        except BaseException as e:
357            self.queue.put(e)
358        finally:
359            self.clientTearDown()
360
361    def clientSetUp(self):
362        raise NotImplementedError("clientSetUp must be implemented.")
363
364    def clientTearDown(self):
365        self.done.set()
366        thread.exit()
367
368class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
369
370    def __init__(self, methodName='runTest'):
371        SocketTCPTest.__init__(self, methodName=methodName)
372        ThreadableTest.__init__(self)
373
374    def clientSetUp(self):
375        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
376
377    def clientTearDown(self):
378        self.cli.close()
379        self.cli = None
380        ThreadableTest.clientTearDown(self)
381
382class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
383
384    def __init__(self, methodName='runTest'):
385        SocketUDPTest.__init__(self, methodName=methodName)
386        ThreadableTest.__init__(self)
387
388    def clientSetUp(self):
389        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
390
391    def clientTearDown(self):
392        self.cli.close()
393        self.cli = None
394        ThreadableTest.clientTearDown(self)
395
396class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
397
398    def __init__(self, methodName='runTest'):
399        SocketCANTest.__init__(self, methodName=methodName)
400        ThreadableTest.__init__(self)
401
402    def clientSetUp(self):
403        self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
404        try:
405            self.cli.bind((self.interface,))
406        except OSError:
407            # skipTest should not be called here, and will be called in the
408            # server instead
409            pass
410
411    def clientTearDown(self):
412        self.cli.close()
413        self.cli = None
414        ThreadableTest.clientTearDown(self)
415
416class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
417
418    def __init__(self, methodName='runTest'):
419        SocketRDSTest.__init__(self, methodName=methodName)
420        ThreadableTest.__init__(self)
421
422    def clientSetUp(self):
423        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
424        try:
425            # RDS sockets must be bound explicitly to send or receive data
426            self.cli.bind((HOST, 0))
427            self.cli_addr = self.cli.getsockname()
428        except OSError:
429            # skipTest should not be called here, and will be called in the
430            # server instead
431            pass
432
433    def clientTearDown(self):
434        self.cli.close()
435        self.cli = None
436        ThreadableTest.clientTearDown(self)
437
438@unittest.skipIf(fcntl is None, "need fcntl")
439@unittest.skipUnless(HAVE_SOCKET_VSOCK,
440          'VSOCK sockets required for this test.')
441@unittest.skipUnless(get_cid() != 2,
442          "This test can only be run on a virtual guest.")
443class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest):
444
445    def __init__(self, methodName='runTest'):
446        unittest.TestCase.__init__(self, methodName=methodName)
447        ThreadableTest.__init__(self)
448
449    def setUp(self):
450        self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
451        self.addCleanup(self.serv.close)
452        self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT))
453        self.serv.listen()
454        self.serverExplicitReady()
455        self.conn, self.connaddr = self.serv.accept()
456        self.addCleanup(self.conn.close)
457
458    def clientSetUp(self):
459        time.sleep(0.1)
460        self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
461        self.addCleanup(self.cli.close)
462        cid = get_cid()
463        self.cli.connect((cid, VSOCKPORT))
464
465    def testStream(self):
466        msg = self.conn.recv(1024)
467        self.assertEqual(msg, MSG)
468
469    def _testStream(self):
470        self.cli.send(MSG)
471        self.cli.close()
472
473class SocketConnectedTest(ThreadedTCPSocketTest):
474    """Socket tests for client-server connection.
475
476    self.cli_conn is a client socket connected to the server.  The
477    setUp() method guarantees that it is connected to the server.
478    """
479
480    def __init__(self, methodName='runTest'):
481        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
482
483    def setUp(self):
484        ThreadedTCPSocketTest.setUp(self)
485        # Indicate explicitly we're ready for the client thread to
486        # proceed and then perform the blocking call to accept
487        self.serverExplicitReady()
488        conn, addr = self.serv.accept()
489        self.cli_conn = conn
490
491    def tearDown(self):
492        self.cli_conn.close()
493        self.cli_conn = None
494        ThreadedTCPSocketTest.tearDown(self)
495
496    def clientSetUp(self):
497        ThreadedTCPSocketTest.clientSetUp(self)
498        self.cli.connect((HOST, self.port))
499        self.serv_conn = self.cli
500
501    def clientTearDown(self):
502        self.serv_conn.close()
503        self.serv_conn = None
504        ThreadedTCPSocketTest.clientTearDown(self)
505
506class SocketPairTest(unittest.TestCase, ThreadableTest):
507
508    def __init__(self, methodName='runTest'):
509        unittest.TestCase.__init__(self, methodName=methodName)
510        ThreadableTest.__init__(self)
511
512    def setUp(self):
513        self.serv, self.cli = socket.socketpair()
514
515    def tearDown(self):
516        self.serv.close()
517        self.serv = None
518
519    def clientSetUp(self):
520        pass
521
522    def clientTearDown(self):
523        self.cli.close()
524        self.cli = None
525        ThreadableTest.clientTearDown(self)
526
527
528# The following classes are used by the sendmsg()/recvmsg() tests.
529# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
530# gives a drop-in replacement for SocketConnectedTest, but different
531# address families can be used, and the attributes serv_addr and
532# cli_addr will be set to the addresses of the endpoints.
533
534class SocketTestBase(unittest.TestCase):
535    """A base class for socket tests.
536
537    Subclasses must provide methods newSocket() to return a new socket
538    and bindSock(sock) to bind it to an unused address.
539
540    Creates a socket self.serv and sets self.serv_addr to its address.
541    """
542
543    def setUp(self):
544        self.serv = self.newSocket()
545        self.bindServer()
546
547    def bindServer(self):
548        """Bind server socket and set self.serv_addr to its address."""
549        self.bindSock(self.serv)
550        self.serv_addr = self.serv.getsockname()
551
552    def tearDown(self):
553        self.serv.close()
554        self.serv = None
555
556
557class SocketListeningTestMixin(SocketTestBase):
558    """Mixin to listen on the server socket."""
559
560    def setUp(self):
561        super().setUp()
562        self.serv.listen()
563
564
565class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
566                              ThreadableTest):
567    """Mixin to add client socket and allow client/server tests.
568
569    Client socket is self.cli and its address is self.cli_addr.  See
570    ThreadableTest for usage information.
571    """
572
573    def __init__(self, *args, **kwargs):
574        super().__init__(*args, **kwargs)
575        ThreadableTest.__init__(self)
576
577    def clientSetUp(self):
578        self.cli = self.newClientSocket()
579        self.bindClient()
580
581    def newClientSocket(self):
582        """Return a new socket for use as client."""
583        return self.newSocket()
584
585    def bindClient(self):
586        """Bind client socket and set self.cli_addr to its address."""
587        self.bindSock(self.cli)
588        self.cli_addr = self.cli.getsockname()
589
590    def clientTearDown(self):
591        self.cli.close()
592        self.cli = None
593        ThreadableTest.clientTearDown(self)
594
595
596class ConnectedStreamTestMixin(SocketListeningTestMixin,
597                               ThreadedSocketTestMixin):
598    """Mixin to allow client/server stream tests with connected client.
599
600    Server's socket representing connection to client is self.cli_conn
601    and client's connection to server is self.serv_conn.  (Based on
602    SocketConnectedTest.)
603    """
604
605    def setUp(self):
606        super().setUp()
607        # Indicate explicitly we're ready for the client thread to
608        # proceed and then perform the blocking call to accept
609        self.serverExplicitReady()
610        conn, addr = self.serv.accept()
611        self.cli_conn = conn
612
613    def tearDown(self):
614        self.cli_conn.close()
615        self.cli_conn = None
616        super().tearDown()
617
618    def clientSetUp(self):
619        super().clientSetUp()
620        self.cli.connect(self.serv_addr)
621        self.serv_conn = self.cli
622
623    def clientTearDown(self):
624        try:
625            self.serv_conn.close()
626            self.serv_conn = None
627        except AttributeError:
628            pass
629        super().clientTearDown()
630
631
632class UnixSocketTestBase(SocketTestBase):
633    """Base class for Unix-domain socket tests."""
634
635    # This class is used for file descriptor passing tests, so we
636    # create the sockets in a private directory so that other users
637    # can't send anything that might be problematic for a privileged
638    # user running the tests.
639
640    def setUp(self):
641        self.dir_path = tempfile.mkdtemp()
642        self.addCleanup(os.rmdir, self.dir_path)
643        super().setUp()
644
645    def bindSock(self, sock):
646        path = tempfile.mktemp(dir=self.dir_path)
647        support.bind_unix_socket(sock, path)
648        self.addCleanup(support.unlink, path)
649
650class UnixStreamBase(UnixSocketTestBase):
651    """Base class for Unix-domain SOCK_STREAM tests."""
652
653    def newSocket(self):
654        return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
655
656
657class InetTestBase(SocketTestBase):
658    """Base class for IPv4 socket tests."""
659
660    host = HOST
661
662    def setUp(self):
663        super().setUp()
664        self.port = self.serv_addr[1]
665
666    def bindSock(self, sock):
667        support.bind_port(sock, host=self.host)
668
669class TCPTestBase(InetTestBase):
670    """Base class for TCP-over-IPv4 tests."""
671
672    def newSocket(self):
673        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)
674
675class UDPTestBase(InetTestBase):
676    """Base class for UDP-over-IPv4 tests."""
677
678    def newSocket(self):
679        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
680
681class SCTPStreamBase(InetTestBase):
682    """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""
683
684    def newSocket(self):
685        return socket.socket(socket.AF_INET, socket.SOCK_STREAM,
686                             socket.IPPROTO_SCTP)
687
688
689class Inet6TestBase(InetTestBase):
690    """Base class for IPv6 socket tests."""
691
692    host = support.HOSTv6
693
694class UDP6TestBase(Inet6TestBase):
695    """Base class for UDP-over-IPv6 tests."""
696
697    def newSocket(self):
698        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
699
700
701# Test-skipping decorators for use with ThreadableTest.
702
703def skipWithClientIf(condition, reason):
704    """Skip decorated test if condition is true, add client_skip decorator.
705
706    If the decorated object is not a class, sets its attribute
707    "client_skip" to a decorator which will return an empty function
708    if the test is to be skipped, or the original function if it is
709    not.  This can be used to avoid running the client part of a
710    skipped test when using ThreadableTest.
711    """
712    def client_pass(*args, **kwargs):
713        pass
714    def skipdec(obj):
715        retval = unittest.skip(reason)(obj)
716        if not isinstance(obj, type):
717            retval.client_skip = lambda f: client_pass
718        return retval
719    def noskipdec(obj):
720        if not (isinstance(obj, type) or hasattr(obj, "client_skip")):
721            obj.client_skip = lambda f: f
722        return obj
723    return skipdec if condition else noskipdec
724
725
726def requireAttrs(obj, *attributes):
727    """Skip decorated test if obj is missing any of the given attributes.
728
729    Sets client_skip attribute as skipWithClientIf() does.
730    """
731    missing = [name for name in attributes if not hasattr(obj, name)]
732    return skipWithClientIf(
733        missing, "don't have " + ", ".join(name for name in missing))
734
735
736def requireSocket(*args):
737    """Skip decorated test if a socket cannot be created with given arguments.
738
739    When an argument is given as a string, will use the value of that
740    attribute of the socket module, or skip the test if it doesn't
741    exist.  Sets client_skip attribute as skipWithClientIf() does.
742    """
743    err = None
744    missing = [obj for obj in args if
745               isinstance(obj, str) and not hasattr(socket, obj)]
746    if missing:
747        err = "don't have " + ", ".join(name for name in missing)
748    else:
749        callargs = [getattr(socket, obj) if isinstance(obj, str) else obj
750                    for obj in args]
751        try:
752            s = socket.socket(*callargs)
753        except OSError as e:
754            # XXX: check errno?
755            err = str(e)
756        else:
757            s.close()
758    return skipWithClientIf(
759        err is not None,
760        "can't create socket({0}): {1}".format(
761            ", ".join(str(o) for o in args), err))
762
763
764#######################################################################
765## Begin Tests
766
767class GeneralModuleTests(unittest.TestCase):
768
769    def test_SocketType_is_socketobject(self):
770        import _socket
771        self.assertTrue(socket.SocketType is _socket.socket)
772        s = socket.socket()
773        self.assertIsInstance(s, socket.SocketType)
774        s.close()
775
776    def test_repr(self):
777        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
778        with s:
779            self.assertIn('fd=%i' % s.fileno(), repr(s))
780            self.assertIn('family=%s' % socket.AF_INET, repr(s))
781            self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
782            self.assertIn('proto=0', repr(s))
783            self.assertNotIn('raddr', repr(s))
784            s.bind(('127.0.0.1', 0))
785            self.assertIn('laddr', repr(s))
786            self.assertIn(str(s.getsockname()), repr(s))
787        self.assertIn('[closed]', repr(s))
788        self.assertNotIn('laddr', repr(s))
789
790    @unittest.skipUnless(_socket is not None, 'need _socket module')
791    def test_csocket_repr(self):
792        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
793        try:
794            expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
795                        % (s.fileno(), s.family, s.type, s.proto))
796            self.assertEqual(repr(s), expected)
797        finally:
798            s.close()
799        expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
800                    % (s.family, s.type, s.proto))
801        self.assertEqual(repr(s), expected)
802
803    def test_weakref(self):
804        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
805            p = proxy(s)
806            self.assertEqual(p.fileno(), s.fileno())
807        s = None
808        try:
809            p.fileno()
810        except ReferenceError:
811            pass
812        else:
813            self.fail('Socket proxy still exists')
814
815    def testSocketError(self):
816        # Testing socket module exceptions
817        msg = "Error raising socket exception (%s)."
818        with self.assertRaises(OSError, msg=msg % 'OSError'):
819            raise OSError
820        with self.assertRaises(OSError, msg=msg % 'socket.herror'):
821            raise socket.herror
822        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
823            raise socket.gaierror
824
825    def testSendtoErrors(self):
826        # Testing that sendto doesn't mask failures. See #10169.
827        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
828        self.addCleanup(s.close)
829        s.bind(('', 0))
830        sockname = s.getsockname()
831        # 2 args
832        with self.assertRaises(TypeError) as cm:
833            s.sendto('\u2620', sockname)
834        self.assertEqual(str(cm.exception),
835                         "a bytes-like object is required, not 'str'")
836        with self.assertRaises(TypeError) as cm:
837            s.sendto(5j, sockname)
838        self.assertEqual(str(cm.exception),
839                         "a bytes-like object is required, not 'complex'")
840        with self.assertRaises(TypeError) as cm:
841            s.sendto(b'foo', None)
842        self.assertIn('not NoneType',str(cm.exception))
843        # 3 args
844        with self.assertRaises(TypeError) as cm:
845            s.sendto('\u2620', 0, sockname)
846        self.assertEqual(str(cm.exception),
847                         "a bytes-like object is required, not 'str'")
848        with self.assertRaises(TypeError) as cm:
849            s.sendto(5j, 0, sockname)
850        self.assertEqual(str(cm.exception),
851                         "a bytes-like object is required, not 'complex'")
852        with self.assertRaises(TypeError) as cm:
853            s.sendto(b'foo', 0, None)
854        self.assertIn('not NoneType', str(cm.exception))
855        with self.assertRaises(TypeError) as cm:
856            s.sendto(b'foo', 'bar', sockname)
857        self.assertIn('an integer is required', str(cm.exception))
858        with self.assertRaises(TypeError) as cm:
859            s.sendto(b'foo', None, None)
860        self.assertIn('an integer is required', str(cm.exception))
861        # wrong number of args
862        with self.assertRaises(TypeError) as cm:
863            s.sendto(b'foo')
864        self.assertIn('(1 given)', str(cm.exception))
865        with self.assertRaises(TypeError) as cm:
866            s.sendto(b'foo', 0, sockname, 4)
867        self.assertIn('(4 given)', str(cm.exception))
868
869    def testCrucialConstants(self):
870        # Testing for mission critical constants
871        socket.AF_INET
872        if socket.has_ipv6:
873            socket.AF_INET6
874        socket.SOCK_STREAM
875        socket.SOCK_DGRAM
876        socket.SOCK_RAW
877        socket.SOCK_RDM
878        socket.SOCK_SEQPACKET
879        socket.SOL_SOCKET
880        socket.SO_REUSEADDR
881
882    def testCrucialIpProtoConstants(self):
883        socket.IPPROTO_TCP
884        socket.IPPROTO_UDP
885        if socket.has_ipv6:
886            socket.IPPROTO_IPV6
887
888    @unittest.skipUnless(os.name == "nt", "Windows specific")
889    def testWindowsSpecificConstants(self):
890        socket.IPPROTO_ICLFXBM
891        socket.IPPROTO_ST
892        socket.IPPROTO_CBT
893        socket.IPPROTO_IGP
894        socket.IPPROTO_RDP
895        socket.IPPROTO_PGM
896        socket.IPPROTO_L2TP
897        socket.IPPROTO_SCTP
898
899    def testHostnameRes(self):
900        # Testing hostname resolution mechanisms
901        hostname = socket.gethostname()
902        try:
903            ip = socket.gethostbyname(hostname)
904        except OSError:
905            # Probably name lookup wasn't set up right; skip this test
906            self.skipTest('name lookup failure')
907        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
908        try:
909            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
910        except OSError:
911            # Probably a similar problem as above; skip this test
912            self.skipTest('name lookup failure')
913        all_host_names = [hostname, hname] + aliases
914        fqhn = socket.getfqdn(ip)
915        if not fqhn in all_host_names:
916            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
917
918    def test_host_resolution(self):
919        for addr in [support.HOSTv4, '10.0.0.1', '255.255.255.255']:
920            self.assertEqual(socket.gethostbyname(addr), addr)
921
922        # we don't test support.HOSTv6 because there's a chance it doesn't have
923        # a matching name entry (e.g. 'ip6-localhost')
924        for host in [support.HOSTv4]:
925            self.assertIn(host, socket.gethostbyaddr(host)[2])
926
927    def test_host_resolution_bad_address(self):
928        # These are all malformed IP addresses and expected not to resolve to
929        # any result.  But some ISPs, e.g. AWS, may successfully resolve these
930        # IPs.
931        explanation = (
932            "resolving an invalid IP address did not raise OSError; "
933            "can be caused by a broken DNS server"
934        )
935        for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
936                     '1:1:1:1:1:1:1:1:1']:
937            with self.assertRaises(OSError, msg=addr):
938                socket.gethostbyname(addr)
939            with self.assertRaises(OSError, msg=explanation):
940                socket.gethostbyaddr(addr)
941
942    @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
943    @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
944    def test_sethostname(self):
945        oldhn = socket.gethostname()
946        try:
947            socket.sethostname('new')
948        except OSError as e:
949            if e.errno == errno.EPERM:
950                self.skipTest("test should be run as root")
951            else:
952                raise
953        try:
954            # running test as root!
955            self.assertEqual(socket.gethostname(), 'new')
956            # Should work with bytes objects too
957            socket.sethostname(b'bar')
958            self.assertEqual(socket.gethostname(), 'bar')
959        finally:
960            socket.sethostname(oldhn)
961
962    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
963                         'socket.if_nameindex() not available.')
964    def testInterfaceNameIndex(self):
965        interfaces = socket.if_nameindex()
966        for index, name in interfaces:
967            self.assertIsInstance(index, int)
968            self.assertIsInstance(name, str)
969            # interface indices are non-zero integers
970            self.assertGreater(index, 0)
971            _index = socket.if_nametoindex(name)
972            self.assertIsInstance(_index, int)
973            self.assertEqual(index, _index)
974            _name = socket.if_indextoname(index)
975            self.assertIsInstance(_name, str)
976            self.assertEqual(name, _name)
977
978    @unittest.skipUnless(hasattr(socket, 'if_indextoname'),
979                         'socket.if_indextoname() not available.')
980    def testInvalidInterfaceIndexToName(self):
981        self.assertRaises(OSError, socket.if_indextoname, 0)
982        self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
983
984    @unittest.skipUnless(hasattr(socket, 'if_nametoindex'),
985                         'socket.if_nametoindex() not available.')
986    def testInvalidInterfaceNameToIndex(self):
987        self.assertRaises(TypeError, socket.if_nametoindex, 0)
988        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
989
990    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
991                         'test needs sys.getrefcount()')
992    def testRefCountGetNameInfo(self):
993        # Testing reference count for getnameinfo
994        try:
995            # On some versions, this loses a reference
996            orig = sys.getrefcount(__name__)
997            socket.getnameinfo(__name__,0)
998        except TypeError:
999            if sys.getrefcount(__name__) != orig:
1000                self.fail("socket.getnameinfo loses a reference")
1001
1002    def testInterpreterCrash(self):
1003        # Making sure getnameinfo doesn't crash the interpreter
1004        try:
1005            # On some versions, this crashes the interpreter.
1006            socket.getnameinfo(('x', 0, 0, 0), 0)
1007        except OSError:
1008            pass
1009
1010    def testNtoH(self):
1011        # This just checks that htons etc. are their own inverse,
1012        # when looking at the lower 16 or 32 bits.
1013        sizes = {socket.htonl: 32, socket.ntohl: 32,
1014                 socket.htons: 16, socket.ntohs: 16}
1015        for func, size in sizes.items():
1016            mask = (1<<size) - 1
1017            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
1018                self.assertEqual(i & mask, func(func(i&mask)) & mask)
1019
1020            swapped = func(mask)
1021            self.assertEqual(swapped & mask, mask)
1022            self.assertRaises(OverflowError, func, 1<<34)
1023
1024    @support.cpython_only
1025    def testNtoHErrors(self):
1026        import _testcapi
1027        s_good_values = [0, 1, 2, 0xffff]
1028        l_good_values = s_good_values + [0xffffffff]
1029        l_bad_values = [-1, -2, 1<<32, 1<<1000]
1030        s_bad_values = l_bad_values + [_testcapi.INT_MIN - 1,
1031                                       _testcapi.INT_MAX + 1]
1032        s_deprecated_values = [1<<16, _testcapi.INT_MAX]
1033        for k in s_good_values:
1034            socket.ntohs(k)
1035            socket.htons(k)
1036        for k in l_good_values:
1037            socket.ntohl(k)
1038            socket.htonl(k)
1039        for k in s_bad_values:
1040            self.assertRaises(OverflowError, socket.ntohs, k)
1041            self.assertRaises(OverflowError, socket.htons, k)
1042        for k in l_bad_values:
1043            self.assertRaises(OverflowError, socket.ntohl, k)
1044            self.assertRaises(OverflowError, socket.htonl, k)
1045        for k in s_deprecated_values:
1046            self.assertWarns(DeprecationWarning, socket.ntohs, k)
1047            self.assertWarns(DeprecationWarning, socket.htons, k)
1048
1049    def testGetServBy(self):
1050        eq = self.assertEqual
1051        # Find one service that exists, then check all the related interfaces.
1052        # I've ordered this by protocols that have both a tcp and udp
1053        # protocol, at least for modern Linuxes.
1054        if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
1055            or sys.platform in ('linux', 'darwin')):
1056            # avoid the 'echo' service on this platform, as there is an
1057            # assumption breaking non-standard port/protocol entry
1058            services = ('daytime', 'qotd', 'domain')
1059        else:
1060            services = ('echo', 'daytime', 'domain')
1061        for service in services:
1062            try:
1063                port = socket.getservbyname(service, 'tcp')
1064                break
1065            except OSError:
1066                pass
1067        else:
1068            raise OSError
1069        # Try same call with optional protocol omitted
1070        # Issue #26936: Android getservbyname() was broken before API 23.
1071        if (not hasattr(sys, 'getandroidapilevel') or
1072                sys.getandroidapilevel() >= 23):
1073            port2 = socket.getservbyname(service)
1074            eq(port, port2)
1075        # Try udp, but don't barf if it doesn't exist
1076        try:
1077            udpport = socket.getservbyname(service, 'udp')
1078        except OSError:
1079            udpport = None
1080        else:
1081            eq(udpport, port)
1082        # Now make sure the lookup by port returns the same service name
1083        # Issue #26936: Android getservbyport() is broken.
1084        if not support.is_android:
1085            eq(socket.getservbyport(port2), service)
1086        eq(socket.getservbyport(port, 'tcp'), service)
1087        if udpport is not None:
1088            eq(socket.getservbyport(udpport, 'udp'), service)
1089        # Make sure getservbyport does not accept out of range ports.
1090        self.assertRaises(OverflowError, socket.getservbyport, -1)
1091        self.assertRaises(OverflowError, socket.getservbyport, 65536)
1092
1093    def testDefaultTimeout(self):
1094        # Testing default timeout
1095        # The default timeout should initially be None
1096        self.assertEqual(socket.getdefaulttimeout(), None)
1097        with socket.socket() as s:
1098            self.assertEqual(s.gettimeout(), None)
1099
1100        # Set the default timeout to 10, and see if it propagates
1101        with socket_setdefaulttimeout(10):
1102            self.assertEqual(socket.getdefaulttimeout(), 10)
1103            with socket.socket() as sock:
1104                self.assertEqual(sock.gettimeout(), 10)
1105
1106            # Reset the default timeout to None, and see if it propagates
1107            socket.setdefaulttimeout(None)
1108            self.assertEqual(socket.getdefaulttimeout(), None)
1109            with socket.socket() as sock:
1110                self.assertEqual(sock.gettimeout(), None)
1111
1112        # Check that setting it to an invalid value raises ValueError
1113        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
1114
1115        # Check that setting it to an invalid type raises TypeError
1116        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
1117
1118    @unittest.skipUnless(hasattr(socket, 'inet_aton'),
1119                         'test needs socket.inet_aton()')
1120    def testIPv4_inet_aton_fourbytes(self):
1121        # Test that issue1008086 and issue767150 are fixed.
1122        # It must return 4 bytes.
1123        self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
1124        self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
1125
1126    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1127                         'test needs socket.inet_pton()')
1128    def testIPv4toString(self):
1129        from socket import inet_aton as f, inet_pton, AF_INET
1130        g = lambda a: inet_pton(AF_INET, a)
1131
1132        assertInvalid = lambda func,a: self.assertRaises(
1133            (OSError, ValueError), func, a
1134        )
1135
1136        self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
1137        self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
1138        self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
1139        self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
1140        self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
1141        # bpo-29972: inet_pton() doesn't fail on AIX
1142        if not AIX:
1143            assertInvalid(f, '0.0.0.')
1144        assertInvalid(f, '300.0.0.0')
1145        assertInvalid(f, 'a.0.0.0')
1146        assertInvalid(f, '1.2.3.4.5')
1147        assertInvalid(f, '::1')
1148
1149        self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
1150        self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
1151        self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
1152        self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
1153        assertInvalid(g, '0.0.0.')
1154        assertInvalid(g, '300.0.0.0')
1155        assertInvalid(g, 'a.0.0.0')
1156        assertInvalid(g, '1.2.3.4.5')
1157        assertInvalid(g, '::1')
1158
1159    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1160                         'test needs socket.inet_pton()')
1161    def testIPv6toString(self):
1162        try:
1163            from socket import inet_pton, AF_INET6, has_ipv6
1164            if not has_ipv6:
1165                self.skipTest('IPv6 not available')
1166        except ImportError:
1167            self.skipTest('could not import needed symbols from socket')
1168
1169        if sys.platform == "win32":
1170            try:
1171                inet_pton(AF_INET6, '::')
1172            except OSError as e:
1173                if e.winerror == 10022:
1174                    self.skipTest('IPv6 might not be supported')
1175
1176        f = lambda a: inet_pton(AF_INET6, a)
1177        assertInvalid = lambda a: self.assertRaises(
1178            (OSError, ValueError), f, a
1179        )
1180
1181        self.assertEqual(b'\x00' * 16, f('::'))
1182        self.assertEqual(b'\x00' * 16, f('0::0'))
1183        self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
1184        self.assertEqual(
1185            b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
1186            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
1187        )
1188        self.assertEqual(
1189            b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
1190            f('ad42:abc::127:0:254:2')
1191        )
1192        self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
1193        assertInvalid('0x20::')
1194        assertInvalid(':::')
1195        assertInvalid('::0::')
1196        assertInvalid('1::abc::')
1197        assertInvalid('1::abc::def')
1198        assertInvalid('1:2:3:4:5:6')
1199        assertInvalid('1:2:3:4:5:6:')
1200        assertInvalid('1:2:3:4:5:6:7:8:0')
1201        # bpo-29972: inet_pton() doesn't fail on AIX
1202        if not AIX:
1203            assertInvalid('1:2:3:4:5:6:7:8:')
1204
1205        self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
1206            f('::254.42.23.64')
1207        )
1208        self.assertEqual(
1209            b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
1210            f('42::a29b:254.42.23.64')
1211        )
1212        self.assertEqual(
1213            b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
1214            f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
1215        )
1216        assertInvalid('255.254.253.252')
1217        assertInvalid('1::260.2.3.0')
1218        assertInvalid('1::0.be.e.0')
1219        assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
1220        assertInvalid('::1.2.3.4:0')
1221        assertInvalid('0.100.200.0:3:4:5:6:7:8')
1222
1223    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1224                         'test needs socket.inet_ntop()')
1225    def testStringToIPv4(self):
1226        from socket import inet_ntoa as f, inet_ntop, AF_INET
1227        g = lambda a: inet_ntop(AF_INET, a)
1228        assertInvalid = lambda func,a: self.assertRaises(
1229            (OSError, ValueError), func, a
1230        )
1231
1232        self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
1233        self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
1234        self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
1235        self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
1236        assertInvalid(f, b'\x00' * 3)
1237        assertInvalid(f, b'\x00' * 5)
1238        assertInvalid(f, b'\x00' * 16)
1239        self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))
1240
1241        self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
1242        self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
1243        self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
1244        assertInvalid(g, b'\x00' * 3)
1245        assertInvalid(g, b'\x00' * 5)
1246        assertInvalid(g, b'\x00' * 16)
1247        self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))
1248
1249    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1250                         'test needs socket.inet_ntop()')
1251    def testStringToIPv6(self):
1252        try:
1253            from socket import inet_ntop, AF_INET6, has_ipv6
1254            if not has_ipv6:
1255                self.skipTest('IPv6 not available')
1256        except ImportError:
1257            self.skipTest('could not import needed symbols from socket')
1258
1259        if sys.platform == "win32":
1260            try:
1261                inet_ntop(AF_INET6, b'\x00' * 16)
1262            except OSError as e:
1263                if e.winerror == 10022:
1264                    self.skipTest('IPv6 might not be supported')
1265
1266        f = lambda a: inet_ntop(AF_INET6, a)
1267        assertInvalid = lambda a: self.assertRaises(
1268            (OSError, ValueError), f, a
1269        )
1270
1271        self.assertEqual('::', f(b'\x00' * 16))
1272        self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
1273        self.assertEqual(
1274            'aef:b01:506:1001:ffff:9997:55:170',
1275            f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
1276        )
1277        self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))
1278
1279        assertInvalid(b'\x12' * 15)
1280        assertInvalid(b'\x12' * 17)
1281        assertInvalid(b'\x12' * 4)
1282
1283    # XXX The following don't test module-level functionality...
1284
1285    def testSockName(self):
1286        # Testing getsockname()
1287        port = support.find_unused_port()
1288        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1289        self.addCleanup(sock.close)
1290        sock.bind(("0.0.0.0", port))
1291        name = sock.getsockname()
1292        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
1293        # it reasonable to get the host's addr in addition to 0.0.0.0.
1294        # At least for eCos.  This is required for the S/390 to pass.
1295        try:
1296            my_ip_addr = socket.gethostbyname(socket.gethostname())
1297        except OSError:
1298            # Probably name lookup wasn't set up right; skip this test
1299            self.skipTest('name lookup failure')
1300        self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
1301        self.assertEqual(name[1], port)
1302
1303    def testGetSockOpt(self):
1304        # Testing getsockopt()
1305        # We know a socket should start without reuse==0
1306        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1307        self.addCleanup(sock.close)
1308        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1309        self.assertFalse(reuse != 0, "initial mode is reuse")
1310
1311    def testSetSockOpt(self):
1312        # Testing setsockopt()
1313        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1314        self.addCleanup(sock.close)
1315        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1316        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1317        self.assertFalse(reuse == 0, "failed to set reuse mode")
1318
1319    def testSendAfterClose(self):
1320        # testing send() after close() with timeout
1321        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1322            sock.settimeout(1)
1323        self.assertRaises(OSError, sock.send, b"spam")
1324
1325    def testCloseException(self):
1326        sock = socket.socket()
1327        sock.bind((socket._LOCALHOST, 0))
1328        socket.socket(fileno=sock.fileno()).close()
1329        try:
1330            sock.close()
1331        except OSError as err:
1332            # Winsock apparently raises ENOTSOCK
1333            self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
1334        else:
1335            self.fail("close() should raise EBADF/ENOTSOCK")
1336
1337    def testNewAttributes(self):
1338        # testing .family, .type and .protocol
1339
1340        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1341            self.assertEqual(sock.family, socket.AF_INET)
1342            if hasattr(socket, 'SOCK_CLOEXEC'):
1343                self.assertIn(sock.type,
1344                              (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
1345                               socket.SOCK_STREAM))
1346            else:
1347                self.assertEqual(sock.type, socket.SOCK_STREAM)
1348            self.assertEqual(sock.proto, 0)
1349
1350    def test_getsockaddrarg(self):
1351        sock = socket.socket()
1352        self.addCleanup(sock.close)
1353        port = support.find_unused_port()
1354        big_port = port + 65536
1355        neg_port = port - 65536
1356        self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
1357        self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
1358        # Since find_unused_port() is inherently subject to race conditions, we
1359        # call it a couple times if necessary.
1360        for i in itertools.count():
1361            port = support.find_unused_port()
1362            try:
1363                sock.bind((HOST, port))
1364            except OSError as e:
1365                if e.errno != errno.EADDRINUSE or i == 5:
1366                    raise
1367            else:
1368                break
1369
1370    @unittest.skipUnless(os.name == "nt", "Windows specific")
1371    def test_sock_ioctl(self):
1372        self.assertTrue(hasattr(socket.socket, 'ioctl'))
1373        self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
1374        self.assertTrue(hasattr(socket, 'RCVALL_ON'))
1375        self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
1376        self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
1377        s = socket.socket()
1378        self.addCleanup(s.close)
1379        self.assertRaises(ValueError, s.ioctl, -1, None)
1380        s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
1381
1382    @unittest.skipUnless(os.name == "nt", "Windows specific")
1383    @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
1384                         'Loopback fast path support required for this test')
1385    def test_sio_loopback_fast_path(self):
1386        s = socket.socket()
1387        self.addCleanup(s.close)
1388        try:
1389            s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
1390        except OSError as exc:
1391            WSAEOPNOTSUPP = 10045
1392            if exc.winerror == WSAEOPNOTSUPP:
1393                self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
1394                              "doesn't implemented in this Windows version")
1395            raise
1396        self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
1397
1398    def testGetaddrinfo(self):
1399        try:
1400            socket.getaddrinfo('localhost', 80)
1401        except socket.gaierror as err:
1402            if err.errno == socket.EAI_SERVICE:
1403                # see http://bugs.python.org/issue1282647
1404                self.skipTest("buggy libc version")
1405            raise
1406        # len of every sequence is supposed to be == 5
1407        for info in socket.getaddrinfo(HOST, None):
1408            self.assertEqual(len(info), 5)
1409        # host can be a domain name, a string representation of an
1410        # IPv4/v6 address or None
1411        socket.getaddrinfo('localhost', 80)
1412        socket.getaddrinfo('127.0.0.1', 80)
1413        socket.getaddrinfo(None, 80)
1414        if support.IPV6_ENABLED:
1415            socket.getaddrinfo('::1', 80)
1416        # port can be a string service name such as "http", a numeric
1417        # port number or None
1418        # Issue #26936: Android getaddrinfo() was broken before API level 23.
1419        if (not hasattr(sys, 'getandroidapilevel') or
1420                sys.getandroidapilevel() >= 23):
1421            socket.getaddrinfo(HOST, "http")
1422        socket.getaddrinfo(HOST, 80)
1423        socket.getaddrinfo(HOST, None)
1424        # test family and socktype filters
1425        infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
1426        for family, type, _, _, _ in infos:
1427            self.assertEqual(family, socket.AF_INET)
1428            self.assertEqual(str(family), 'AddressFamily.AF_INET')
1429            self.assertEqual(type, socket.SOCK_STREAM)
1430            self.assertEqual(str(type), 'SocketKind.SOCK_STREAM')
1431        infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1432        for _, socktype, _, _, _ in infos:
1433            self.assertEqual(socktype, socket.SOCK_STREAM)
1434        # test proto and flags arguments
1435        socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1436        socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1437        # a server willing to support both IPv4 and IPv6 will
1438        # usually do this
1439        socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1440                           socket.AI_PASSIVE)
1441        # test keyword arguments
1442        a = socket.getaddrinfo(HOST, None)
1443        b = socket.getaddrinfo(host=HOST, port=None)
1444        self.assertEqual(a, b)
1445        a = socket.getaddrinfo(HOST, None, socket.AF_INET)
1446        b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
1447        self.assertEqual(a, b)
1448        a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1449        b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
1450        self.assertEqual(a, b)
1451        a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1452        b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
1453        self.assertEqual(a, b)
1454        a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1455        b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
1456        self.assertEqual(a, b)
1457        a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1458                               socket.AI_PASSIVE)
1459        b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
1460                               type=socket.SOCK_STREAM, proto=0,
1461                               flags=socket.AI_PASSIVE)
1462        self.assertEqual(a, b)
1463        # Issue #6697.
1464        self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
1465
1466        # Issue 17269: test workaround for OS X platform bug segfault
1467        if hasattr(socket, 'AI_NUMERICSERV'):
1468            try:
1469                # The arguments here are undefined and the call may succeed
1470                # or fail.  All we care here is that it doesn't segfault.
1471                socket.getaddrinfo("localhost", None, 0, 0, 0,
1472                                   socket.AI_NUMERICSERV)
1473            except socket.gaierror:
1474                pass
1475
1476    def test_getnameinfo(self):
1477        # only IP addresses are allowed
1478        self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
1479
1480    @unittest.skipUnless(support.is_resource_enabled('network'),
1481                         'network is not enabled')
1482    def test_idna(self):
1483        # Check for internet access before running test
1484        # (issue #12804, issue #25138).
1485        with support.transient_internet('python.org'):
1486            socket.gethostbyname('python.org')
1487
1488        # these should all be successful
1489        domain = 'испытание.pythontest.net'
1490        socket.gethostbyname(domain)
1491        socket.gethostbyname_ex(domain)
1492        socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
1493        # this may not work if the forward lookup chooses the IPv6 address, as that doesn't
1494        # have a reverse entry yet
1495        # socket.gethostbyaddr('испытание.python.org')
1496
1497    def check_sendall_interrupted(self, with_timeout):
1498        # socketpair() is not strictly required, but it makes things easier.
1499        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
1500            self.skipTest("signal.alarm and socket.socketpair required for this test")
1501        # Our signal handlers clobber the C errno by calling a math function
1502        # with an invalid domain value.
1503        def ok_handler(*args):
1504            self.assertRaises(ValueError, math.acosh, 0)
1505        def raising_handler(*args):
1506            self.assertRaises(ValueError, math.acosh, 0)
1507            1 // 0
1508        c, s = socket.socketpair()
1509        old_alarm = signal.signal(signal.SIGALRM, raising_handler)
1510        try:
1511            if with_timeout:
1512                # Just above the one second minimum for signal.alarm
1513                c.settimeout(1.5)
1514            with self.assertRaises(ZeroDivisionError):
1515                signal.alarm(1)
1516                c.sendall(b"x" * support.SOCK_MAX_SIZE)
1517            if with_timeout:
1518                signal.signal(signal.SIGALRM, ok_handler)
1519                signal.alarm(1)
1520                self.assertRaises(socket.timeout, c.sendall,
1521                                  b"x" * support.SOCK_MAX_SIZE)
1522        finally:
1523            signal.alarm(0)
1524            signal.signal(signal.SIGALRM, old_alarm)
1525            c.close()
1526            s.close()
1527
1528    def test_sendall_interrupted(self):
1529        self.check_sendall_interrupted(False)
1530
1531    def test_sendall_interrupted_with_timeout(self):
1532        self.check_sendall_interrupted(True)
1533
1534    def test_dealloc_warn(self):
1535        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1536        r = repr(sock)
1537        with self.assertWarns(ResourceWarning) as cm:
1538            sock = None
1539            support.gc_collect()
1540        self.assertIn(r, str(cm.warning.args[0]))
1541        # An open socket file object gets dereferenced after the socket
1542        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1543        f = sock.makefile('rb')
1544        r = repr(sock)
1545        sock = None
1546        support.gc_collect()
1547        with self.assertWarns(ResourceWarning):
1548            f = None
1549            support.gc_collect()
1550
1551    def test_name_closed_socketio(self):
1552        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1553            fp = sock.makefile("rb")
1554            fp.close()
1555            self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
1556
1557    def test_unusable_closed_socketio(self):
1558        with socket.socket() as sock:
1559            fp = sock.makefile("rb", buffering=0)
1560            self.assertTrue(fp.readable())
1561            self.assertFalse(fp.writable())
1562            self.assertFalse(fp.seekable())
1563            fp.close()
1564            self.assertRaises(ValueError, fp.readable)
1565            self.assertRaises(ValueError, fp.writable)
1566            self.assertRaises(ValueError, fp.seekable)
1567
1568    def test_socket_close(self):
1569        sock = socket.socket()
1570        try:
1571            sock.bind((HOST, 0))
1572            socket.close(sock.fileno())
1573            with self.assertRaises(OSError):
1574                sock.listen(1)
1575        finally:
1576            with self.assertRaises(OSError):
1577                # sock.close() fails with EBADF
1578                sock.close()
1579        with self.assertRaises(TypeError):
1580            socket.close(None)
1581        with self.assertRaises(OSError):
1582            socket.close(-1)
1583
1584    def test_makefile_mode(self):
1585        for mode in 'r', 'rb', 'rw', 'w', 'wb':
1586            with self.subTest(mode=mode):
1587                with socket.socket() as sock:
1588                    with sock.makefile(mode) as fp:
1589                        self.assertEqual(fp.mode, mode)
1590
1591    def test_makefile_invalid_mode(self):
1592        for mode in 'rt', 'x', '+', 'a':
1593            with self.subTest(mode=mode):
1594                with socket.socket() as sock:
1595                    with self.assertRaisesRegex(ValueError, 'invalid mode'):
1596                        sock.makefile(mode)
1597
1598    def test_pickle(self):
1599        sock = socket.socket()
1600        with sock:
1601            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1602                self.assertRaises(TypeError, pickle.dumps, sock, protocol)
1603        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1604            family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
1605            self.assertEqual(family, socket.AF_INET)
1606            type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
1607            self.assertEqual(type, socket.SOCK_STREAM)
1608
1609    def test_listen_backlog(self):
1610        for backlog in 0, -1:
1611            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1612                srv.bind((HOST, 0))
1613                srv.listen(backlog)
1614
1615        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1616            srv.bind((HOST, 0))
1617            srv.listen()
1618
1619    @support.cpython_only
1620    def test_listen_backlog_overflow(self):
1621        # Issue 15989
1622        import _testcapi
1623        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1624            srv.bind((HOST, 0))
1625            self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
1626
1627    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
1628    def test_flowinfo(self):
1629        self.assertRaises(OverflowError, socket.getnameinfo,
1630                          (support.HOSTv6, 0, 0xffffffff), 0)
1631        with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1632            self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10))
1633
1634    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
1635    def test_getaddrinfo_ipv6_basic(self):
1636        ((*_, sockaddr),) = socket.getaddrinfo(
1637            'ff02::1de:c0:face:8D',  # Note capital letter `D`.
1638            1234, socket.AF_INET6,
1639            socket.SOCK_DGRAM,
1640            socket.IPPROTO_UDP
1641        )
1642        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))
1643
1644    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
1645    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1646    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1647    def test_getaddrinfo_ipv6_scopeid_symbolic(self):
1648        # Just pick up any network interface (Linux, Mac OS X)
1649        (ifindex, test_interface) = socket.if_nameindex()[0]
1650        ((*_, sockaddr),) = socket.getaddrinfo(
1651            'ff02::1de:c0:face:8D%' + test_interface,
1652            1234, socket.AF_INET6,
1653            socket.SOCK_DGRAM,
1654            socket.IPPROTO_UDP
1655        )
1656        # Note missing interface name part in IPv6 address
1657        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1658
1659    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
1660    @unittest.skipUnless(
1661        sys.platform == 'win32',
1662        'Numeric scope id does not work or undocumented')
1663    def test_getaddrinfo_ipv6_scopeid_numeric(self):
1664        # Also works on Linux and Mac OS X, but is not documented (?)
1665        # Windows, Linux and Max OS X allow nonexistent interface numbers here.
1666        ifindex = 42
1667        ((*_, sockaddr),) = socket.getaddrinfo(
1668            'ff02::1de:c0:face:8D%' + str(ifindex),
1669            1234, socket.AF_INET6,
1670            socket.SOCK_DGRAM,
1671            socket.IPPROTO_UDP
1672        )
1673        # Note missing interface name part in IPv6 address
1674        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1675
1676    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
1677    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1678    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1679    def test_getnameinfo_ipv6_scopeid_symbolic(self):
1680        # Just pick up any network interface.
1681        (ifindex, test_interface) = socket.if_nameindex()[0]
1682        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1683        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1684        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))
1685
1686    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
1687    @unittest.skipUnless( sys.platform == 'win32',
1688        'Numeric scope id does not work or undocumented')
1689    def test_getnameinfo_ipv6_scopeid_numeric(self):
1690        # Also works on Linux (undocumented), but does not work on Mac OS X
1691        # Windows and Linux allow nonexistent interface numbers here.
1692        ifindex = 42
1693        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1694        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1695        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234'))
1696
1697    def test_str_for_enums(self):
1698        # Make sure that the AF_* and SOCK_* constants have enum-like string
1699        # reprs.
1700        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1701            self.assertEqual(str(s.family), 'AddressFamily.AF_INET')
1702            self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM')
1703
1704    def test_socket_consistent_sock_type(self):
1705        SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)
1706        SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0)
1707        sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC
1708
1709        with socket.socket(socket.AF_INET, sock_type) as s:
1710            self.assertEqual(s.type, socket.SOCK_STREAM)
1711            s.settimeout(1)
1712            self.assertEqual(s.type, socket.SOCK_STREAM)
1713            s.settimeout(0)
1714            self.assertEqual(s.type, socket.SOCK_STREAM)
1715            s.setblocking(True)
1716            self.assertEqual(s.type, socket.SOCK_STREAM)
1717            s.setblocking(False)
1718            self.assertEqual(s.type, socket.SOCK_STREAM)
1719
1720    def test_unknown_socket_family_repr(self):
1721        # Test that when created with a family that's not one of the known
1722        # AF_*/SOCK_* constants, socket.family just returns the number.
1723        #
1724        # To do this we fool socket.socket into believing it already has an
1725        # open fd because on this path it doesn't actually verify the family and
1726        # type and populates the socket object.
1727        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1728        fd = sock.detach()
1729        unknown_family = max(socket.AddressFamily.__members__.values()) + 1
1730
1731        unknown_type = max(
1732            kind
1733            for name, kind in socket.SocketKind.__members__.items()
1734            if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'}
1735        ) + 1
1736
1737        with socket.socket(
1738                family=unknown_family, type=unknown_type, proto=23,
1739                fileno=fd) as s:
1740            self.assertEqual(s.family, unknown_family)
1741            self.assertEqual(s.type, unknown_type)
1742            # some OS like macOS ignore proto
1743            self.assertIn(s.proto, {0, 23})
1744
1745    @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
1746    def test__sendfile_use_sendfile(self):
1747        class File:
1748            def __init__(self, fd):
1749                self.fd = fd
1750
1751            def fileno(self):
1752                return self.fd
1753        with socket.socket() as sock:
1754            fd = os.open(os.curdir, os.O_RDONLY)
1755            os.close(fd)
1756            with self.assertRaises(socket._GiveupOnSendfile):
1757                sock._sendfile_use_sendfile(File(fd))
1758            with self.assertRaises(OverflowError):
1759                sock._sendfile_use_sendfile(File(2**1000))
1760            with self.assertRaises(TypeError):
1761                sock._sendfile_use_sendfile(File(None))
1762
1763    def _test_socket_fileno(self, s, family, stype):
1764        self.assertEqual(s.family, family)
1765        self.assertEqual(s.type, stype)
1766
1767        fd = s.fileno()
1768        s2 = socket.socket(fileno=fd)
1769        self.addCleanup(s2.close)
1770        # detach old fd to avoid double close
1771        s.detach()
1772        self.assertEqual(s2.family, family)
1773        self.assertEqual(s2.type, stype)
1774        self.assertEqual(s2.fileno(), fd)
1775
1776    def test_socket_fileno(self):
1777        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1778        self.addCleanup(s.close)
1779        s.bind((support.HOST, 0))
1780        self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)
1781
1782        if hasattr(socket, "SOCK_DGRAM"):
1783            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1784            self.addCleanup(s.close)
1785            s.bind((support.HOST, 0))
1786            self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)
1787
1788        if support.IPV6_ENABLED:
1789            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
1790            self.addCleanup(s.close)
1791            s.bind((support.HOSTv6, 0, 0, 0))
1792            self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)
1793
1794        if hasattr(socket, "AF_UNIX"):
1795            tmpdir = tempfile.mkdtemp()
1796            self.addCleanup(shutil.rmtree, tmpdir)
1797            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1798            self.addCleanup(s.close)
1799            try:
1800                s.bind(os.path.join(tmpdir, 'socket'))
1801            except PermissionError:
1802                pass
1803            else:
1804                self._test_socket_fileno(s, socket.AF_UNIX,
1805                                         socket.SOCK_STREAM)
1806
1807    def test_socket_fileno_rejects_float(self):
1808        with self.assertRaisesRegex(TypeError, "integer argument expected"):
1809            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5)
1810
1811    def test_socket_fileno_rejects_other_types(self):
1812        with self.assertRaisesRegex(TypeError, "integer is required"):
1813            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo")
1814
1815    def test_socket_fileno_rejects_invalid_socket(self):
1816        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1817            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1)
1818
1819    @unittest.skipIf(os.name == "nt", "Windows disallows -1 only")
1820    def test_socket_fileno_rejects_negative(self):
1821        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1822            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42)
1823
1824    def test_socket_fileno_requires_valid_fd(self):
1825        WSAENOTSOCK = 10038
1826        with self.assertRaises(OSError) as cm:
1827            socket.socket(fileno=support.make_bad_fd())
1828        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1829
1830        with self.assertRaises(OSError) as cm:
1831            socket.socket(
1832                socket.AF_INET,
1833                socket.SOCK_STREAM,
1834                fileno=support.make_bad_fd())
1835        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1836
1837    def test_socket_fileno_requires_socket_fd(self):
1838        with tempfile.NamedTemporaryFile() as afile:
1839            with self.assertRaises(OSError):
1840                socket.socket(fileno=afile.fileno())
1841
1842            with self.assertRaises(OSError) as cm:
1843                socket.socket(
1844                    socket.AF_INET,
1845                    socket.SOCK_STREAM,
1846                    fileno=afile.fileno())
1847            self.assertEqual(cm.exception.errno, errno.ENOTSOCK)
1848
1849
1850@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
1851class BasicCANTest(unittest.TestCase):
1852
1853    def testCrucialConstants(self):
1854        socket.AF_CAN
1855        socket.PF_CAN
1856        socket.CAN_RAW
1857
1858    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1859                         'socket.CAN_BCM required for this test.')
1860    def testBCMConstants(self):
1861        socket.CAN_BCM
1862
1863        # opcodes
1864        socket.CAN_BCM_TX_SETUP     # create (cyclic) transmission task
1865        socket.CAN_BCM_TX_DELETE    # remove (cyclic) transmission task
1866        socket.CAN_BCM_TX_READ      # read properties of (cyclic) transmission task
1867        socket.CAN_BCM_TX_SEND      # send one CAN frame
1868        socket.CAN_BCM_RX_SETUP     # create RX content filter subscription
1869        socket.CAN_BCM_RX_DELETE    # remove RX content filter subscription
1870        socket.CAN_BCM_RX_READ      # read properties of RX content filter subscription
1871        socket.CAN_BCM_TX_STATUS    # reply to TX_READ request
1872        socket.CAN_BCM_TX_EXPIRED   # notification on performed transmissions (count=0)
1873        socket.CAN_BCM_RX_STATUS    # reply to RX_READ request
1874        socket.CAN_BCM_RX_TIMEOUT   # cyclic message is absent
1875        socket.CAN_BCM_RX_CHANGED   # updated CAN frame (detected content change)
1876
1877        # flags
1878        socket.CAN_BCM_SETTIMER
1879        socket.CAN_BCM_STARTTIMER
1880        socket.CAN_BCM_TX_COUNTEVT
1881        socket.CAN_BCM_TX_ANNOUNCE
1882        socket.CAN_BCM_TX_CP_CAN_ID
1883        socket.CAN_BCM_RX_FILTER_ID
1884        socket.CAN_BCM_RX_CHECK_DLC
1885        socket.CAN_BCM_RX_NO_AUTOTIMER
1886        socket.CAN_BCM_RX_ANNOUNCE_RESUME
1887        socket.CAN_BCM_TX_RESET_MULTI_IDX
1888        socket.CAN_BCM_RX_RTR_FRAME
1889
1890    def testCreateSocket(self):
1891        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1892            pass
1893
1894    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1895                         'socket.CAN_BCM required for this test.')
1896    def testCreateBCMSocket(self):
1897        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s:
1898            pass
1899
1900    def testBindAny(self):
1901        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1902            address = ('', )
1903            s.bind(address)
1904            self.assertEqual(s.getsockname(), address)
1905
1906    def testTooLongInterfaceName(self):
1907        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
1908        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1909            self.assertRaisesRegex(OSError, 'interface name too long',
1910                                   s.bind, ('x' * 1024,))
1911
1912    @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
1913                         'socket.CAN_RAW_LOOPBACK required for this test.')
1914    def testLoopback(self):
1915        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1916            for loopback in (0, 1):
1917                s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
1918                             loopback)
1919                self.assertEqual(loopback,
1920                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
1921
1922    @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
1923                         'socket.CAN_RAW_FILTER required for this test.')
1924    def testFilter(self):
1925        can_id, can_mask = 0x200, 0x700
1926        can_filter = struct.pack("=II", can_id, can_mask)
1927        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1928            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
1929            self.assertEqual(can_filter,
1930                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
1931            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter))
1932
1933
1934@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
1935class CANTest(ThreadedCANSocketTest):
1936
1937    def __init__(self, methodName='runTest'):
1938        ThreadedCANSocketTest.__init__(self, methodName=methodName)
1939
1940    @classmethod
1941    def build_can_frame(cls, can_id, data):
1942        """Build a CAN frame."""
1943        can_dlc = len(data)
1944        data = data.ljust(8, b'\x00')
1945        return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
1946
1947    @classmethod
1948    def dissect_can_frame(cls, frame):
1949        """Dissect a CAN frame."""
1950        can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
1951        return (can_id, can_dlc, data[:can_dlc])
1952
1953    def testSendFrame(self):
1954        cf, addr = self.s.recvfrom(self.bufsize)
1955        self.assertEqual(self.cf, cf)
1956        self.assertEqual(addr[0], self.interface)
1957        self.assertEqual(addr[1], socket.AF_CAN)
1958
1959    def _testSendFrame(self):
1960        self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
1961        self.cli.send(self.cf)
1962
1963    def testSendMaxFrame(self):
1964        cf, addr = self.s.recvfrom(self.bufsize)
1965        self.assertEqual(self.cf, cf)
1966
1967    def _testSendMaxFrame(self):
1968        self.cf = self.build_can_frame(0x00, b'\x07' * 8)
1969        self.cli.send(self.cf)
1970
1971    def testSendMultiFrames(self):
1972        cf, addr = self.s.recvfrom(self.bufsize)
1973        self.assertEqual(self.cf1, cf)
1974
1975        cf, addr = self.s.recvfrom(self.bufsize)
1976        self.assertEqual(self.cf2, cf)
1977
1978    def _testSendMultiFrames(self):
1979        self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
1980        self.cli.send(self.cf1)
1981
1982        self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
1983        self.cli.send(self.cf2)
1984
1985    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1986                         'socket.CAN_BCM required for this test.')
1987    def _testBCM(self):
1988        cf, addr = self.cli.recvfrom(self.bufsize)
1989        self.assertEqual(self.cf, cf)
1990        can_id, can_dlc, data = self.dissect_can_frame(cf)
1991        self.assertEqual(self.can_id, can_id)
1992        self.assertEqual(self.data, data)
1993
1994    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1995                         'socket.CAN_BCM required for this test.')
1996    def testBCM(self):
1997        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
1998        self.addCleanup(bcm.close)
1999        bcm.connect((self.interface,))
2000        self.can_id = 0x123
2001        self.data = bytes([0xc0, 0xff, 0xee])
2002        self.cf = self.build_can_frame(self.can_id, self.data)
2003        opcode = socket.CAN_BCM_TX_SEND
2004        flags = 0
2005        count = 0
2006        ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
2007        bcm_can_id = 0x0222
2008        nframes = 1
2009        assert len(self.cf) == 16
2010        header = struct.pack(self.bcm_cmd_msg_fmt,
2011                    opcode,
2012                    flags,
2013                    count,
2014                    ival1_seconds,
2015                    ival1_usec,
2016                    ival2_seconds,
2017                    ival2_usec,
2018                    bcm_can_id,
2019                    nframes,
2020                    )
2021        header_plus_frame = header + self.cf
2022        bytes_sent = bcm.send(header_plus_frame)
2023        self.assertEqual(bytes_sent, len(header_plus_frame))
2024
2025
2026@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
2027class ISOTPTest(unittest.TestCase):
2028
2029    def __init__(self, *args, **kwargs):
2030        super().__init__(*args, **kwargs)
2031        self.interface = "vcan0"
2032
2033    def testCrucialConstants(self):
2034        socket.AF_CAN
2035        socket.PF_CAN
2036        socket.CAN_ISOTP
2037        socket.SOCK_DGRAM
2038
2039    def testCreateSocket(self):
2040        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2041            pass
2042
2043    @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
2044                         'socket.CAN_ISOTP required for this test.')
2045    def testCreateISOTPSocket(self):
2046        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2047            pass
2048
2049    def testTooLongInterfaceName(self):
2050        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
2051        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2052            with self.assertRaisesRegex(OSError, 'interface name too long'):
2053                s.bind(('x' * 1024, 1, 2))
2054
2055    def testBind(self):
2056        try:
2057            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2058                addr = self.interface, 0x123, 0x456
2059                s.bind(addr)
2060                self.assertEqual(s.getsockname(), addr)
2061        except OSError as e:
2062            if e.errno == errno.ENODEV:
2063                self.skipTest('network interface `%s` does not exist' %
2064                           self.interface)
2065            else:
2066                raise
2067
2068
2069@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
2070class BasicRDSTest(unittest.TestCase):
2071
2072    def testCrucialConstants(self):
2073        socket.AF_RDS
2074        socket.PF_RDS
2075
2076    def testCreateSocket(self):
2077        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
2078            pass
2079
2080    def testSocketBufferSize(self):
2081        bufsize = 16384
2082        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
2083            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
2084            s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
2085
2086
2087@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
2088class RDSTest(ThreadedRDSSocketTest):
2089
2090    def __init__(self, methodName='runTest'):
2091        ThreadedRDSSocketTest.__init__(self, methodName=methodName)
2092
2093    def setUp(self):
2094        super().setUp()
2095        self.evt = threading.Event()
2096
2097    def testSendAndRecv(self):
2098        data, addr = self.serv.recvfrom(self.bufsize)
2099        self.assertEqual(self.data, data)
2100        self.assertEqual(self.cli_addr, addr)
2101
2102    def _testSendAndRecv(self):
2103        self.data = b'spam'
2104        self.cli.sendto(self.data, 0, (HOST, self.port))
2105
2106    def testPeek(self):
2107        data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
2108        self.assertEqual(self.data, data)
2109        data, addr = self.serv.recvfrom(self.bufsize)
2110        self.assertEqual(self.data, data)
2111
2112    def _testPeek(self):
2113        self.data = b'spam'
2114        self.cli.sendto(self.data, 0, (HOST, self.port))
2115
2116    @requireAttrs(socket.socket, 'recvmsg')
2117    def testSendAndRecvMsg(self):
2118        data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
2119        self.assertEqual(self.data, data)
2120
2121    @requireAttrs(socket.socket, 'sendmsg')
2122    def _testSendAndRecvMsg(self):
2123        self.data = b'hello ' * 10
2124        self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
2125
2126    def testSendAndRecvMulti(self):
2127        data, addr = self.serv.recvfrom(self.bufsize)
2128        self.assertEqual(self.data1, data)
2129
2130        data, addr = self.serv.recvfrom(self.bufsize)
2131        self.assertEqual(self.data2, data)
2132
2133    def _testSendAndRecvMulti(self):
2134        self.data1 = b'bacon'
2135        self.cli.sendto(self.data1, 0, (HOST, self.port))
2136
2137        self.data2 = b'egg'
2138        self.cli.sendto(self.data2, 0, (HOST, self.port))
2139
2140    def testSelect(self):
2141        r, w, x = select.select([self.serv], [], [], 3.0)
2142        self.assertIn(self.serv, r)
2143        data, addr = self.serv.recvfrom(self.bufsize)
2144        self.assertEqual(self.data, data)
2145
2146    def _testSelect(self):
2147        self.data = b'select'
2148        self.cli.sendto(self.data, 0, (HOST, self.port))
2149
2150@unittest.skipUnless(HAVE_SOCKET_QIPCRTR,
2151          'QIPCRTR sockets required for this test.')
2152class BasicQIPCRTRTest(unittest.TestCase):
2153
2154    def testCrucialConstants(self):
2155        socket.AF_QIPCRTR
2156
2157    def testCreateSocket(self):
2158        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2159            pass
2160
2161    def testUnbound(self):
2162        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2163            self.assertEqual(s.getsockname()[1], 0)
2164
2165    def testBindSock(self):
2166        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2167            support.bind_port(s, host=s.getsockname()[0])
2168            self.assertNotEqual(s.getsockname()[1], 0)
2169
2170    def testInvalidBindSock(self):
2171        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2172            self.assertRaises(OSError, support.bind_port, s, host=-2)
2173
2174    def testAutoBindSock(self):
2175        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2176            s.connect((123, 123))
2177            self.assertNotEqual(s.getsockname()[1], 0)
2178
2179@unittest.skipIf(fcntl is None, "need fcntl")
2180@unittest.skipUnless(HAVE_SOCKET_VSOCK,
2181          'VSOCK sockets required for this test.')
2182class BasicVSOCKTest(unittest.TestCase):
2183
2184    def testCrucialConstants(self):
2185        socket.AF_VSOCK
2186
2187    def testVSOCKConstants(self):
2188        socket.SO_VM_SOCKETS_BUFFER_SIZE
2189        socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE
2190        socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE
2191        socket.VMADDR_CID_ANY
2192        socket.VMADDR_PORT_ANY
2193        socket.VMADDR_CID_HOST
2194        socket.VM_SOCKETS_INVALID_VERSION
2195        socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID
2196
2197    def testCreateSocket(self):
2198        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
2199            pass
2200
2201    def testSocketBufferSize(self):
2202        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
2203            orig_max = s.getsockopt(socket.AF_VSOCK,
2204                                    socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)
2205            orig = s.getsockopt(socket.AF_VSOCK,
2206                                socket.SO_VM_SOCKETS_BUFFER_SIZE)
2207            orig_min = s.getsockopt(socket.AF_VSOCK,
2208                                    socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)
2209
2210            s.setsockopt(socket.AF_VSOCK,
2211                         socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2)
2212            s.setsockopt(socket.AF_VSOCK,
2213                         socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2)
2214            s.setsockopt(socket.AF_VSOCK,
2215                         socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2)
2216
2217            self.assertEqual(orig_max * 2,
2218                             s.getsockopt(socket.AF_VSOCK,
2219                             socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE))
2220            self.assertEqual(orig * 2,
2221                             s.getsockopt(socket.AF_VSOCK,
2222                             socket.SO_VM_SOCKETS_BUFFER_SIZE))
2223            self.assertEqual(orig_min * 2,
2224                             s.getsockopt(socket.AF_VSOCK,
2225                             socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE))
2226
2227
2228class BasicTCPTest(SocketConnectedTest):
2229
2230    def __init__(self, methodName='runTest'):
2231        SocketConnectedTest.__init__(self, methodName=methodName)
2232
2233    def testRecv(self):
2234        # Testing large receive over TCP
2235        msg = self.cli_conn.recv(1024)
2236        self.assertEqual(msg, MSG)
2237
2238    def _testRecv(self):
2239        self.serv_conn.send(MSG)
2240
2241    def testOverFlowRecv(self):
2242        # Testing receive in chunks over TCP
2243        seg1 = self.cli_conn.recv(len(MSG) - 3)
2244        seg2 = self.cli_conn.recv(1024)
2245        msg = seg1 + seg2
2246        self.assertEqual(msg, MSG)
2247
2248    def _testOverFlowRecv(self):
2249        self.serv_conn.send(MSG)
2250
2251    def testRecvFrom(self):
2252        # Testing large recvfrom() over TCP
2253        msg, addr = self.cli_conn.recvfrom(1024)
2254        self.assertEqual(msg, MSG)
2255
2256    def _testRecvFrom(self):
2257        self.serv_conn.send(MSG)
2258
2259    def testOverFlowRecvFrom(self):
2260        # Testing recvfrom() in chunks over TCP
2261        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
2262        seg2, addr = self.cli_conn.recvfrom(1024)
2263        msg = seg1 + seg2
2264        self.assertEqual(msg, MSG)
2265
2266    def _testOverFlowRecvFrom(self):
2267        self.serv_conn.send(MSG)
2268
2269    def testSendAll(self):
2270        # Testing sendall() with a 2048 byte string over TCP
2271        msg = b''
2272        while 1:
2273            read = self.cli_conn.recv(1024)
2274            if not read:
2275                break
2276            msg += read
2277        self.assertEqual(msg, b'f' * 2048)
2278
2279    def _testSendAll(self):
2280        big_chunk = b'f' * 2048
2281        self.serv_conn.sendall(big_chunk)
2282
2283    def testFromFd(self):
2284        # Testing fromfd()
2285        fd = self.cli_conn.fileno()
2286        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
2287        self.addCleanup(sock.close)
2288        self.assertIsInstance(sock, socket.socket)
2289        msg = sock.recv(1024)
2290        self.assertEqual(msg, MSG)
2291
2292    def _testFromFd(self):
2293        self.serv_conn.send(MSG)
2294
2295    def testDup(self):
2296        # Testing dup()
2297        sock = self.cli_conn.dup()
2298        self.addCleanup(sock.close)
2299        msg = sock.recv(1024)
2300        self.assertEqual(msg, MSG)
2301
2302    def _testDup(self):
2303        self.serv_conn.send(MSG)
2304
2305    def testShutdown(self):
2306        # Testing shutdown()
2307        msg = self.cli_conn.recv(1024)
2308        self.assertEqual(msg, MSG)
2309        # wait for _testShutdown to finish: on OS X, when the server
2310        # closes the connection the client also becomes disconnected,
2311        # and the client's shutdown call will fail. (Issue #4397.)
2312        self.done.wait()
2313
2314    def _testShutdown(self):
2315        self.serv_conn.send(MSG)
2316        self.serv_conn.shutdown(2)
2317
2318    testShutdown_overflow = support.cpython_only(testShutdown)
2319
2320    @support.cpython_only
2321    def _testShutdown_overflow(self):
2322        import _testcapi
2323        self.serv_conn.send(MSG)
2324        # Issue 15989
2325        self.assertRaises(OverflowError, self.serv_conn.shutdown,
2326                          _testcapi.INT_MAX + 1)
2327        self.assertRaises(OverflowError, self.serv_conn.shutdown,
2328                          2 + (_testcapi.UINT_MAX + 1))
2329        self.serv_conn.shutdown(2)
2330
2331    def testDetach(self):
2332        # Testing detach()
2333        fileno = self.cli_conn.fileno()
2334        f = self.cli_conn.detach()
2335        self.assertEqual(f, fileno)
2336        # cli_conn cannot be used anymore...
2337        self.assertTrue(self.cli_conn._closed)
2338        self.assertRaises(OSError, self.cli_conn.recv, 1024)
2339        self.cli_conn.close()
2340        # ...but we can create another socket using the (still open)
2341        # file descriptor
2342        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f)
2343        self.addCleanup(sock.close)
2344        msg = sock.recv(1024)
2345        self.assertEqual(msg, MSG)
2346
2347    def _testDetach(self):
2348        self.serv_conn.send(MSG)
2349
2350
2351class BasicUDPTest(ThreadedUDPSocketTest):
2352
2353    def __init__(self, methodName='runTest'):
2354        ThreadedUDPSocketTest.__init__(self, methodName=methodName)
2355
2356    def testSendtoAndRecv(self):
2357        # Testing sendto() and Recv() over UDP
2358        msg = self.serv.recv(len(MSG))
2359        self.assertEqual(msg, MSG)
2360
2361    def _testSendtoAndRecv(self):
2362        self.cli.sendto(MSG, 0, (HOST, self.port))
2363
2364    def testRecvFrom(self):
2365        # Testing recvfrom() over UDP
2366        msg, addr = self.serv.recvfrom(len(MSG))
2367        self.assertEqual(msg, MSG)
2368
2369    def _testRecvFrom(self):
2370        self.cli.sendto(MSG, 0, (HOST, self.port))
2371
2372    def testRecvFromNegative(self):
2373        # Negative lengths passed to recvfrom should give ValueError.
2374        self.assertRaises(ValueError, self.serv.recvfrom, -1)
2375
2376    def _testRecvFromNegative(self):
2377        self.cli.sendto(MSG, 0, (HOST, self.port))
2378
2379# Tests for the sendmsg()/recvmsg() interface.  Where possible, the
2380# same test code is used with different families and types of socket
2381# (e.g. stream, datagram), and tests using recvmsg() are repeated
2382# using recvmsg_into().
2383#
2384# The generic test classes such as SendmsgTests and
2385# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
2386# supplied with sockets cli_sock and serv_sock representing the
2387# client's and the server's end of the connection respectively, and
2388# attributes cli_addr and serv_addr holding their (numeric where
2389# appropriate) addresses.
2390#
2391# The final concrete test classes combine these with subclasses of
2392# SocketTestBase which set up client and server sockets of a specific
2393# type, and with subclasses of SendrecvmsgBase such as
2394# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these
2395# sockets to cli_sock and serv_sock and override the methods and
2396# attributes of SendrecvmsgBase to fill in destination addresses if
2397# needed when sending, check for specific flags in msg_flags, etc.
2398#
2399# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using
2400# recvmsg_into().
2401
2402# XXX: like the other datagram (UDP) tests in this module, the code
2403# here assumes that datagram delivery on the local machine will be
2404# reliable.
2405
2406class SendrecvmsgBase(ThreadSafeCleanupTestCase):
2407    # Base class for sendmsg()/recvmsg() tests.
2408
2409    # Time in seconds to wait before considering a test failed, or
2410    # None for no timeout.  Not all tests actually set a timeout.
2411    fail_timeout = 3.0
2412
2413    def setUp(self):
2414        self.misc_event = threading.Event()
2415        super().setUp()
2416
2417    def sendToServer(self, msg):
2418        # Send msg to the server.
2419        return self.cli_sock.send(msg)
2420
2421    # Tuple of alternative default arguments for sendmsg() when called
2422    # via sendmsgToServer() (e.g. to include a destination address).
2423    sendmsg_to_server_defaults = ()
2424
2425    def sendmsgToServer(self, *args):
2426        # Call sendmsg() on self.cli_sock with the given arguments,
2427        # filling in any arguments which are not supplied with the
2428        # corresponding items of self.sendmsg_to_server_defaults, if
2429        # any.
2430        return self.cli_sock.sendmsg(
2431            *(args + self.sendmsg_to_server_defaults[len(args):]))
2432
2433    def doRecvmsg(self, sock, bufsize, *args):
2434        # Call recvmsg() on sock with given arguments and return its
2435        # result.  Should be used for tests which can use either
2436        # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides
2437        # this method with one which emulates it using recvmsg_into(),
2438        # thus allowing the same test to be used for both methods.
2439        result = sock.recvmsg(bufsize, *args)
2440        self.registerRecvmsgResult(result)
2441        return result
2442
2443    def registerRecvmsgResult(self, result):
2444        # Called by doRecvmsg() with the return value of recvmsg() or
2445        # recvmsg_into().  Can be overridden to arrange cleanup based
2446        # on the returned ancillary data, for instance.
2447        pass
2448
2449    def checkRecvmsgAddress(self, addr1, addr2):
2450        # Called to compare the received address with the address of
2451        # the peer.
2452        self.assertEqual(addr1, addr2)
2453
2454    # Flags that are normally unset in msg_flags
2455    msg_flags_common_unset = 0
2456    for name in ("MSG_CTRUNC", "MSG_OOB"):
2457        msg_flags_common_unset |= getattr(socket, name, 0)
2458
2459    # Flags that are normally set
2460    msg_flags_common_set = 0
2461
2462    # Flags set when a complete record has been received (e.g. MSG_EOR
2463    # for SCTP)
2464    msg_flags_eor_indicator = 0
2465
2466    # Flags set when a complete record has not been received
2467    # (e.g. MSG_TRUNC for datagram sockets)
2468    msg_flags_non_eor_indicator = 0
2469
2470    def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0):
2471        # Method to check the value of msg_flags returned by recvmsg[_into]().
2472        #
2473        # Checks that all bits in msg_flags_common_set attribute are
2474        # set in "flags" and all bits in msg_flags_common_unset are
2475        # unset.
2476        #
2477        # The "eor" argument specifies whether the flags should
2478        # indicate that a full record (or datagram) has been received.
2479        # If "eor" is None, no checks are done; otherwise, checks
2480        # that:
2481        #
2482        #  * if "eor" is true, all bits in msg_flags_eor_indicator are
2483        #    set and all bits in msg_flags_non_eor_indicator are unset
2484        #
2485        #  * if "eor" is false, all bits in msg_flags_non_eor_indicator
2486        #    are set and all bits in msg_flags_eor_indicator are unset
2487        #
2488        # If "checkset" and/or "checkunset" are supplied, they require
2489        # the given bits to be set or unset respectively, overriding
2490        # what the attributes require for those bits.
2491        #
2492        # If any bits are set in "ignore", they will not be checked,
2493        # regardless of the other inputs.
2494        #
2495        # Will raise Exception if the inputs require a bit to be both
2496        # set and unset, and it is not ignored.
2497
2498        defaultset = self.msg_flags_common_set
2499        defaultunset = self.msg_flags_common_unset
2500
2501        if eor:
2502            defaultset |= self.msg_flags_eor_indicator
2503            defaultunset |= self.msg_flags_non_eor_indicator
2504        elif eor is not None:
2505            defaultset |= self.msg_flags_non_eor_indicator
2506            defaultunset |= self.msg_flags_eor_indicator
2507
2508        # Function arguments override defaults
2509        defaultset &= ~checkunset
2510        defaultunset &= ~checkset
2511
2512        # Merge arguments with remaining defaults, and check for conflicts
2513        checkset |= defaultset
2514        checkunset |= defaultunset
2515        inboth = checkset & checkunset & ~ignore
2516        if inboth:
2517            raise Exception("contradictory set, unset requirements for flags "
2518                            "{0:#x}".format(inboth))
2519
2520        # Compare with given msg_flags value
2521        mask = (checkset | checkunset) & ~ignore
2522        self.assertEqual(flags & mask, checkset & mask)
2523
2524
2525class RecvmsgIntoMixin(SendrecvmsgBase):
2526    # Mixin to implement doRecvmsg() using recvmsg_into().
2527
2528    def doRecvmsg(self, sock, bufsize, *args):
2529        buf = bytearray(bufsize)
2530        result = sock.recvmsg_into([buf], *args)
2531        self.registerRecvmsgResult(result)
2532        self.assertGreaterEqual(result[0], 0)
2533        self.assertLessEqual(result[0], bufsize)
2534        return (bytes(buf[:result[0]]),) + result[1:]
2535
2536
2537class SendrecvmsgDgramFlagsBase(SendrecvmsgBase):
2538    # Defines flags to be checked in msg_flags for datagram sockets.
2539
2540    @property
2541    def msg_flags_non_eor_indicator(self):
2542        return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC
2543
2544
2545class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase):
2546    # Defines flags to be checked in msg_flags for SCTP sockets.
2547
2548    @property
2549    def msg_flags_eor_indicator(self):
2550        return super().msg_flags_eor_indicator | socket.MSG_EOR
2551
2552
2553class SendrecvmsgConnectionlessBase(SendrecvmsgBase):
2554    # Base class for tests on connectionless-mode sockets.  Users must
2555    # supply sockets on attributes cli and serv to be mapped to
2556    # cli_sock and serv_sock respectively.
2557
2558    @property
2559    def serv_sock(self):
2560        return self.serv
2561
2562    @property
2563    def cli_sock(self):
2564        return self.cli
2565
2566    @property
2567    def sendmsg_to_server_defaults(self):
2568        return ([], [], 0, self.serv_addr)
2569
2570    def sendToServer(self, msg):
2571        return self.cli_sock.sendto(msg, self.serv_addr)
2572
2573
2574class SendrecvmsgConnectedBase(SendrecvmsgBase):
2575    # Base class for tests on connected sockets.  Users must supply
2576    # sockets on attributes serv_conn and cli_conn (representing the
2577    # connections *to* the server and the client), to be mapped to
2578    # cli_sock and serv_sock respectively.
2579
2580    @property
2581    def serv_sock(self):
2582        return self.cli_conn
2583
2584    @property
2585    def cli_sock(self):
2586        return self.serv_conn
2587
2588    def checkRecvmsgAddress(self, addr1, addr2):
2589        # Address is currently "unspecified" for a connected socket,
2590        # so we don't examine it
2591        pass
2592
2593
2594class SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
2595    # Base class to set a timeout on server's socket.
2596
2597    def setUp(self):
2598        super().setUp()
2599        self.serv_sock.settimeout(self.fail_timeout)
2600
2601
2602class SendmsgTests(SendrecvmsgServerTimeoutBase):
2603    # Tests for sendmsg() which can use any socket type and do not
2604    # involve recvmsg() or recvmsg_into().
2605
2606    def testSendmsg(self):
2607        # Send a simple message with sendmsg().
2608        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2609
2610    def _testSendmsg(self):
2611        self.assertEqual(self.sendmsgToServer([MSG]), len(MSG))
2612
2613    def testSendmsgDataGenerator(self):
2614        # Send from buffer obtained from a generator (not a sequence).
2615        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2616
2617    def _testSendmsgDataGenerator(self):
2618        self.assertEqual(self.sendmsgToServer((o for o in [MSG])),
2619                         len(MSG))
2620
2621    def testSendmsgAncillaryGenerator(self):
2622        # Gather (empty) ancillary data from a generator.
2623        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2624
2625    def _testSendmsgAncillaryGenerator(self):
2626        self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])),
2627                         len(MSG))
2628
2629    def testSendmsgArray(self):
2630        # Send data from an array instead of the usual bytes object.
2631        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2632
2633    def _testSendmsgArray(self):
2634        self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]),
2635                         len(MSG))
2636
2637    def testSendmsgGather(self):
2638        # Send message data from more than one buffer (gather write).
2639        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2640
2641    def _testSendmsgGather(self):
2642        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
2643
2644    def testSendmsgBadArgs(self):
2645        # Check that sendmsg() rejects invalid arguments.
2646        self.assertEqual(self.serv_sock.recv(1000), b"done")
2647
2648    def _testSendmsgBadArgs(self):
2649        self.assertRaises(TypeError, self.cli_sock.sendmsg)
2650        self.assertRaises(TypeError, self.sendmsgToServer,
2651                          b"not in an iterable")
2652        self.assertRaises(TypeError, self.sendmsgToServer,
2653                          object())
2654        self.assertRaises(TypeError, self.sendmsgToServer,
2655                          [object()])
2656        self.assertRaises(TypeError, self.sendmsgToServer,
2657                          [MSG, object()])
2658        self.assertRaises(TypeError, self.sendmsgToServer,
2659                          [MSG], object())
2660        self.assertRaises(TypeError, self.sendmsgToServer,
2661                          [MSG], [], object())
2662        self.assertRaises(TypeError, self.sendmsgToServer,
2663                          [MSG], [], 0, object())
2664        self.sendToServer(b"done")
2665
2666    def testSendmsgBadCmsg(self):
2667        # Check that invalid ancillary data items are rejected.
2668        self.assertEqual(self.serv_sock.recv(1000), b"done")
2669
2670    def _testSendmsgBadCmsg(self):
2671        self.assertRaises(TypeError, self.sendmsgToServer,
2672                          [MSG], [object()])
2673        self.assertRaises(TypeError, self.sendmsgToServer,
2674                          [MSG], [(object(), 0, b"data")])
2675        self.assertRaises(TypeError, self.sendmsgToServer,
2676                          [MSG], [(0, object(), b"data")])
2677        self.assertRaises(TypeError, self.sendmsgToServer,
2678                          [MSG], [(0, 0, object())])
2679        self.assertRaises(TypeError, self.sendmsgToServer,
2680                          [MSG], [(0, 0)])
2681        self.assertRaises(TypeError, self.sendmsgToServer,
2682                          [MSG], [(0, 0, b"data", 42)])
2683        self.sendToServer(b"done")
2684
2685    @requireAttrs(socket, "CMSG_SPACE")
2686    def testSendmsgBadMultiCmsg(self):
2687        # Check that invalid ancillary data items are rejected when
2688        # more than one item is present.
2689        self.assertEqual(self.serv_sock.recv(1000), b"done")
2690
2691    @testSendmsgBadMultiCmsg.client_skip
2692    def _testSendmsgBadMultiCmsg(self):
2693        self.assertRaises(TypeError, self.sendmsgToServer,
2694                          [MSG], [0, 0, b""])
2695        self.assertRaises(TypeError, self.sendmsgToServer,
2696                          [MSG], [(0, 0, b""), object()])
2697        self.sendToServer(b"done")
2698
2699    def testSendmsgExcessCmsgReject(self):
2700        # Check that sendmsg() rejects excess ancillary data items
2701        # when the number that can be sent is limited.
2702        self.assertEqual(self.serv_sock.recv(1000), b"done")
2703
2704    def _testSendmsgExcessCmsgReject(self):
2705        if not hasattr(socket, "CMSG_SPACE"):
2706            # Can only send one item
2707            with self.assertRaises(OSError) as cm:
2708                self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
2709            self.assertIsNone(cm.exception.errno)
2710        self.sendToServer(b"done")
2711
2712    def testSendmsgAfterClose(self):
2713        # Check that sendmsg() fails on a closed socket.
2714        pass
2715
2716    def _testSendmsgAfterClose(self):
2717        self.cli_sock.close()
2718        self.assertRaises(OSError, self.sendmsgToServer, [MSG])
2719
2720
2721class SendmsgStreamTests(SendmsgTests):
2722    # Tests for sendmsg() which require a stream socket and do not
2723    # involve recvmsg() or recvmsg_into().
2724
2725    def testSendmsgExplicitNoneAddr(self):
2726        # Check that peer address can be specified as None.
2727        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2728
2729    def _testSendmsgExplicitNoneAddr(self):
2730        self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG))
2731
2732    def testSendmsgTimeout(self):
2733        # Check that timeout works with sendmsg().
2734        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
2735        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2736
2737    def _testSendmsgTimeout(self):
2738        try:
2739            self.cli_sock.settimeout(0.03)
2740            try:
2741                while True:
2742                    self.sendmsgToServer([b"a"*512])
2743            except socket.timeout:
2744                pass
2745            except OSError as exc:
2746                if exc.errno != errno.ENOMEM:
2747                    raise
2748                # bpo-33937 the test randomly fails on Travis CI with
2749                # "OSError: [Errno 12] Cannot allocate memory"
2750            else:
2751                self.fail("socket.timeout not raised")
2752        finally:
2753            self.misc_event.set()
2754
2755    # XXX: would be nice to have more tests for sendmsg flags argument.
2756
2757    # Linux supports MSG_DONTWAIT when sending, but in general, it
2758    # only works when receiving.  Could add other platforms if they
2759    # support it too.
2760    @skipWithClientIf(sys.platform not in {"linux"},
2761                      "MSG_DONTWAIT not known to work on this platform when "
2762                      "sending")
2763    def testSendmsgDontWait(self):
2764        # Check that MSG_DONTWAIT in flags causes non-blocking behaviour.
2765        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
2766        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2767
2768    @testSendmsgDontWait.client_skip
2769    def _testSendmsgDontWait(self):
2770        try:
2771            with self.assertRaises(OSError) as cm:
2772                while True:
2773                    self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
2774            # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI
2775            # with "OSError: [Errno 12] Cannot allocate memory"
2776            self.assertIn(cm.exception.errno,
2777                          (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM))
2778        finally:
2779            self.misc_event.set()
2780
2781
2782class SendmsgConnectionlessTests(SendmsgTests):
2783    # Tests for sendmsg() which require a connectionless-mode
2784    # (e.g. datagram) socket, and do not involve recvmsg() or
2785    # recvmsg_into().
2786
2787    def testSendmsgNoDestAddr(self):
2788        # Check that sendmsg() fails when no destination address is
2789        # given for unconnected socket.
2790        pass
2791
2792    def _testSendmsgNoDestAddr(self):
2793        self.assertRaises(OSError, self.cli_sock.sendmsg,
2794                          [MSG])
2795        self.assertRaises(OSError, self.cli_sock.sendmsg,
2796                          [MSG], [], 0, None)
2797
2798
2799class RecvmsgGenericTests(SendrecvmsgBase):
2800    # Tests for recvmsg() which can also be emulated using
2801    # recvmsg_into(), and can use any socket type.
2802
2803    def testRecvmsg(self):
2804        # Receive a simple message with recvmsg[_into]().
2805        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
2806        self.assertEqual(msg, MSG)
2807        self.checkRecvmsgAddress(addr, self.cli_addr)
2808        self.assertEqual(ancdata, [])
2809        self.checkFlags(flags, eor=True)
2810
2811    def _testRecvmsg(self):
2812        self.sendToServer(MSG)
2813
2814    def testRecvmsgExplicitDefaults(self):
2815        # Test recvmsg[_into]() with default arguments provided explicitly.
2816        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2817                                                   len(MSG), 0, 0)
2818        self.assertEqual(msg, MSG)
2819        self.checkRecvmsgAddress(addr, self.cli_addr)
2820        self.assertEqual(ancdata, [])
2821        self.checkFlags(flags, eor=True)
2822
2823    def _testRecvmsgExplicitDefaults(self):
2824        self.sendToServer(MSG)
2825
2826    def testRecvmsgShorter(self):
2827        # Receive a message smaller than buffer.
2828        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2829                                                   len(MSG) + 42)
2830        self.assertEqual(msg, MSG)
2831        self.checkRecvmsgAddress(addr, self.cli_addr)
2832        self.assertEqual(ancdata, [])
2833        self.checkFlags(flags, eor=True)
2834
2835    def _testRecvmsgShorter(self):
2836        self.sendToServer(MSG)
2837
2838    def testRecvmsgTrunc(self):
2839        # Receive part of message, check for truncation indicators.
2840        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2841                                                   len(MSG) - 3)
2842        self.assertEqual(msg, MSG[:-3])
2843        self.checkRecvmsgAddress(addr, self.cli_addr)
2844        self.assertEqual(ancdata, [])
2845        self.checkFlags(flags, eor=False)
2846
2847    def _testRecvmsgTrunc(self):
2848        self.sendToServer(MSG)
2849
2850    def testRecvmsgShortAncillaryBuf(self):
2851        # Test ancillary data buffer too small to hold any ancillary data.
2852        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2853                                                   len(MSG), 1)
2854        self.assertEqual(msg, MSG)
2855        self.checkRecvmsgAddress(addr, self.cli_addr)
2856        self.assertEqual(ancdata, [])
2857        self.checkFlags(flags, eor=True)
2858
2859    def _testRecvmsgShortAncillaryBuf(self):
2860        self.sendToServer(MSG)
2861
2862    def testRecvmsgLongAncillaryBuf(self):
2863        # Test large ancillary data buffer.
2864        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2865                                                   len(MSG), 10240)
2866        self.assertEqual(msg, MSG)
2867        self.checkRecvmsgAddress(addr, self.cli_addr)
2868        self.assertEqual(ancdata, [])
2869        self.checkFlags(flags, eor=True)
2870
2871    def _testRecvmsgLongAncillaryBuf(self):
2872        self.sendToServer(MSG)
2873
2874    def testRecvmsgAfterClose(self):
2875        # Check that recvmsg[_into]() fails on a closed socket.
2876        self.serv_sock.close()
2877        self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)
2878
2879    def _testRecvmsgAfterClose(self):
2880        pass
2881
2882    def testRecvmsgTimeout(self):
2883        # Check that timeout works.
2884        try:
2885            self.serv_sock.settimeout(0.03)
2886            self.assertRaises(socket.timeout,
2887                              self.doRecvmsg, self.serv_sock, len(MSG))
2888        finally:
2889            self.misc_event.set()
2890
2891    def _testRecvmsgTimeout(self):
2892        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2893
2894    @requireAttrs(socket, "MSG_PEEK")
2895    def testRecvmsgPeek(self):
2896        # Check that MSG_PEEK in flags enables examination of pending
2897        # data without consuming it.
2898
2899        # Receive part of data with MSG_PEEK.
2900        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2901                                                   len(MSG) - 3, 0,
2902                                                   socket.MSG_PEEK)
2903        self.assertEqual(msg, MSG[:-3])
2904        self.checkRecvmsgAddress(addr, self.cli_addr)
2905        self.assertEqual(ancdata, [])
2906        # Ignoring MSG_TRUNC here (so this test is the same for stream
2907        # and datagram sockets).  Some wording in POSIX seems to
2908        # suggest that it needn't be set when peeking, but that may
2909        # just be a slip.
2910        self.checkFlags(flags, eor=False,
2911                        ignore=getattr(socket, "MSG_TRUNC", 0))
2912
2913        # Receive all data with MSG_PEEK.
2914        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2915                                                   len(MSG), 0,
2916                                                   socket.MSG_PEEK)
2917        self.assertEqual(msg, MSG)
2918        self.checkRecvmsgAddress(addr, self.cli_addr)
2919        self.assertEqual(ancdata, [])
2920        self.checkFlags(flags, eor=True)
2921
2922        # Check that the same data can still be received normally.
2923        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
2924        self.assertEqual(msg, MSG)
2925        self.checkRecvmsgAddress(addr, self.cli_addr)
2926        self.assertEqual(ancdata, [])
2927        self.checkFlags(flags, eor=True)
2928
2929    @testRecvmsgPeek.client_skip
2930    def _testRecvmsgPeek(self):
2931        self.sendToServer(MSG)
2932
2933    @requireAttrs(socket.socket, "sendmsg")
2934    def testRecvmsgFromSendmsg(self):
2935        # Test receiving with recvmsg[_into]() when message is sent
2936        # using sendmsg().
2937        self.serv_sock.settimeout(self.fail_timeout)
2938        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
2939        self.assertEqual(msg, MSG)
2940        self.checkRecvmsgAddress(addr, self.cli_addr)
2941        self.assertEqual(ancdata, [])
2942        self.checkFlags(flags, eor=True)
2943
2944    @testRecvmsgFromSendmsg.client_skip
2945    def _testRecvmsgFromSendmsg(self):
2946        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
2947
2948
2949class RecvmsgGenericStreamTests(RecvmsgGenericTests):
2950    # Tests which require a stream socket and can use either recvmsg()
2951    # or recvmsg_into().
2952
2953    def testRecvmsgEOF(self):
2954        # Receive end-of-stream indicator (b"", peer socket closed).
2955        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
2956        self.assertEqual(msg, b"")
2957        self.checkRecvmsgAddress(addr, self.cli_addr)
2958        self.assertEqual(ancdata, [])
2959        self.checkFlags(flags, eor=None) # Might not have end-of-record marker
2960
2961    def _testRecvmsgEOF(self):
2962        self.cli_sock.close()
2963
2964    def testRecvmsgOverflow(self):
2965        # Receive a message in more than one chunk.
2966        seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2967                                                    len(MSG) - 3)
2968        self.checkRecvmsgAddress(addr, self.cli_addr)
2969        self.assertEqual(ancdata, [])
2970        self.checkFlags(flags, eor=False)
2971
2972        seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
2973        self.checkRecvmsgAddress(addr, self.cli_addr)
2974        self.assertEqual(ancdata, [])
2975        self.checkFlags(flags, eor=True)
2976
2977        msg = seg1 + seg2
2978        self.assertEqual(msg, MSG)
2979
2980    def _testRecvmsgOverflow(self):
2981        self.sendToServer(MSG)
2982
2983
2984class RecvmsgTests(RecvmsgGenericTests):
2985    # Tests for recvmsg() which can use any socket type.
2986
2987    def testRecvmsgBadArgs(self):
2988        # Check that recvmsg() rejects invalid arguments.
2989        self.assertRaises(TypeError, self.serv_sock.recvmsg)
2990        self.assertRaises(ValueError, self.serv_sock.recvmsg,
2991                          -1, 0, 0)
2992        self.assertRaises(ValueError, self.serv_sock.recvmsg,
2993                          len(MSG), -1, 0)
2994        self.assertRaises(TypeError, self.serv_sock.recvmsg,
2995                          [bytearray(10)], 0, 0)
2996        self.assertRaises(TypeError, self.serv_sock.recvmsg,
2997                          object(), 0, 0)
2998        self.assertRaises(TypeError, self.serv_sock.recvmsg,
2999                          len(MSG), object(), 0)
3000        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3001                          len(MSG), 0, object())
3002
3003        msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0)
3004        self.assertEqual(msg, MSG)
3005        self.checkRecvmsgAddress(addr, self.cli_addr)
3006        self.assertEqual(ancdata, [])
3007        self.checkFlags(flags, eor=True)
3008
3009    def _testRecvmsgBadArgs(self):
3010        self.sendToServer(MSG)
3011
3012
3013class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
3014    # Tests for recvmsg_into() which can use any socket type.
3015
3016    def testRecvmsgIntoBadArgs(self):
3017        # Check that recvmsg_into() rejects invalid arguments.
3018        buf = bytearray(len(MSG))
3019        self.assertRaises(TypeError, self.serv_sock.recvmsg_into)
3020        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3021                          len(MSG), 0, 0)
3022        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3023                          buf, 0, 0)
3024        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3025                          [object()], 0, 0)
3026        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3027                          [b"I'm not writable"], 0, 0)
3028        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3029                          [buf, object()], 0, 0)
3030        self.assertRaises(ValueError, self.serv_sock.recvmsg_into,
3031                          [buf], -1, 0)
3032        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3033                          [buf], object(), 0)
3034        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3035                          [buf], 0, object())
3036
3037        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0)
3038        self.assertEqual(nbytes, len(MSG))
3039        self.assertEqual(buf, bytearray(MSG))
3040        self.checkRecvmsgAddress(addr, self.cli_addr)
3041        self.assertEqual(ancdata, [])
3042        self.checkFlags(flags, eor=True)
3043
3044    def _testRecvmsgIntoBadArgs(self):
3045        self.sendToServer(MSG)
3046
3047    def testRecvmsgIntoGenerator(self):
3048        # Receive into buffer obtained from a generator (not a sequence).
3049        buf = bytearray(len(MSG))
3050        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
3051            (o for o in [buf]))
3052        self.assertEqual(nbytes, len(MSG))
3053        self.assertEqual(buf, bytearray(MSG))
3054        self.checkRecvmsgAddress(addr, self.cli_addr)
3055        self.assertEqual(ancdata, [])
3056        self.checkFlags(flags, eor=True)
3057
3058    def _testRecvmsgIntoGenerator(self):
3059        self.sendToServer(MSG)
3060
3061    def testRecvmsgIntoArray(self):
3062        # Receive into an array rather than the usual bytearray.
3063        buf = array.array("B", [0] * len(MSG))
3064        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf])
3065        self.assertEqual(nbytes, len(MSG))
3066        self.assertEqual(buf.tobytes(), MSG)
3067        self.checkRecvmsgAddress(addr, self.cli_addr)
3068        self.assertEqual(ancdata, [])
3069        self.checkFlags(flags, eor=True)
3070
3071    def _testRecvmsgIntoArray(self):
3072        self.sendToServer(MSG)
3073
3074    def testRecvmsgIntoScatter(self):
3075        # Receive into multiple buffers (scatter write).
3076        b1 = bytearray(b"----")
3077        b2 = bytearray(b"0123456789")
3078        b3 = bytearray(b"--------------")
3079        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
3080            [b1, memoryview(b2)[2:9], b3])
3081        self.assertEqual(nbytes, len(b"Mary had a little lamb"))
3082        self.assertEqual(b1, bytearray(b"Mary"))
3083        self.assertEqual(b2, bytearray(b"01 had a 9"))
3084        self.assertEqual(b3, bytearray(b"little lamb---"))
3085        self.checkRecvmsgAddress(addr, self.cli_addr)
3086        self.assertEqual(ancdata, [])
3087        self.checkFlags(flags, eor=True)
3088
3089    def _testRecvmsgIntoScatter(self):
3090        self.sendToServer(b"Mary had a little lamb")
3091
3092
3093class CmsgMacroTests(unittest.TestCase):
3094    # Test the functions CMSG_LEN() and CMSG_SPACE().  Tests
3095    # assumptions used by sendmsg() and recvmsg[_into](), which share
3096    # code with these functions.
3097
3098    # Match the definition in socketmodule.c
3099    try:
3100        import _testcapi
3101    except ImportError:
3102        socklen_t_limit = 0x7fffffff
3103    else:
3104        socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)
3105
3106    @requireAttrs(socket, "CMSG_LEN")
3107    def testCMSG_LEN(self):
3108        # Test CMSG_LEN() with various valid and invalid values,
3109        # checking the assumptions used by recvmsg() and sendmsg().
3110        toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1
3111        values = list(range(257)) + list(range(toobig - 257, toobig))
3112
3113        # struct cmsghdr has at least three members, two of which are ints
3114        self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2)
3115        for n in values:
3116            ret = socket.CMSG_LEN(n)
3117            # This is how recvmsg() calculates the data size
3118            self.assertEqual(ret - socket.CMSG_LEN(0), n)
3119            self.assertLessEqual(ret, self.socklen_t_limit)
3120
3121        self.assertRaises(OverflowError, socket.CMSG_LEN, -1)
3122        # sendmsg() shares code with these functions, and requires
3123        # that it reject values over the limit.
3124        self.assertRaises(OverflowError, socket.CMSG_LEN, toobig)
3125        self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize)
3126
3127    @requireAttrs(socket, "CMSG_SPACE")
3128    def testCMSG_SPACE(self):
3129        # Test CMSG_SPACE() with various valid and invalid values,
3130        # checking the assumptions used by sendmsg().
3131        toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1
3132        values = list(range(257)) + list(range(toobig - 257, toobig))
3133
3134        last = socket.CMSG_SPACE(0)
3135        # struct cmsghdr has at least three members, two of which are ints
3136        self.assertGreater(last, array.array("i").itemsize * 2)
3137        for n in values:
3138            ret = socket.CMSG_SPACE(n)
3139            self.assertGreaterEqual(ret, last)
3140            self.assertGreaterEqual(ret, socket.CMSG_LEN(n))
3141            self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0))
3142            self.assertLessEqual(ret, self.socklen_t_limit)
3143            last = ret
3144
3145        self.assertRaises(OverflowError, socket.CMSG_SPACE, -1)
3146        # sendmsg() shares code with these functions, and requires
3147        # that it reject values over the limit.
3148        self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig)
3149        self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize)
3150
3151
3152class SCMRightsTest(SendrecvmsgServerTimeoutBase):
3153    # Tests for file descriptor passing on Unix-domain sockets.
3154
3155    # Invalid file descriptor value that's unlikely to evaluate to a
3156    # real FD even if one of its bytes is replaced with a different
3157    # value (which shouldn't actually happen).
3158    badfd = -0x5555
3159
3160    def newFDs(self, n):
3161        # Return a list of n file descriptors for newly-created files
3162        # containing their list indices as ASCII numbers.
3163        fds = []
3164        for i in range(n):
3165            fd, path = tempfile.mkstemp()
3166            self.addCleanup(os.unlink, path)
3167            self.addCleanup(os.close, fd)
3168            os.write(fd, str(i).encode())
3169            fds.append(fd)
3170        return fds
3171
3172    def checkFDs(self, fds):
3173        # Check that the file descriptors in the given list contain
3174        # their correct list indices as ASCII numbers.
3175        for n, fd in enumerate(fds):
3176            os.lseek(fd, 0, os.SEEK_SET)
3177            self.assertEqual(os.read(fd, 1024), str(n).encode())
3178
3179    def registerRecvmsgResult(self, result):
3180        self.addCleanup(self.closeRecvmsgFDs, result)
3181
3182    def closeRecvmsgFDs(self, recvmsg_result):
3183        # Close all file descriptors specified in the ancillary data
3184        # of the given return value from recvmsg() or recvmsg_into().
3185        for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]:
3186            if (cmsg_level == socket.SOL_SOCKET and
3187                    cmsg_type == socket.SCM_RIGHTS):
3188                fds = array.array("i")
3189                fds.frombytes(cmsg_data[:
3190                        len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3191                for fd in fds:
3192                    os.close(fd)
3193
3194    def createAndSendFDs(self, n):
3195        # Send n new file descriptors created by newFDs() to the
3196        # server, with the constant MSG as the non-ancillary data.
3197        self.assertEqual(
3198            self.sendmsgToServer([MSG],
3199                                 [(socket.SOL_SOCKET,
3200                                   socket.SCM_RIGHTS,
3201                                   array.array("i", self.newFDs(n)))]),
3202            len(MSG))
3203
3204    def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
3205        # Check that constant MSG was received with numfds file
3206        # descriptors in a maximum of maxcmsgs control messages (which
3207        # must contain only complete integers).  By default, check
3208        # that MSG_CTRUNC is unset, but ignore any flags in
3209        # ignoreflags.
3210        msg, ancdata, flags, addr = result
3211        self.assertEqual(msg, MSG)
3212        self.checkRecvmsgAddress(addr, self.cli_addr)
3213        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3214                        ignore=ignoreflags)
3215
3216        self.assertIsInstance(ancdata, list)
3217        self.assertLessEqual(len(ancdata), maxcmsgs)
3218        fds = array.array("i")
3219        for item in ancdata:
3220            self.assertIsInstance(item, tuple)
3221            cmsg_level, cmsg_type, cmsg_data = item
3222            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3223            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3224            self.assertIsInstance(cmsg_data, bytes)
3225            self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0)
3226            fds.frombytes(cmsg_data)
3227
3228        self.assertEqual(len(fds), numfds)
3229        self.checkFDs(fds)
3230
3231    def testFDPassSimple(self):
3232        # Pass a single FD (array read from bytes object).
3233        self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock,
3234                                               len(MSG), 10240))
3235
3236    def _testFDPassSimple(self):
3237        self.assertEqual(
3238            self.sendmsgToServer(
3239                [MSG],
3240                [(socket.SOL_SOCKET,
3241                  socket.SCM_RIGHTS,
3242                  array.array("i", self.newFDs(1)).tobytes())]),
3243            len(MSG))
3244
3245    def testMultipleFDPass(self):
3246        # Pass multiple FDs in a single array.
3247        self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock,
3248                                               len(MSG), 10240))
3249
3250    def _testMultipleFDPass(self):
3251        self.createAndSendFDs(4)
3252
3253    @requireAttrs(socket, "CMSG_SPACE")
3254    def testFDPassCMSG_SPACE(self):
3255        # Test using CMSG_SPACE() to calculate ancillary buffer size.
3256        self.checkRecvmsgFDs(
3257            4, self.doRecvmsg(self.serv_sock, len(MSG),
3258                              socket.CMSG_SPACE(4 * SIZEOF_INT)))
3259
3260    @testFDPassCMSG_SPACE.client_skip
3261    def _testFDPassCMSG_SPACE(self):
3262        self.createAndSendFDs(4)
3263
3264    def testFDPassCMSG_LEN(self):
3265        # Test using CMSG_LEN() to calculate ancillary buffer size.
3266        self.checkRecvmsgFDs(1,
3267                             self.doRecvmsg(self.serv_sock, len(MSG),
3268                                            socket.CMSG_LEN(4 * SIZEOF_INT)),
3269                             # RFC 3542 says implementations may set
3270                             # MSG_CTRUNC if there isn't enough space
3271                             # for trailing padding.
3272                             ignoreflags=socket.MSG_CTRUNC)
3273
3274    def _testFDPassCMSG_LEN(self):
3275        self.createAndSendFDs(1)
3276
3277    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3278    @unittest.skipIf(AIX, "skipping, see issue #22397")
3279    @requireAttrs(socket, "CMSG_SPACE")
3280    def testFDPassSeparate(self):
3281        # Pass two FDs in two separate arrays.  Arrays may be combined
3282        # into a single control message by the OS.
3283        self.checkRecvmsgFDs(2,
3284                             self.doRecvmsg(self.serv_sock, len(MSG), 10240),
3285                             maxcmsgs=2)
3286
3287    @testFDPassSeparate.client_skip
3288    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3289    @unittest.skipIf(AIX, "skipping, see issue #22397")
3290    def _testFDPassSeparate(self):
3291        fd0, fd1 = self.newFDs(2)
3292        self.assertEqual(
3293            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
3294                                          socket.SCM_RIGHTS,
3295                                          array.array("i", [fd0])),
3296                                         (socket.SOL_SOCKET,
3297                                          socket.SCM_RIGHTS,
3298                                          array.array("i", [fd1]))]),
3299            len(MSG))
3300
3301    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3302    @unittest.skipIf(AIX, "skipping, see issue #22397")
3303    @requireAttrs(socket, "CMSG_SPACE")
3304    def testFDPassSeparateMinSpace(self):
3305        # Pass two FDs in two separate arrays, receiving them into the
3306        # minimum space for two arrays.
3307        num_fds = 2
3308        self.checkRecvmsgFDs(num_fds,
3309                             self.doRecvmsg(self.serv_sock, len(MSG),
3310                                            socket.CMSG_SPACE(SIZEOF_INT) +
3311                                            socket.CMSG_LEN(SIZEOF_INT * num_fds)),
3312                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
3313
3314    @testFDPassSeparateMinSpace.client_skip
3315    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3316    @unittest.skipIf(AIX, "skipping, see issue #22397")
3317    def _testFDPassSeparateMinSpace(self):
3318        fd0, fd1 = self.newFDs(2)
3319        self.assertEqual(
3320            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
3321                                          socket.SCM_RIGHTS,
3322                                          array.array("i", [fd0])),
3323                                         (socket.SOL_SOCKET,
3324                                          socket.SCM_RIGHTS,
3325                                          array.array("i", [fd1]))]),
3326            len(MSG))
3327
3328    def sendAncillaryIfPossible(self, msg, ancdata):
3329        # Try to send msg and ancdata to server, but if the system
3330        # call fails, just send msg with no ancillary data.
3331        try:
3332            nbytes = self.sendmsgToServer([msg], ancdata)
3333        except OSError as e:
3334            # Check that it was the system call that failed
3335            self.assertIsInstance(e.errno, int)
3336            nbytes = self.sendmsgToServer([msg])
3337        self.assertEqual(nbytes, len(msg))
3338
3339    @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
3340    def testFDPassEmpty(self):
3341        # Try to pass an empty FD array.  Can receive either no array
3342        # or an empty array.
3343        self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock,
3344                                               len(MSG), 10240),
3345                             ignoreflags=socket.MSG_CTRUNC)
3346
3347    def _testFDPassEmpty(self):
3348        self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET,
3349                                            socket.SCM_RIGHTS,
3350                                            b"")])
3351
3352    def testFDPassPartialInt(self):
3353        # Try to pass a truncated FD array.
3354        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3355                                                   len(MSG), 10240)
3356        self.assertEqual(msg, MSG)
3357        self.checkRecvmsgAddress(addr, self.cli_addr)
3358        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
3359        self.assertLessEqual(len(ancdata), 1)
3360        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3361            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3362            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3363            self.assertLess(len(cmsg_data), SIZEOF_INT)
3364
3365    def _testFDPassPartialInt(self):
3366        self.sendAncillaryIfPossible(
3367            MSG,
3368            [(socket.SOL_SOCKET,
3369              socket.SCM_RIGHTS,
3370              array.array("i", [self.badfd]).tobytes()[:-1])])
3371
3372    @requireAttrs(socket, "CMSG_SPACE")
3373    def testFDPassPartialIntInMiddle(self):
3374        # Try to pass two FD arrays, the first of which is truncated.
3375        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3376                                                   len(MSG), 10240)
3377        self.assertEqual(msg, MSG)
3378        self.checkRecvmsgAddress(addr, self.cli_addr)
3379        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
3380        self.assertLessEqual(len(ancdata), 2)
3381        fds = array.array("i")
3382        # Arrays may have been combined in a single control message
3383        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3384            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3385            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3386            fds.frombytes(cmsg_data[:
3387                    len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3388        self.assertLessEqual(len(fds), 2)
3389        self.checkFDs(fds)
3390
3391    @testFDPassPartialIntInMiddle.client_skip
3392    def _testFDPassPartialIntInMiddle(self):
3393        fd0, fd1 = self.newFDs(2)
3394        self.sendAncillaryIfPossible(
3395            MSG,
3396            [(socket.SOL_SOCKET,
3397              socket.SCM_RIGHTS,
3398              array.array("i", [fd0, self.badfd]).tobytes()[:-1]),
3399             (socket.SOL_SOCKET,
3400              socket.SCM_RIGHTS,
3401              array.array("i", [fd1]))])
3402
3403    def checkTruncatedHeader(self, result, ignoreflags=0):
3404        # Check that no ancillary data items are returned when data is
3405        # truncated inside the cmsghdr structure.
3406        msg, ancdata, flags, addr = result
3407        self.assertEqual(msg, MSG)
3408        self.checkRecvmsgAddress(addr, self.cli_addr)
3409        self.assertEqual(ancdata, [])
3410        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3411                        ignore=ignoreflags)
3412
3413    def testCmsgTruncNoBufSize(self):
3414        # Check that no ancillary data is received when no buffer size
3415        # is specified.
3416        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)),
3417                                  # BSD seems to set MSG_CTRUNC only
3418                                  # if an item has been partially
3419                                  # received.
3420                                  ignoreflags=socket.MSG_CTRUNC)
3421
3422    def _testCmsgTruncNoBufSize(self):
3423        self.createAndSendFDs(1)
3424
3425    def testCmsgTrunc0(self):
3426        # Check that no ancillary data is received when buffer size is 0.
3427        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0),
3428                                  ignoreflags=socket.MSG_CTRUNC)
3429
3430    def _testCmsgTrunc0(self):
3431        self.createAndSendFDs(1)
3432
3433    # Check that no ancillary data is returned for various non-zero
3434    # (but still too small) buffer sizes.
3435
3436    def testCmsgTrunc1(self):
3437        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1))
3438
3439    def _testCmsgTrunc1(self):
3440        self.createAndSendFDs(1)
3441
3442    def testCmsgTrunc2Int(self):
3443        # The cmsghdr structure has at least three members, two of
3444        # which are ints, so we still shouldn't see any ancillary
3445        # data.
3446        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3447                                                 SIZEOF_INT * 2))
3448
3449    def _testCmsgTrunc2Int(self):
3450        self.createAndSendFDs(1)
3451
3452    def testCmsgTruncLen0Minus1(self):
3453        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3454                                                 socket.CMSG_LEN(0) - 1))
3455
3456    def _testCmsgTruncLen0Minus1(self):
3457        self.createAndSendFDs(1)
3458
3459    # The following tests try to truncate the control message in the
3460    # middle of the FD array.
3461
3462    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
3463        # Check that file descriptor data is truncated to between
3464        # mindata and maxdata bytes when received with buffer size
3465        # ancbuf, and that any complete file descriptor numbers are
3466        # valid.
3467        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3468                                                   len(MSG), ancbuf)
3469        self.assertEqual(msg, MSG)
3470        self.checkRecvmsgAddress(addr, self.cli_addr)
3471        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3472
3473        if mindata == 0 and ancdata == []:
3474            return
3475        self.assertEqual(len(ancdata), 1)
3476        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3477        self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3478        self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3479        self.assertGreaterEqual(len(cmsg_data), mindata)
3480        self.assertLessEqual(len(cmsg_data), maxdata)
3481        fds = array.array("i")
3482        fds.frombytes(cmsg_data[:
3483                len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3484        self.checkFDs(fds)
3485
3486    def testCmsgTruncLen0(self):
3487        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0)
3488
3489    def _testCmsgTruncLen0(self):
3490        self.createAndSendFDs(1)
3491
3492    def testCmsgTruncLen0Plus1(self):
3493        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1)
3494
3495    def _testCmsgTruncLen0Plus1(self):
3496        self.createAndSendFDs(2)
3497
3498    def testCmsgTruncLen1(self):
3499        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT),
3500                                 maxdata=SIZEOF_INT)
3501
3502    def _testCmsgTruncLen1(self):
3503        self.createAndSendFDs(2)
3504
3505    def testCmsgTruncLen2Minus1(self):
3506        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1,
3507                                 maxdata=(2 * SIZEOF_INT) - 1)
3508
3509    def _testCmsgTruncLen2Minus1(self):
3510        self.createAndSendFDs(2)
3511
3512
3513class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
3514    # Test sendmsg() and recvmsg[_into]() using the ancillary data
3515    # features of the RFC 3542 Advanced Sockets API for IPv6.
3516    # Currently we can only handle certain data items (e.g. traffic
3517    # class, hop limit, MTU discovery and fragmentation settings)
3518    # without resorting to unportable means such as the struct module,
3519    # but the tests here are aimed at testing the ancillary data
3520    # handling in sendmsg() and recvmsg() rather than the IPv6 API
3521    # itself.
3522
3523    # Test value to use when setting hop limit of packet
3524    hop_limit = 2
3525
3526    # Test value to use when setting traffic class of packet.
3527    # -1 means "use kernel default".
3528    traffic_class = -1
3529
3530    def ancillaryMapping(self, ancdata):
3531        # Given ancillary data list ancdata, return a mapping from
3532        # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
3533        # Check that no (level, type) pair appears more than once.
3534        d = {}
3535        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3536            self.assertNotIn((cmsg_level, cmsg_type), d)
3537            d[(cmsg_level, cmsg_type)] = cmsg_data
3538        return d
3539
3540    def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
3541        # Receive hop limit into ancbufsize bytes of ancillary data
3542        # space.  Check that data is MSG, ancillary data is not
3543        # truncated (but ignore any flags in ignoreflags), and hop
3544        # limit is between 0 and maxhop inclusive.
3545        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3546                                  socket.IPV6_RECVHOPLIMIT, 1)
3547        self.misc_event.set()
3548        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3549                                                   len(MSG), ancbufsize)
3550
3551        self.assertEqual(msg, MSG)
3552        self.checkRecvmsgAddress(addr, self.cli_addr)
3553        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3554                        ignore=ignoreflags)
3555
3556        self.assertEqual(len(ancdata), 1)
3557        self.assertIsInstance(ancdata[0], tuple)
3558        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3559        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3560        self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
3561        self.assertIsInstance(cmsg_data, bytes)
3562        self.assertEqual(len(cmsg_data), SIZEOF_INT)
3563        a = array.array("i")
3564        a.frombytes(cmsg_data)
3565        self.assertGreaterEqual(a[0], 0)
3566        self.assertLessEqual(a[0], maxhop)
3567
3568    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3569    def testRecvHopLimit(self):
3570        # Test receiving the packet hop limit as ancillary data.
3571        self.checkHopLimit(ancbufsize=10240)
3572
3573    @testRecvHopLimit.client_skip
3574    def _testRecvHopLimit(self):
3575        # Need to wait until server has asked to receive ancillary
3576        # data, as implementations are not required to buffer it
3577        # otherwise.
3578        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3579        self.sendToServer(MSG)
3580
3581    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3582    def testRecvHopLimitCMSG_SPACE(self):
3583        # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
3584        self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))
3585
3586    @testRecvHopLimitCMSG_SPACE.client_skip
3587    def _testRecvHopLimitCMSG_SPACE(self):
3588        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3589        self.sendToServer(MSG)
3590
3591    # Could test receiving into buffer sized using CMSG_LEN, but RFC
3592    # 3542 says portable applications must provide space for trailing
3593    # padding.  Implementations may set MSG_CTRUNC if there isn't
3594    # enough space for the padding.
3595
3596    @requireAttrs(socket.socket, "sendmsg")
3597    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3598    def testSetHopLimit(self):
3599        # Test setting hop limit on outgoing packet and receiving it
3600        # at the other end.
3601        self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)
3602
3603    @testSetHopLimit.client_skip
3604    def _testSetHopLimit(self):
3605        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3606        self.assertEqual(
3607            self.sendmsgToServer([MSG],
3608                                 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3609                                   array.array("i", [self.hop_limit]))]),
3610            len(MSG))
3611
3612    def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
3613                                     ignoreflags=0):
3614        # Receive traffic class and hop limit into ancbufsize bytes of
3615        # ancillary data space.  Check that data is MSG, ancillary
3616        # data is not truncated (but ignore any flags in ignoreflags),
3617        # and traffic class and hop limit are in range (hop limit no
3618        # more than maxhop).
3619        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3620                                  socket.IPV6_RECVHOPLIMIT, 1)
3621        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3622                                  socket.IPV6_RECVTCLASS, 1)
3623        self.misc_event.set()
3624        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3625                                                   len(MSG), ancbufsize)
3626
3627        self.assertEqual(msg, MSG)
3628        self.checkRecvmsgAddress(addr, self.cli_addr)
3629        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3630                        ignore=ignoreflags)
3631        self.assertEqual(len(ancdata), 2)
3632        ancmap = self.ancillaryMapping(ancdata)
3633
3634        tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
3635        self.assertEqual(len(tcdata), SIZEOF_INT)
3636        a = array.array("i")
3637        a.frombytes(tcdata)
3638        self.assertGreaterEqual(a[0], 0)
3639        self.assertLessEqual(a[0], 255)
3640
3641        hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
3642        self.assertEqual(len(hldata), SIZEOF_INT)
3643        a = array.array("i")
3644        a.frombytes(hldata)
3645        self.assertGreaterEqual(a[0], 0)
3646        self.assertLessEqual(a[0], maxhop)
3647
3648    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3649                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3650    def testRecvTrafficClassAndHopLimit(self):
3651        # Test receiving traffic class and hop limit as ancillary data.
3652        self.checkTrafficClassAndHopLimit(ancbufsize=10240)
3653
3654    @testRecvTrafficClassAndHopLimit.client_skip
3655    def _testRecvTrafficClassAndHopLimit(self):
3656        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3657        self.sendToServer(MSG)
3658
3659    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3660                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3661    def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3662        # Test receiving traffic class and hop limit, using
3663        # CMSG_SPACE() to calculate buffer size.
3664        self.checkTrafficClassAndHopLimit(
3665            ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)
3666
3667    @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
3668    def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3669        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3670        self.sendToServer(MSG)
3671
3672    @requireAttrs(socket.socket, "sendmsg")
3673    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3674                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3675    def testSetTrafficClassAndHopLimit(self):
3676        # Test setting traffic class and hop limit on outgoing packet,
3677        # and receiving them at the other end.
3678        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3679                                          maxhop=self.hop_limit)
3680
3681    @testSetTrafficClassAndHopLimit.client_skip
3682    def _testSetTrafficClassAndHopLimit(self):
3683        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3684        self.assertEqual(
3685            self.sendmsgToServer([MSG],
3686                                 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3687                                   array.array("i", [self.traffic_class])),
3688                                  (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3689                                   array.array("i", [self.hop_limit]))]),
3690            len(MSG))
3691
3692    @requireAttrs(socket.socket, "sendmsg")
3693    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3694                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3695    def testOddCmsgSize(self):
3696        # Try to send ancillary data with first item one byte too
3697        # long.  Fall back to sending with correct size if this fails,
3698        # and check that second item was handled correctly.
3699        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3700                                          maxhop=self.hop_limit)
3701
3702    @testOddCmsgSize.client_skip
3703    def _testOddCmsgSize(self):
3704        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3705        try:
3706            nbytes = self.sendmsgToServer(
3707                [MSG],
3708                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3709                  array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
3710                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3711                  array.array("i", [self.hop_limit]))])
3712        except OSError as e:
3713            self.assertIsInstance(e.errno, int)
3714            nbytes = self.sendmsgToServer(
3715                [MSG],
3716                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3717                  array.array("i", [self.traffic_class])),
3718                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3719                  array.array("i", [self.hop_limit]))])
3720            self.assertEqual(nbytes, len(MSG))
3721
3722    # Tests for proper handling of truncated ancillary data
3723
3724    def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
3725        # Receive hop limit into ancbufsize bytes of ancillary data
3726        # space, which should be too small to contain the ancillary
3727        # data header (if ancbufsize is None, pass no second argument
3728        # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
3729        # (unless included in ignoreflags), and no ancillary data is
3730        # returned.
3731        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3732                                  socket.IPV6_RECVHOPLIMIT, 1)
3733        self.misc_event.set()
3734        args = () if ancbufsize is None else (ancbufsize,)
3735        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3736                                                   len(MSG), *args)
3737
3738        self.assertEqual(msg, MSG)
3739        self.checkRecvmsgAddress(addr, self.cli_addr)
3740        self.assertEqual(ancdata, [])
3741        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3742                        ignore=ignoreflags)
3743
3744    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3745    def testCmsgTruncNoBufSize(self):
3746        # Check that no ancillary data is received when no ancillary
3747        # buffer size is provided.
3748        self.checkHopLimitTruncatedHeader(ancbufsize=None,
3749                                          # BSD seems to set
3750                                          # MSG_CTRUNC only if an item
3751                                          # has been partially
3752                                          # received.
3753                                          ignoreflags=socket.MSG_CTRUNC)
3754
3755    @testCmsgTruncNoBufSize.client_skip
3756    def _testCmsgTruncNoBufSize(self):
3757        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3758        self.sendToServer(MSG)
3759
3760    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3761    def testSingleCmsgTrunc0(self):
3762        # Check that no ancillary data is received when ancillary
3763        # buffer size is zero.
3764        self.checkHopLimitTruncatedHeader(ancbufsize=0,
3765                                          ignoreflags=socket.MSG_CTRUNC)
3766
3767    @testSingleCmsgTrunc0.client_skip
3768    def _testSingleCmsgTrunc0(self):
3769        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3770        self.sendToServer(MSG)
3771
3772    # Check that no ancillary data is returned for various non-zero
3773    # (but still too small) buffer sizes.
3774
3775    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3776    def testSingleCmsgTrunc1(self):
3777        self.checkHopLimitTruncatedHeader(ancbufsize=1)
3778
3779    @testSingleCmsgTrunc1.client_skip
3780    def _testSingleCmsgTrunc1(self):
3781        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3782        self.sendToServer(MSG)
3783
3784    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3785    def testSingleCmsgTrunc2Int(self):
3786        self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)
3787
3788    @testSingleCmsgTrunc2Int.client_skip
3789    def _testSingleCmsgTrunc2Int(self):
3790        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3791        self.sendToServer(MSG)
3792
3793    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3794    def testSingleCmsgTruncLen0Minus1(self):
3795        self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)
3796
3797    @testSingleCmsgTruncLen0Minus1.client_skip
3798    def _testSingleCmsgTruncLen0Minus1(self):
3799        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3800        self.sendToServer(MSG)
3801
3802    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3803    def testSingleCmsgTruncInData(self):
3804        # Test truncation of a control message inside its associated
3805        # data.  The message may be returned with its data truncated,
3806        # or not returned at all.
3807        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3808                                  socket.IPV6_RECVHOPLIMIT, 1)
3809        self.misc_event.set()
3810        msg, ancdata, flags, addr = self.doRecvmsg(
3811            self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)
3812
3813        self.assertEqual(msg, MSG)
3814        self.checkRecvmsgAddress(addr, self.cli_addr)
3815        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3816
3817        self.assertLessEqual(len(ancdata), 1)
3818        if ancdata:
3819            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3820            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3821            self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
3822            self.assertLess(len(cmsg_data), SIZEOF_INT)
3823
3824    @testSingleCmsgTruncInData.client_skip
3825    def _testSingleCmsgTruncInData(self):
3826        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3827        self.sendToServer(MSG)
3828
3829    def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
3830        # Receive traffic class and hop limit into ancbufsize bytes of
3831        # ancillary data space, which should be large enough to
3832        # contain the first item, but too small to contain the header
3833        # of the second.  Check that data is MSG, MSG_CTRUNC is set
3834        # (unless included in ignoreflags), and only one ancillary
3835        # data item is returned.
3836        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3837                                  socket.IPV6_RECVHOPLIMIT, 1)
3838        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3839                                  socket.IPV6_RECVTCLASS, 1)
3840        self.misc_event.set()
3841        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3842                                                   len(MSG), ancbufsize)
3843
3844        self.assertEqual(msg, MSG)
3845        self.checkRecvmsgAddress(addr, self.cli_addr)
3846        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3847                        ignore=ignoreflags)
3848
3849        self.assertEqual(len(ancdata), 1)
3850        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3851        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3852        self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
3853        self.assertEqual(len(cmsg_data), SIZEOF_INT)
3854        a = array.array("i")
3855        a.frombytes(cmsg_data)
3856        self.assertGreaterEqual(a[0], 0)
3857        self.assertLessEqual(a[0], 255)
3858
3859    # Try the above test with various buffer sizes.
3860
3861    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3862                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3863    def testSecondCmsgTrunc0(self):
3864        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
3865                                        ignoreflags=socket.MSG_CTRUNC)
3866
3867    @testSecondCmsgTrunc0.client_skip
3868    def _testSecondCmsgTrunc0(self):
3869        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3870        self.sendToServer(MSG)
3871
3872    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3873                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3874    def testSecondCmsgTrunc1(self):
3875        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)
3876
3877    @testSecondCmsgTrunc1.client_skip
3878    def _testSecondCmsgTrunc1(self):
3879        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3880        self.sendToServer(MSG)
3881
3882    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3883                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3884    def testSecondCmsgTrunc2Int(self):
3885        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
3886                                        2 * SIZEOF_INT)
3887
3888    @testSecondCmsgTrunc2Int.client_skip
3889    def _testSecondCmsgTrunc2Int(self):
3890        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3891        self.sendToServer(MSG)
3892
3893    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3894                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3895    def testSecondCmsgTruncLen0Minus1(self):
3896        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
3897                                        socket.CMSG_LEN(0) - 1)
3898
3899    @testSecondCmsgTruncLen0Minus1.client_skip
3900    def _testSecondCmsgTruncLen0Minus1(self):
3901        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3902        self.sendToServer(MSG)
3903
3904    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3905                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3906    def testSecomdCmsgTruncInData(self):
3907        # Test truncation of the second of two control messages inside
3908        # its associated data.
3909        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3910                                  socket.IPV6_RECVHOPLIMIT, 1)
3911        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3912                                  socket.IPV6_RECVTCLASS, 1)
3913        self.misc_event.set()
3914        msg, ancdata, flags, addr = self.doRecvmsg(
3915            self.serv_sock, len(MSG),
3916            socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)
3917
3918        self.assertEqual(msg, MSG)
3919        self.checkRecvmsgAddress(addr, self.cli_addr)
3920        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3921
3922        cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}
3923
3924        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
3925        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3926        cmsg_types.remove(cmsg_type)
3927        self.assertEqual(len(cmsg_data), SIZEOF_INT)
3928        a = array.array("i")
3929        a.frombytes(cmsg_data)
3930        self.assertGreaterEqual(a[0], 0)
3931        self.assertLessEqual(a[0], 255)
3932
3933        if ancdata:
3934            cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
3935            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3936            cmsg_types.remove(cmsg_type)
3937            self.assertLess(len(cmsg_data), SIZEOF_INT)
3938
3939        self.assertEqual(ancdata, [])
3940
3941    @testSecomdCmsgTruncInData.client_skip
3942    def _testSecomdCmsgTruncInData(self):
3943        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3944        self.sendToServer(MSG)
3945
3946
3947# Derive concrete test classes for different socket types.
3948
3949class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase,
3950                             SendrecvmsgConnectionlessBase,
3951                             ThreadedSocketTestMixin, UDPTestBase):
3952    pass
3953
3954@requireAttrs(socket.socket, "sendmsg")
3955class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase):
3956    pass
3957
3958@requireAttrs(socket.socket, "recvmsg")
3959class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase):
3960    pass
3961
3962@requireAttrs(socket.socket, "recvmsg_into")
3963class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase):
3964    pass
3965
3966
3967class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
3968                              SendrecvmsgConnectionlessBase,
3969                              ThreadedSocketTestMixin, UDP6TestBase):
3970
3971    def checkRecvmsgAddress(self, addr1, addr2):
3972        # Called to compare the received address with the address of
3973        # the peer, ignoring scope ID
3974        self.assertEqual(addr1[:-1], addr2[:-1])
3975
3976@requireAttrs(socket.socket, "sendmsg")
3977@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
3978@requireSocket("AF_INET6", "SOCK_DGRAM")
3979class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
3980    pass
3981
3982@requireAttrs(socket.socket, "recvmsg")
3983@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
3984@requireSocket("AF_INET6", "SOCK_DGRAM")
3985class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
3986    pass
3987
3988@requireAttrs(socket.socket, "recvmsg_into")
3989@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
3990@requireSocket("AF_INET6", "SOCK_DGRAM")
3991class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
3992    pass
3993
3994@requireAttrs(socket.socket, "recvmsg")
3995@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
3996@requireAttrs(socket, "IPPROTO_IPV6")
3997@requireSocket("AF_INET6", "SOCK_DGRAM")
3998class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
3999                                      SendrecvmsgUDP6TestBase):
4000    pass
4001
4002@requireAttrs(socket.socket, "recvmsg_into")
4003@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
4004@requireAttrs(socket, "IPPROTO_IPV6")
4005@requireSocket("AF_INET6", "SOCK_DGRAM")
4006class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
4007                                          RFC3542AncillaryTest,
4008                                          SendrecvmsgUDP6TestBase):
4009    pass
4010
4011
4012class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase,
4013                             ConnectedStreamTestMixin, TCPTestBase):
4014    pass
4015
4016@requireAttrs(socket.socket, "sendmsg")
4017class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase):
4018    pass
4019
4020@requireAttrs(socket.socket, "recvmsg")
4021class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests,
4022                     SendrecvmsgTCPTestBase):
4023    pass
4024
4025@requireAttrs(socket.socket, "recvmsg_into")
4026class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4027                         SendrecvmsgTCPTestBase):
4028    pass
4029
4030
4031class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase,
4032                                    SendrecvmsgConnectedBase,
4033                                    ConnectedStreamTestMixin, SCTPStreamBase):
4034    pass
4035
4036@requireAttrs(socket.socket, "sendmsg")
4037@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4038@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4039class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase):
4040    pass
4041
4042@requireAttrs(socket.socket, "recvmsg")
4043@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4044@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4045class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
4046                            SendrecvmsgSCTPStreamTestBase):
4047
4048    def testRecvmsgEOF(self):
4049        try:
4050            super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF()
4051        except OSError as e:
4052            if e.errno != errno.ENOTCONN:
4053                raise
4054            self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
4055
4056@requireAttrs(socket.socket, "recvmsg_into")
4057@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4058@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4059class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4060                                SendrecvmsgSCTPStreamTestBase):
4061
4062    def testRecvmsgEOF(self):
4063        try:
4064            super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF()
4065        except OSError as e:
4066            if e.errno != errno.ENOTCONN:
4067                raise
4068            self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
4069
4070
4071class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase,
4072                                    ConnectedStreamTestMixin, UnixStreamBase):
4073    pass
4074
4075@requireAttrs(socket.socket, "sendmsg")
4076@requireAttrs(socket, "AF_UNIX")
4077class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase):
4078    pass
4079
4080@requireAttrs(socket.socket, "recvmsg")
4081@requireAttrs(socket, "AF_UNIX")
4082class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
4083                            SendrecvmsgUnixStreamTestBase):
4084    pass
4085
4086@requireAttrs(socket.socket, "recvmsg_into")
4087@requireAttrs(socket, "AF_UNIX")
4088class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4089                                SendrecvmsgUnixStreamTestBase):
4090    pass
4091
4092@requireAttrs(socket.socket, "sendmsg", "recvmsg")
4093@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
4094class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase):
4095    pass
4096
4097@requireAttrs(socket.socket, "sendmsg", "recvmsg_into")
4098@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
4099class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest,
4100                                     SendrecvmsgUnixStreamTestBase):
4101    pass
4102
4103
4104# Test interrupting the interruptible send/receive methods with a
4105# signal when a timeout is set.  These tests avoid having multiple
4106# threads alive during the test so that the OS cannot deliver the
4107# signal to the wrong one.
4108
4109class InterruptedTimeoutBase(unittest.TestCase):
4110    # Base class for interrupted send/receive tests.  Installs an
4111    # empty handler for SIGALRM and removes it on teardown, along with
4112    # any scheduled alarms.
4113
4114    def setUp(self):
4115        super().setUp()
4116        orig_alrm_handler = signal.signal(signal.SIGALRM,
4117                                          lambda signum, frame: 1 / 0)
4118        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
4119
4120    # Timeout for socket operations
4121    timeout = 4.0
4122
4123    # Provide setAlarm() method to schedule delivery of SIGALRM after
4124    # given number of seconds, or cancel it if zero, and an
4125    # appropriate time value to use.  Use setitimer() if available.
4126    if hasattr(signal, "setitimer"):
4127        alarm_time = 0.05
4128
4129        def setAlarm(self, seconds):
4130            signal.setitimer(signal.ITIMER_REAL, seconds)
4131    else:
4132        # Old systems may deliver the alarm up to one second early
4133        alarm_time = 2
4134
4135        def setAlarm(self, seconds):
4136            signal.alarm(seconds)
4137
4138
4139# Require siginterrupt() in order to ensure that system calls are
4140# interrupted by default.
4141@requireAttrs(signal, "siginterrupt")
4142@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
4143                     "Don't have signal.alarm or signal.setitimer")
4144class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
4145    # Test interrupting the recv*() methods with signals when a
4146    # timeout is set.
4147
4148    def setUp(self):
4149        super().setUp()
4150        self.serv.settimeout(self.timeout)
4151
4152    def checkInterruptedRecv(self, func, *args, **kwargs):
4153        # Check that func(*args, **kwargs) raises
4154        # errno of EINTR when interrupted by a signal.
4155        try:
4156            self.setAlarm(self.alarm_time)
4157            with self.assertRaises(ZeroDivisionError) as cm:
4158                func(*args, **kwargs)
4159        finally:
4160            self.setAlarm(0)
4161
4162    def testInterruptedRecvTimeout(self):
4163        self.checkInterruptedRecv(self.serv.recv, 1024)
4164
4165    def testInterruptedRecvIntoTimeout(self):
4166        self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))
4167
4168    def testInterruptedRecvfromTimeout(self):
4169        self.checkInterruptedRecv(self.serv.recvfrom, 1024)
4170
4171    def testInterruptedRecvfromIntoTimeout(self):
4172        self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))
4173
4174    @requireAttrs(socket.socket, "recvmsg")
4175    def testInterruptedRecvmsgTimeout(self):
4176        self.checkInterruptedRecv(self.serv.recvmsg, 1024)
4177
4178    @requireAttrs(socket.socket, "recvmsg_into")
4179    def testInterruptedRecvmsgIntoTimeout(self):
4180        self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])
4181
4182
4183# Require siginterrupt() in order to ensure that system calls are
4184# interrupted by default.
4185@requireAttrs(signal, "siginterrupt")
4186@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
4187                     "Don't have signal.alarm or signal.setitimer")
4188class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
4189                                 ThreadSafeCleanupTestCase,
4190                                 SocketListeningTestMixin, TCPTestBase):
4191    # Test interrupting the interruptible send*() methods with signals
4192    # when a timeout is set.
4193
4194    def setUp(self):
4195        super().setUp()
4196        self.serv_conn = self.newSocket()
4197        self.addCleanup(self.serv_conn.close)
4198        # Use a thread to complete the connection, but wait for it to
4199        # terminate before running the test, so that there is only one
4200        # thread to accept the signal.
4201        cli_thread = threading.Thread(target=self.doConnect)
4202        cli_thread.start()
4203        self.cli_conn, addr = self.serv.accept()
4204        self.addCleanup(self.cli_conn.close)
4205        cli_thread.join()
4206        self.serv_conn.settimeout(self.timeout)
4207
4208    def doConnect(self):
4209        self.serv_conn.connect(self.serv_addr)
4210
4211    def checkInterruptedSend(self, func, *args, **kwargs):
4212        # Check that func(*args, **kwargs), run in a loop, raises
4213        # OSError with an errno of EINTR when interrupted by a
4214        # signal.
4215        try:
4216            with self.assertRaises(ZeroDivisionError) as cm:
4217                while True:
4218                    self.setAlarm(self.alarm_time)
4219                    func(*args, **kwargs)
4220        finally:
4221            self.setAlarm(0)
4222
4223    # Issue #12958: The following tests have problems on OS X prior to 10.7
4224    @support.requires_mac_ver(10, 7)
4225    def testInterruptedSendTimeout(self):
4226        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)
4227
4228    @support.requires_mac_ver(10, 7)
4229    def testInterruptedSendtoTimeout(self):
4230        # Passing an actual address here as Python's wrapper for
4231        # sendto() doesn't allow passing a zero-length one; POSIX
4232        # requires that the address is ignored since the socket is
4233        # connection-mode, however.
4234        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
4235                                  self.serv_addr)
4236
4237    @support.requires_mac_ver(10, 7)
4238    @requireAttrs(socket.socket, "sendmsg")
4239    def testInterruptedSendmsgTimeout(self):
4240        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
4241
4242
4243class TCPCloserTest(ThreadedTCPSocketTest):
4244
4245    def testClose(self):
4246        conn, addr = self.serv.accept()
4247        conn.close()
4248
4249        sd = self.cli
4250        read, write, err = select.select([sd], [], [], 1.0)
4251        self.assertEqual(read, [sd])
4252        self.assertEqual(sd.recv(1), b'')
4253
4254        # Calling close() many times should be safe.
4255        conn.close()
4256        conn.close()
4257
4258    def _testClose(self):
4259        self.cli.connect((HOST, self.port))
4260        time.sleep(1.0)
4261
4262
4263class BasicSocketPairTest(SocketPairTest):
4264
4265    def __init__(self, methodName='runTest'):
4266        SocketPairTest.__init__(self, methodName=methodName)
4267
4268    def _check_defaults(self, sock):
4269        self.assertIsInstance(sock, socket.socket)
4270        if hasattr(socket, 'AF_UNIX'):
4271            self.assertEqual(sock.family, socket.AF_UNIX)
4272        else:
4273            self.assertEqual(sock.family, socket.AF_INET)
4274        self.assertEqual(sock.type, socket.SOCK_STREAM)
4275        self.assertEqual(sock.proto, 0)
4276
4277    def _testDefaults(self):
4278        self._check_defaults(self.cli)
4279
4280    def testDefaults(self):
4281        self._check_defaults(self.serv)
4282
4283    def testRecv(self):
4284        msg = self.serv.recv(1024)
4285        self.assertEqual(msg, MSG)
4286
4287    def _testRecv(self):
4288        self.cli.send(MSG)
4289
4290    def testSend(self):
4291        self.serv.send(MSG)
4292
4293    def _testSend(self):
4294        msg = self.cli.recv(1024)
4295        self.assertEqual(msg, MSG)
4296
4297
4298class NonBlockingTCPTests(ThreadedTCPSocketTest):
4299
4300    def __init__(self, methodName='runTest'):
4301        self.event = threading.Event()
4302        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
4303
4304    def assert_sock_timeout(self, sock, timeout):
4305        self.assertEqual(self.serv.gettimeout(), timeout)
4306
4307        blocking = (timeout != 0.0)
4308        self.assertEqual(sock.getblocking(), blocking)
4309
4310        if fcntl is not None:
4311            # When a Python socket has a non-zero timeout, it's switched
4312            # internally to a non-blocking mode. Later, sock.sendall(),
4313            # sock.recv(), and other socket operations use a select() call and
4314            # handle EWOULDBLOCK/EGAIN on all socket operations. That's how
4315            # timeouts are enforced.
4316            fd_blocking = (timeout is None)
4317
4318            flag = fcntl.fcntl(sock, fcntl.F_GETFL, os.O_NONBLOCK)
4319            self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking)
4320
4321    def testSetBlocking(self):
4322        # Test setblocking() and settimeout() methods
4323        self.serv.setblocking(True)
4324        self.assert_sock_timeout(self.serv, None)
4325
4326        self.serv.setblocking(False)
4327        self.assert_sock_timeout(self.serv, 0.0)
4328
4329        self.serv.settimeout(None)
4330        self.assert_sock_timeout(self.serv, None)
4331
4332        self.serv.settimeout(0)
4333        self.assert_sock_timeout(self.serv, 0)
4334
4335        self.serv.settimeout(10)
4336        self.assert_sock_timeout(self.serv, 10)
4337
4338        self.serv.settimeout(0)
4339        self.assert_sock_timeout(self.serv, 0)
4340
4341    def _testSetBlocking(self):
4342        pass
4343
4344    @support.cpython_only
4345    def testSetBlocking_overflow(self):
4346        # Issue 15989
4347        import _testcapi
4348        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
4349            self.skipTest('needs UINT_MAX < ULONG_MAX')
4350
4351        self.serv.setblocking(False)
4352        self.assertEqual(self.serv.gettimeout(), 0.0)
4353
4354        self.serv.setblocking(_testcapi.UINT_MAX + 1)
4355        self.assertIsNone(self.serv.gettimeout())
4356
4357    _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)
4358
4359    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
4360                         'test needs socket.SOCK_NONBLOCK')
4361    @support.requires_linux_version(2, 6, 28)
4362    def testInitNonBlocking(self):
4363        # create a socket with SOCK_NONBLOCK
4364        self.serv.close()
4365        self.serv = socket.socket(socket.AF_INET,
4366                                  socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
4367        self.assert_sock_timeout(self.serv, 0)
4368
4369    def _testInitNonBlocking(self):
4370        pass
4371
4372    def testInheritFlagsBlocking(self):
4373        # bpo-7995: accept() on a listening socket with a timeout and the
4374        # default timeout is None, the resulting socket must be blocking.
4375        with socket_setdefaulttimeout(None):
4376            self.serv.settimeout(10)
4377            conn, addr = self.serv.accept()
4378            self.addCleanup(conn.close)
4379            self.assertIsNone(conn.gettimeout())
4380
4381    def _testInheritFlagsBlocking(self):
4382        self.cli.connect((HOST, self.port))
4383
4384    def testInheritFlagsTimeout(self):
4385        # bpo-7995: accept() on a listening socket with a timeout and the
4386        # default timeout is None, the resulting socket must inherit
4387        # the default timeout.
4388        default_timeout = 20.0
4389        with socket_setdefaulttimeout(default_timeout):
4390            self.serv.settimeout(10)
4391            conn, addr = self.serv.accept()
4392            self.addCleanup(conn.close)
4393            self.assertEqual(conn.gettimeout(), default_timeout)
4394
4395    def _testInheritFlagsTimeout(self):
4396        self.cli.connect((HOST, self.port))
4397
4398    def testAccept(self):
4399        # Testing non-blocking accept
4400        self.serv.setblocking(0)
4401
4402        # connect() didn't start: non-blocking accept() fails
4403        start_time = time.monotonic()
4404        with self.assertRaises(BlockingIOError):
4405            conn, addr = self.serv.accept()
4406        dt = time.monotonic() - start_time
4407        self.assertLess(dt, 1.0)
4408
4409        self.event.set()
4410
4411        read, write, err = select.select([self.serv], [], [], MAIN_TIMEOUT)
4412        if self.serv not in read:
4413            self.fail("Error trying to do accept after select.")
4414
4415        # connect() completed: non-blocking accept() doesn't block
4416        conn, addr = self.serv.accept()
4417        self.addCleanup(conn.close)
4418        self.assertIsNone(conn.gettimeout())
4419
4420    def _testAccept(self):
4421        # don't connect before event is set to check
4422        # that non-blocking accept() raises BlockingIOError
4423        self.event.wait()
4424
4425        self.cli.connect((HOST, self.port))
4426
4427    def testRecv(self):
4428        # Testing non-blocking recv
4429        conn, addr = self.serv.accept()
4430        self.addCleanup(conn.close)
4431        conn.setblocking(0)
4432
4433        # the server didn't send data yet: non-blocking recv() fails
4434        with self.assertRaises(BlockingIOError):
4435            msg = conn.recv(len(MSG))
4436
4437        self.event.set()
4438
4439        read, write, err = select.select([conn], [], [], MAIN_TIMEOUT)
4440        if conn not in read:
4441            self.fail("Error during select call to non-blocking socket.")
4442
4443        # the server sent data yet: non-blocking recv() doesn't block
4444        msg = conn.recv(len(MSG))
4445        self.assertEqual(msg, MSG)
4446
4447    def _testRecv(self):
4448        self.cli.connect((HOST, self.port))
4449
4450        # don't send anything before event is set to check
4451        # that non-blocking recv() raises BlockingIOError
4452        self.event.wait()
4453
4454        # send data: recv() will no longer block
4455        self.cli.sendall(MSG)
4456
4457
4458class FileObjectClassTestCase(SocketConnectedTest):
4459    """Unit tests for the object returned by socket.makefile()
4460
4461    self.read_file is the io object returned by makefile() on
4462    the client connection.  You can read from this file to
4463    get output from the server.
4464
4465    self.write_file is the io object returned by makefile() on the
4466    server connection.  You can write to this file to send output
4467    to the client.
4468    """
4469
4470    bufsize = -1 # Use default buffer size
4471    encoding = 'utf-8'
4472    errors = 'strict'
4473    newline = None
4474
4475    read_mode = 'rb'
4476    read_msg = MSG
4477    write_mode = 'wb'
4478    write_msg = MSG
4479
4480    def __init__(self, methodName='runTest'):
4481        SocketConnectedTest.__init__(self, methodName=methodName)
4482
4483    def setUp(self):
4484        self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
4485            threading.Event() for i in range(4)]
4486        SocketConnectedTest.setUp(self)
4487        self.read_file = self.cli_conn.makefile(
4488            self.read_mode, self.bufsize,
4489            encoding = self.encoding,
4490            errors = self.errors,
4491            newline = self.newline)
4492
4493    def tearDown(self):
4494        self.serv_finished.set()
4495        self.read_file.close()
4496        self.assertTrue(self.read_file.closed)
4497        self.read_file = None
4498        SocketConnectedTest.tearDown(self)
4499
4500    def clientSetUp(self):
4501        SocketConnectedTest.clientSetUp(self)
4502        self.write_file = self.serv_conn.makefile(
4503            self.write_mode, self.bufsize,
4504            encoding = self.encoding,
4505            errors = self.errors,
4506            newline = self.newline)
4507
4508    def clientTearDown(self):
4509        self.cli_finished.set()
4510        self.write_file.close()
4511        self.assertTrue(self.write_file.closed)
4512        self.write_file = None
4513        SocketConnectedTest.clientTearDown(self)
4514
4515    def testReadAfterTimeout(self):
4516        # Issue #7322: A file object must disallow further reads
4517        # after a timeout has occurred.
4518        self.cli_conn.settimeout(1)
4519        self.read_file.read(3)
4520        # First read raises a timeout
4521        self.assertRaises(socket.timeout, self.read_file.read, 1)
4522        # Second read is disallowed
4523        with self.assertRaises(OSError) as ctx:
4524            self.read_file.read(1)
4525        self.assertIn("cannot read from timed out object", str(ctx.exception))
4526
4527    def _testReadAfterTimeout(self):
4528        self.write_file.write(self.write_msg[0:3])
4529        self.write_file.flush()
4530        self.serv_finished.wait()
4531
4532    def testSmallRead(self):
4533        # Performing small file read test
4534        first_seg = self.read_file.read(len(self.read_msg)-3)
4535        second_seg = self.read_file.read(3)
4536        msg = first_seg + second_seg
4537        self.assertEqual(msg, self.read_msg)
4538
4539    def _testSmallRead(self):
4540        self.write_file.write(self.write_msg)
4541        self.write_file.flush()
4542
4543    def testFullRead(self):
4544        # read until EOF
4545        msg = self.read_file.read()
4546        self.assertEqual(msg, self.read_msg)
4547
4548    def _testFullRead(self):
4549        self.write_file.write(self.write_msg)
4550        self.write_file.close()
4551
4552    def testUnbufferedRead(self):
4553        # Performing unbuffered file read test
4554        buf = type(self.read_msg)()
4555        while 1:
4556            char = self.read_file.read(1)
4557            if not char:
4558                break
4559            buf += char
4560        self.assertEqual(buf, self.read_msg)
4561
4562    def _testUnbufferedRead(self):
4563        self.write_file.write(self.write_msg)
4564        self.write_file.flush()
4565
4566    def testReadline(self):
4567        # Performing file readline test
4568        line = self.read_file.readline()
4569        self.assertEqual(line, self.read_msg)
4570
4571    def _testReadline(self):
4572        self.write_file.write(self.write_msg)
4573        self.write_file.flush()
4574
4575    def testCloseAfterMakefile(self):
4576        # The file returned by makefile should keep the socket open.
4577        self.cli_conn.close()
4578        # read until EOF
4579        msg = self.read_file.read()
4580        self.assertEqual(msg, self.read_msg)
4581
4582    def _testCloseAfterMakefile(self):
4583        self.write_file.write(self.write_msg)
4584        self.write_file.flush()
4585
4586    def testMakefileAfterMakefileClose(self):
4587        self.read_file.close()
4588        msg = self.cli_conn.recv(len(MSG))
4589        if isinstance(self.read_msg, str):
4590            msg = msg.decode()
4591        self.assertEqual(msg, self.read_msg)
4592
4593    def _testMakefileAfterMakefileClose(self):
4594        self.write_file.write(self.write_msg)
4595        self.write_file.flush()
4596
4597    def testClosedAttr(self):
4598        self.assertTrue(not self.read_file.closed)
4599
4600    def _testClosedAttr(self):
4601        self.assertTrue(not self.write_file.closed)
4602
4603    def testAttributes(self):
4604        self.assertEqual(self.read_file.mode, self.read_mode)
4605        self.assertEqual(self.read_file.name, self.cli_conn.fileno())
4606
4607    def _testAttributes(self):
4608        self.assertEqual(self.write_file.mode, self.write_mode)
4609        self.assertEqual(self.write_file.name, self.serv_conn.fileno())
4610
4611    def testRealClose(self):
4612        self.read_file.close()
4613        self.assertRaises(ValueError, self.read_file.fileno)
4614        self.cli_conn.close()
4615        self.assertRaises(OSError, self.cli_conn.getsockname)
4616
4617    def _testRealClose(self):
4618        pass
4619
4620
4621class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
4622
4623    """Repeat the tests from FileObjectClassTestCase with bufsize==0.
4624
4625    In this case (and in this case only), it should be possible to
4626    create a file object, read a line from it, create another file
4627    object, read another line from it, without loss of data in the
4628    first file object's buffer.  Note that http.client relies on this
4629    when reading multiple requests from the same socket."""
4630
4631    bufsize = 0 # Use unbuffered mode
4632
4633    def testUnbufferedReadline(self):
4634        # Read a line, create a new file object, read another line with it
4635        line = self.read_file.readline() # first line
4636        self.assertEqual(line, b"A. " + self.write_msg) # first line
4637        self.read_file = self.cli_conn.makefile('rb', 0)
4638        line = self.read_file.readline() # second line
4639        self.assertEqual(line, b"B. " + self.write_msg) # second line
4640
4641    def _testUnbufferedReadline(self):
4642        self.write_file.write(b"A. " + self.write_msg)
4643        self.write_file.write(b"B. " + self.write_msg)
4644        self.write_file.flush()
4645
4646    def testMakefileClose(self):
4647        # The file returned by makefile should keep the socket open...
4648        self.cli_conn.close()
4649        msg = self.cli_conn.recv(1024)
4650        self.assertEqual(msg, self.read_msg)
4651        # ...until the file is itself closed
4652        self.read_file.close()
4653        self.assertRaises(OSError, self.cli_conn.recv, 1024)
4654
4655    def _testMakefileClose(self):
4656        self.write_file.write(self.write_msg)
4657        self.write_file.flush()
4658
4659    def testMakefileCloseSocketDestroy(self):
4660        refcount_before = sys.getrefcount(self.cli_conn)
4661        self.read_file.close()
4662        refcount_after = sys.getrefcount(self.cli_conn)
4663        self.assertEqual(refcount_before - 1, refcount_after)
4664
4665    def _testMakefileCloseSocketDestroy(self):
4666        pass
4667
4668    # Non-blocking ops
4669    # NOTE: to set `read_file` as non-blocking, we must call
4670    # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp).
4671
4672    def testSmallReadNonBlocking(self):
4673        self.cli_conn.setblocking(False)
4674        self.assertEqual(self.read_file.readinto(bytearray(10)), None)
4675        self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None)
4676        self.evt1.set()
4677        self.evt2.wait(1.0)
4678        first_seg = self.read_file.read(len(self.read_msg) - 3)
4679        if first_seg is None:
4680            # Data not arrived (can happen under Windows), wait a bit
4681            time.sleep(0.5)
4682            first_seg = self.read_file.read(len(self.read_msg) - 3)
4683        buf = bytearray(10)
4684        n = self.read_file.readinto(buf)
4685        self.assertEqual(n, 3)
4686        msg = first_seg + buf[:n]
4687        self.assertEqual(msg, self.read_msg)
4688        self.assertEqual(self.read_file.readinto(bytearray(16)), None)
4689        self.assertEqual(self.read_file.read(1), None)
4690
4691    def _testSmallReadNonBlocking(self):
4692        self.evt1.wait(1.0)
4693        self.write_file.write(self.write_msg)
4694        self.write_file.flush()
4695        self.evt2.set()
4696        # Avoid closing the socket before the server test has finished,
4697        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
4698        self.serv_finished.wait(5.0)
4699
4700    def testWriteNonBlocking(self):
4701        self.cli_finished.wait(5.0)
4702        # The client thread can't skip directly - the SkipTest exception
4703        # would appear as a failure.
4704        if self.serv_skipped:
4705            self.skipTest(self.serv_skipped)
4706
4707    def _testWriteNonBlocking(self):
4708        self.serv_skipped = None
4709        self.serv_conn.setblocking(False)
4710        # Try to saturate the socket buffer pipe with repeated large writes.
4711        BIG = b"x" * support.SOCK_MAX_SIZE
4712        LIMIT = 10
4713        # The first write() succeeds since a chunk of data can be buffered
4714        n = self.write_file.write(BIG)
4715        self.assertGreater(n, 0)
4716        for i in range(LIMIT):
4717            n = self.write_file.write(BIG)
4718            if n is None:
4719                # Succeeded
4720                break
4721            self.assertGreater(n, 0)
4722        else:
4723            # Let us know that this test didn't manage to establish
4724            # the expected conditions. This is not a failure in itself but,
4725            # if it happens repeatedly, the test should be fixed.
4726            self.serv_skipped = "failed to saturate the socket buffer"
4727
4728
4729class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
4730
4731    bufsize = 1 # Default-buffered for reading; line-buffered for writing
4732
4733
4734class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
4735
4736    bufsize = 2 # Exercise the buffering code
4737
4738
4739class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
4740    """Tests for socket.makefile() in text mode (rather than binary)"""
4741
4742    read_mode = 'r'
4743    read_msg = MSG.decode('utf-8')
4744    write_mode = 'wb'
4745    write_msg = MSG
4746    newline = ''
4747
4748
4749class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
4750    """Tests for socket.makefile() in text mode (rather than binary)"""
4751
4752    read_mode = 'rb'
4753    read_msg = MSG
4754    write_mode = 'w'
4755    write_msg = MSG.decode('utf-8')
4756    newline = ''
4757
4758
4759class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
4760    """Tests for socket.makefile() in text mode (rather than binary)"""
4761
4762    read_mode = 'r'
4763    read_msg = MSG.decode('utf-8')
4764    write_mode = 'w'
4765    write_msg = MSG.decode('utf-8')
4766    newline = ''
4767
4768
4769class NetworkConnectionTest(object):
4770    """Prove network connection."""
4771
4772    def clientSetUp(self):
4773        # We're inherited below by BasicTCPTest2, which also inherits
4774        # BasicTCPTest, which defines self.port referenced below.
4775        self.cli = socket.create_connection((HOST, self.port))
4776        self.serv_conn = self.cli
4777
4778class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
4779    """Tests that NetworkConnection does not break existing TCP functionality.
4780    """
4781
4782class NetworkConnectionNoServer(unittest.TestCase):
4783
4784    class MockSocket(socket.socket):
4785        def connect(self, *args):
4786            raise socket.timeout('timed out')
4787
4788    @contextlib.contextmanager
4789    def mocked_socket_module(self):
4790        """Return a socket which times out on connect"""
4791        old_socket = socket.socket
4792        socket.socket = self.MockSocket
4793        try:
4794            yield
4795        finally:
4796            socket.socket = old_socket
4797
4798    def test_connect(self):
4799        port = support.find_unused_port()
4800        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
4801        self.addCleanup(cli.close)
4802        with self.assertRaises(OSError) as cm:
4803            cli.connect((HOST, port))
4804        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
4805
4806    def test_create_connection(self):
4807        # Issue #9792: errors raised by create_connection() should have
4808        # a proper errno attribute.
4809        port = support.find_unused_port()
4810        with self.assertRaises(OSError) as cm:
4811            socket.create_connection((HOST, port))
4812
4813        # Issue #16257: create_connection() calls getaddrinfo() against
4814        # 'localhost'.  This may result in an IPV6 addr being returned
4815        # as well as an IPV4 one:
4816        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
4817        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
4818        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
4819        #
4820        # create_connection() enumerates through all the addresses returned
4821        # and if it doesn't successfully bind to any of them, it propagates
4822        # the last exception it encountered.
4823        #
4824        # On Solaris, ENETUNREACH is returned in this circumstance instead
4825        # of ECONNREFUSED.  So, if that errno exists, add it to our list of
4826        # expected errnos.
4827        expected_errnos = support.get_socket_conn_refused_errs()
4828        self.assertIn(cm.exception.errno, expected_errnos)
4829
4830    def test_create_connection_timeout(self):
4831        # Issue #9792: create_connection() should not recast timeout errors
4832        # as generic socket errors.
4833        with self.mocked_socket_module():
4834            try:
4835                socket.create_connection((HOST, 1234))
4836            except socket.timeout:
4837                pass
4838            except OSError as exc:
4839                if support.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT:
4840                    raise
4841            else:
4842                self.fail('socket.timeout not raised')
4843
4844
4845class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
4846
4847    def __init__(self, methodName='runTest'):
4848        SocketTCPTest.__init__(self, methodName=methodName)
4849        ThreadableTest.__init__(self)
4850
4851    def clientSetUp(self):
4852        self.source_port = support.find_unused_port()
4853
4854    def clientTearDown(self):
4855        self.cli.close()
4856        self.cli = None
4857        ThreadableTest.clientTearDown(self)
4858
4859    def _justAccept(self):
4860        conn, addr = self.serv.accept()
4861        conn.close()
4862
4863    testFamily = _justAccept
4864    def _testFamily(self):
4865        self.cli = socket.create_connection((HOST, self.port), timeout=30)
4866        self.addCleanup(self.cli.close)
4867        self.assertEqual(self.cli.family, 2)
4868
4869    testSourceAddress = _justAccept
4870    def _testSourceAddress(self):
4871        self.cli = socket.create_connection((HOST, self.port), timeout=30,
4872                source_address=('', self.source_port))
4873        self.addCleanup(self.cli.close)
4874        self.assertEqual(self.cli.getsockname()[1], self.source_port)
4875        # The port number being used is sufficient to show that the bind()
4876        # call happened.
4877
4878    testTimeoutDefault = _justAccept
4879    def _testTimeoutDefault(self):
4880        # passing no explicit timeout uses socket's global default
4881        self.assertTrue(socket.getdefaulttimeout() is None)
4882        socket.setdefaulttimeout(42)
4883        try:
4884            self.cli = socket.create_connection((HOST, self.port))
4885            self.addCleanup(self.cli.close)
4886        finally:
4887            socket.setdefaulttimeout(None)
4888        self.assertEqual(self.cli.gettimeout(), 42)
4889
4890    testTimeoutNone = _justAccept
4891    def _testTimeoutNone(self):
4892        # None timeout means the same as sock.settimeout(None)
4893        self.assertTrue(socket.getdefaulttimeout() is None)
4894        socket.setdefaulttimeout(30)
4895        try:
4896            self.cli = socket.create_connection((HOST, self.port), timeout=None)
4897            self.addCleanup(self.cli.close)
4898        finally:
4899            socket.setdefaulttimeout(None)
4900        self.assertEqual(self.cli.gettimeout(), None)
4901
4902    testTimeoutValueNamed = _justAccept
4903    def _testTimeoutValueNamed(self):
4904        self.cli = socket.create_connection((HOST, self.port), timeout=30)
4905        self.assertEqual(self.cli.gettimeout(), 30)
4906
4907    testTimeoutValueNonamed = _justAccept
4908    def _testTimeoutValueNonamed(self):
4909        self.cli = socket.create_connection((HOST, self.port), 30)
4910        self.addCleanup(self.cli.close)
4911        self.assertEqual(self.cli.gettimeout(), 30)
4912
4913
4914class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
4915
4916    def __init__(self, methodName='runTest'):
4917        SocketTCPTest.__init__(self, methodName=methodName)
4918        ThreadableTest.__init__(self)
4919
4920    def clientSetUp(self):
4921        pass
4922
4923    def clientTearDown(self):
4924        self.cli.close()
4925        self.cli = None
4926        ThreadableTest.clientTearDown(self)
4927
4928    def testInsideTimeout(self):
4929        conn, addr = self.serv.accept()
4930        self.addCleanup(conn.close)
4931        time.sleep(3)
4932        conn.send(b"done!")
4933    testOutsideTimeout = testInsideTimeout
4934
4935    def _testInsideTimeout(self):
4936        self.cli = sock = socket.create_connection((HOST, self.port))
4937        data = sock.recv(5)
4938        self.assertEqual(data, b"done!")
4939
4940    def _testOutsideTimeout(self):
4941        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
4942        self.assertRaises(socket.timeout, lambda: sock.recv(5))
4943
4944
4945class TCPTimeoutTest(SocketTCPTest):
4946
4947    def testTCPTimeout(self):
4948        def raise_timeout(*args, **kwargs):
4949            self.serv.settimeout(1.0)
4950            self.serv.accept()
4951        self.assertRaises(socket.timeout, raise_timeout,
4952                              "Error generating a timeout exception (TCP)")
4953
4954    def testTimeoutZero(self):
4955        ok = False
4956        try:
4957            self.serv.settimeout(0.0)
4958            foo = self.serv.accept()
4959        except socket.timeout:
4960            self.fail("caught timeout instead of error (TCP)")
4961        except OSError:
4962            ok = True
4963        except:
4964            self.fail("caught unexpected exception (TCP)")
4965        if not ok:
4966            self.fail("accept() returned success when we did not expect it")
4967
4968    @unittest.skipUnless(hasattr(signal, 'alarm'),
4969                         'test needs signal.alarm()')
4970    def testInterruptedTimeout(self):
4971        # XXX I don't know how to do this test on MSWindows or any other
4972        # platform that doesn't support signal.alarm() or os.kill(), though
4973        # the bug should have existed on all platforms.
4974        self.serv.settimeout(5.0)   # must be longer than alarm
4975        class Alarm(Exception):
4976            pass
4977        def alarm_handler(signal, frame):
4978            raise Alarm
4979        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
4980        try:
4981            try:
4982                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
4983                foo = self.serv.accept()
4984            except socket.timeout:
4985                self.fail("caught timeout instead of Alarm")
4986            except Alarm:
4987                pass
4988            except:
4989                self.fail("caught other exception instead of Alarm:"
4990                          " %s(%s):\n%s" %
4991                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
4992            else:
4993                self.fail("nothing caught")
4994            finally:
4995                signal.alarm(0)         # shut off alarm
4996        except Alarm:
4997            self.fail("got Alarm in wrong place")
4998        finally:
4999            # no alarm can be pending.  Safe to restore old handler.
5000            signal.signal(signal.SIGALRM, old_alarm)
5001
5002class UDPTimeoutTest(SocketUDPTest):
5003
5004    def testUDPTimeout(self):
5005        def raise_timeout(*args, **kwargs):
5006            self.serv.settimeout(1.0)
5007            self.serv.recv(1024)
5008        self.assertRaises(socket.timeout, raise_timeout,
5009                              "Error generating a timeout exception (UDP)")
5010
5011    def testTimeoutZero(self):
5012        ok = False
5013        try:
5014            self.serv.settimeout(0.0)
5015            foo = self.serv.recv(1024)
5016        except socket.timeout:
5017            self.fail("caught timeout instead of error (UDP)")
5018        except OSError:
5019            ok = True
5020        except:
5021            self.fail("caught unexpected exception (UDP)")
5022        if not ok:
5023            self.fail("recv() returned success when we did not expect it")
5024
5025class TestExceptions(unittest.TestCase):
5026
5027    def testExceptionTree(self):
5028        self.assertTrue(issubclass(OSError, Exception))
5029        self.assertTrue(issubclass(socket.herror, OSError))
5030        self.assertTrue(issubclass(socket.gaierror, OSError))
5031        self.assertTrue(issubclass(socket.timeout, OSError))
5032
5033    def test_setblocking_invalidfd(self):
5034        # Regression test for issue #28471
5035
5036        sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
5037        sock = socket.socket(
5038            socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno())
5039        sock0.close()
5040        self.addCleanup(sock.detach)
5041
5042        with self.assertRaises(OSError):
5043            sock.setblocking(False)
5044
5045
5046@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
5047class TestLinuxAbstractNamespace(unittest.TestCase):
5048
5049    UNIX_PATH_MAX = 108
5050
5051    def testLinuxAbstractNamespace(self):
5052        address = b"\x00python-test-hello\x00\xff"
5053        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1:
5054            s1.bind(address)
5055            s1.listen()
5056            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2:
5057                s2.connect(s1.getsockname())
5058                with s1.accept()[0] as s3:
5059                    self.assertEqual(s1.getsockname(), address)
5060                    self.assertEqual(s2.getpeername(), address)
5061
5062    def testMaxName(self):
5063        address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
5064        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5065            s.bind(address)
5066            self.assertEqual(s.getsockname(), address)
5067
5068    def testNameOverflow(self):
5069        address = "\x00" + "h" * self.UNIX_PATH_MAX
5070        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5071            self.assertRaises(OSError, s.bind, address)
5072
5073    def testStrName(self):
5074        # Check that an abstract name can be passed as a string.
5075        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
5076        try:
5077            s.bind("\x00python\x00test\x00")
5078            self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
5079        finally:
5080            s.close()
5081
5082    def testBytearrayName(self):
5083        # Check that an abstract name can be passed as a bytearray.
5084        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5085            s.bind(bytearray(b"\x00python\x00test\x00"))
5086            self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
5087
5088@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
5089class TestUnixDomain(unittest.TestCase):
5090
5091    def setUp(self):
5092        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
5093
5094    def tearDown(self):
5095        self.sock.close()
5096
5097    def encoded(self, path):
5098        # Return the given path encoded in the file system encoding,
5099        # or skip the test if this is not possible.
5100        try:
5101            return os.fsencode(path)
5102        except UnicodeEncodeError:
5103            self.skipTest(
5104                "Pathname {0!a} cannot be represented in file "
5105                "system encoding {1!r}".format(
5106                    path, sys.getfilesystemencoding()))
5107
5108    def bind(self, sock, path):
5109        # Bind the socket
5110        try:
5111            support.bind_unix_socket(sock, path)
5112        except OSError as e:
5113            if str(e) == "AF_UNIX path too long":
5114                self.skipTest(
5115                    "Pathname {0!a} is too long to serve as an AF_UNIX path"
5116                    .format(path))
5117            else:
5118                raise
5119
5120    def testUnbound(self):
5121        # Issue #30205 (note getsockname() can return None on OS X)
5122        self.assertIn(self.sock.getsockname(), ('', None))
5123
5124    def testStrAddr(self):
5125        # Test binding to and retrieving a normal string pathname.
5126        path = os.path.abspath(support.TESTFN)
5127        self.bind(self.sock, path)
5128        self.addCleanup(support.unlink, path)
5129        self.assertEqual(self.sock.getsockname(), path)
5130
5131    def testBytesAddr(self):
5132        # Test binding to a bytes pathname.
5133        path = os.path.abspath(support.TESTFN)
5134        self.bind(self.sock, self.encoded(path))
5135        self.addCleanup(support.unlink, path)
5136        self.assertEqual(self.sock.getsockname(), path)
5137
5138    def testSurrogateescapeBind(self):
5139        # Test binding to a valid non-ASCII pathname, with the
5140        # non-ASCII bytes supplied using surrogateescape encoding.
5141        path = os.path.abspath(support.TESTFN_UNICODE)
5142        b = self.encoded(path)
5143        self.bind(self.sock, b.decode("ascii", "surrogateescape"))
5144        self.addCleanup(support.unlink, path)
5145        self.assertEqual(self.sock.getsockname(), path)
5146
5147    def testUnencodableAddr(self):
5148        # Test binding to a pathname that cannot be encoded in the
5149        # file system encoding.
5150        if support.TESTFN_UNENCODABLE is None:
5151            self.skipTest("No unencodable filename available")
5152        path = os.path.abspath(support.TESTFN_UNENCODABLE)
5153        self.bind(self.sock, path)
5154        self.addCleanup(support.unlink, path)
5155        self.assertEqual(self.sock.getsockname(), path)
5156
5157
5158class BufferIOTest(SocketConnectedTest):
5159    """
5160    Test the buffer versions of socket.recv() and socket.send().
5161    """
5162    def __init__(self, methodName='runTest'):
5163        SocketConnectedTest.__init__(self, methodName=methodName)
5164
5165    def testRecvIntoArray(self):
5166        buf = array.array("B", [0] * len(MSG))
5167        nbytes = self.cli_conn.recv_into(buf)
5168        self.assertEqual(nbytes, len(MSG))
5169        buf = buf.tobytes()
5170        msg = buf[:len(MSG)]
5171        self.assertEqual(msg, MSG)
5172
5173    def _testRecvIntoArray(self):
5174        buf = bytes(MSG)
5175        self.serv_conn.send(buf)
5176
5177    def testRecvIntoBytearray(self):
5178        buf = bytearray(1024)
5179        nbytes = self.cli_conn.recv_into(buf)
5180        self.assertEqual(nbytes, len(MSG))
5181        msg = buf[:len(MSG)]
5182        self.assertEqual(msg, MSG)
5183
5184    _testRecvIntoBytearray = _testRecvIntoArray
5185
5186    def testRecvIntoMemoryview(self):
5187        buf = bytearray(1024)
5188        nbytes = self.cli_conn.recv_into(memoryview(buf))
5189        self.assertEqual(nbytes, len(MSG))
5190        msg = buf[:len(MSG)]
5191        self.assertEqual(msg, MSG)
5192
5193    _testRecvIntoMemoryview = _testRecvIntoArray
5194
5195    def testRecvFromIntoArray(self):
5196        buf = array.array("B", [0] * len(MSG))
5197        nbytes, addr = self.cli_conn.recvfrom_into(buf)
5198        self.assertEqual(nbytes, len(MSG))
5199        buf = buf.tobytes()
5200        msg = buf[:len(MSG)]
5201        self.assertEqual(msg, MSG)
5202
5203    def _testRecvFromIntoArray(self):
5204        buf = bytes(MSG)
5205        self.serv_conn.send(buf)
5206
5207    def testRecvFromIntoBytearray(self):
5208        buf = bytearray(1024)
5209        nbytes, addr = self.cli_conn.recvfrom_into(buf)
5210        self.assertEqual(nbytes, len(MSG))
5211        msg = buf[:len(MSG)]
5212        self.assertEqual(msg, MSG)
5213
5214    _testRecvFromIntoBytearray = _testRecvFromIntoArray
5215
5216    def testRecvFromIntoMemoryview(self):
5217        buf = bytearray(1024)
5218        nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
5219        self.assertEqual(nbytes, len(MSG))
5220        msg = buf[:len(MSG)]
5221        self.assertEqual(msg, MSG)
5222
5223    _testRecvFromIntoMemoryview = _testRecvFromIntoArray
5224
5225    def testRecvFromIntoSmallBuffer(self):
5226        # See issue #20246.
5227        buf = bytearray(8)
5228        self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)
5229
5230    def _testRecvFromIntoSmallBuffer(self):
5231        self.serv_conn.send(MSG)
5232
5233    def testRecvFromIntoEmptyBuffer(self):
5234        buf = bytearray()
5235        self.cli_conn.recvfrom_into(buf)
5236        self.cli_conn.recvfrom_into(buf, 0)
5237
5238    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
5239
5240
5241TIPC_STYPE = 2000
5242TIPC_LOWER = 200
5243TIPC_UPPER = 210
5244
5245def isTipcAvailable():
5246    """Check if the TIPC module is loaded
5247
5248    The TIPC module is not loaded automatically on Ubuntu and probably
5249    other Linux distros.
5250    """
5251    if not hasattr(socket, "AF_TIPC"):
5252        return False
5253    try:
5254        f = open("/proc/modules")
5255    except (FileNotFoundError, IsADirectoryError, PermissionError):
5256        # It's ok if the file does not exist, is a directory or if we
5257        # have not the permission to read it.
5258        return False
5259    with f:
5260        for line in f:
5261            if line.startswith("tipc "):
5262                return True
5263    return False
5264
5265@unittest.skipUnless(isTipcAvailable(),
5266                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
5267class TIPCTest(unittest.TestCase):
5268    def testRDM(self):
5269        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
5270        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
5271        self.addCleanup(srv.close)
5272        self.addCleanup(cli.close)
5273
5274        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5275        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
5276                TIPC_LOWER, TIPC_UPPER)
5277        srv.bind(srvaddr)
5278
5279        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
5280                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
5281        cli.sendto(MSG, sendaddr)
5282
5283        msg, recvaddr = srv.recvfrom(1024)
5284
5285        self.assertEqual(cli.getsockname(), recvaddr)
5286        self.assertEqual(msg, MSG)
5287
5288
5289@unittest.skipUnless(isTipcAvailable(),
5290                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
5291class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
5292    def __init__(self, methodName = 'runTest'):
5293        unittest.TestCase.__init__(self, methodName = methodName)
5294        ThreadableTest.__init__(self)
5295
5296    def setUp(self):
5297        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
5298        self.addCleanup(self.srv.close)
5299        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5300        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
5301                TIPC_LOWER, TIPC_UPPER)
5302        self.srv.bind(srvaddr)
5303        self.srv.listen()
5304        self.serverExplicitReady()
5305        self.conn, self.connaddr = self.srv.accept()
5306        self.addCleanup(self.conn.close)
5307
5308    def clientSetUp(self):
5309        # There is a hittable race between serverExplicitReady() and the
5310        # accept() call; sleep a little while to avoid it, otherwise
5311        # we could get an exception
5312        time.sleep(0.1)
5313        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
5314        self.addCleanup(self.cli.close)
5315        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
5316                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
5317        self.cli.connect(addr)
5318        self.cliaddr = self.cli.getsockname()
5319
5320    def testStream(self):
5321        msg = self.conn.recv(1024)
5322        self.assertEqual(msg, MSG)
5323        self.assertEqual(self.cliaddr, self.connaddr)
5324
5325    def _testStream(self):
5326        self.cli.send(MSG)
5327        self.cli.close()
5328
5329
5330class ContextManagersTest(ThreadedTCPSocketTest):
5331
5332    def _testSocketClass(self):
5333        # base test
5334        with socket.socket() as sock:
5335            self.assertFalse(sock._closed)
5336        self.assertTrue(sock._closed)
5337        # close inside with block
5338        with socket.socket() as sock:
5339            sock.close()
5340        self.assertTrue(sock._closed)
5341        # exception inside with block
5342        with socket.socket() as sock:
5343            self.assertRaises(OSError, sock.sendall, b'foo')
5344        self.assertTrue(sock._closed)
5345
5346    def testCreateConnectionBase(self):
5347        conn, addr = self.serv.accept()
5348        self.addCleanup(conn.close)
5349        data = conn.recv(1024)
5350        conn.sendall(data)
5351
5352    def _testCreateConnectionBase(self):
5353        address = self.serv.getsockname()
5354        with socket.create_connection(address) as sock:
5355            self.assertFalse(sock._closed)
5356            sock.sendall(b'foo')
5357            self.assertEqual(sock.recv(1024), b'foo')
5358        self.assertTrue(sock._closed)
5359
5360    def testCreateConnectionClose(self):
5361        conn, addr = self.serv.accept()
5362        self.addCleanup(conn.close)
5363        data = conn.recv(1024)
5364        conn.sendall(data)
5365
5366    def _testCreateConnectionClose(self):
5367        address = self.serv.getsockname()
5368        with socket.create_connection(address) as sock:
5369            sock.close()
5370        self.assertTrue(sock._closed)
5371        self.assertRaises(OSError, sock.sendall, b'foo')
5372
5373
5374class InheritanceTest(unittest.TestCase):
5375    @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
5376                         "SOCK_CLOEXEC not defined")
5377    @support.requires_linux_version(2, 6, 28)
5378    def test_SOCK_CLOEXEC(self):
5379        with socket.socket(socket.AF_INET,
5380                           socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s:
5381            self.assertEqual(s.type, socket.SOCK_STREAM)
5382            self.assertFalse(s.get_inheritable())
5383
5384    def test_default_inheritable(self):
5385        sock = socket.socket()
5386        with sock:
5387            self.assertEqual(sock.get_inheritable(), False)
5388
5389    def test_dup(self):
5390        sock = socket.socket()
5391        with sock:
5392            newsock = sock.dup()
5393            sock.close()
5394            with newsock:
5395                self.assertEqual(newsock.get_inheritable(), False)
5396
5397    def test_set_inheritable(self):
5398        sock = socket.socket()
5399        with sock:
5400            sock.set_inheritable(True)
5401            self.assertEqual(sock.get_inheritable(), True)
5402
5403            sock.set_inheritable(False)
5404            self.assertEqual(sock.get_inheritable(), False)
5405
5406    @unittest.skipIf(fcntl is None, "need fcntl")
5407    def test_get_inheritable_cloexec(self):
5408        sock = socket.socket()
5409        with sock:
5410            fd = sock.fileno()
5411            self.assertEqual(sock.get_inheritable(), False)
5412
5413            # clear FD_CLOEXEC flag
5414            flags = fcntl.fcntl(fd, fcntl.F_GETFD)
5415            flags &= ~fcntl.FD_CLOEXEC
5416            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
5417
5418            self.assertEqual(sock.get_inheritable(), True)
5419
5420    @unittest.skipIf(fcntl is None, "need fcntl")
5421    def test_set_inheritable_cloexec(self):
5422        sock = socket.socket()
5423        with sock:
5424            fd = sock.fileno()
5425            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
5426                             fcntl.FD_CLOEXEC)
5427
5428            sock.set_inheritable(True)
5429            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
5430                             0)
5431
5432
5433    def test_socketpair(self):
5434        s1, s2 = socket.socketpair()
5435        self.addCleanup(s1.close)
5436        self.addCleanup(s2.close)
5437        self.assertEqual(s1.get_inheritable(), False)
5438        self.assertEqual(s2.get_inheritable(), False)
5439
5440
5441@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
5442                     "SOCK_NONBLOCK not defined")
5443class NonblockConstantTest(unittest.TestCase):
5444    def checkNonblock(self, s, nonblock=True, timeout=0.0):
5445        if nonblock:
5446            self.assertEqual(s.type, socket.SOCK_STREAM)
5447            self.assertEqual(s.gettimeout(), timeout)
5448            self.assertTrue(
5449                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
5450            if timeout == 0:
5451                # timeout == 0: means that getblocking() must be False.
5452                self.assertFalse(s.getblocking())
5453            else:
5454                # If timeout > 0, the socket will be in a "blocking" mode
5455                # from the standpoint of the Python API.  For Python socket
5456                # object, "blocking" means that operations like 'sock.recv()'
5457                # will block.  Internally, file descriptors for
5458                # "blocking" Python sockets *with timeouts* are in a
5459                # *non-blocking* mode, and 'sock.recv()' uses 'select()'
5460                # and handles EWOULDBLOCK/EAGAIN to enforce the timeout.
5461                self.assertTrue(s.getblocking())
5462        else:
5463            self.assertEqual(s.type, socket.SOCK_STREAM)
5464            self.assertEqual(s.gettimeout(), None)
5465            self.assertFalse(
5466                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
5467            self.assertTrue(s.getblocking())
5468
5469    @support.requires_linux_version(2, 6, 28)
5470    def test_SOCK_NONBLOCK(self):
5471        # a lot of it seems silly and redundant, but I wanted to test that
5472        # changing back and forth worked ok
5473        with socket.socket(socket.AF_INET,
5474                           socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
5475            self.checkNonblock(s)
5476            s.setblocking(1)
5477            self.checkNonblock(s, nonblock=False)
5478            s.setblocking(0)
5479            self.checkNonblock(s)
5480            s.settimeout(None)
5481            self.checkNonblock(s, nonblock=False)
5482            s.settimeout(2.0)
5483            self.checkNonblock(s, timeout=2.0)
5484            s.setblocking(1)
5485            self.checkNonblock(s, nonblock=False)
5486        # defaulttimeout
5487        t = socket.getdefaulttimeout()
5488        socket.setdefaulttimeout(0.0)
5489        with socket.socket() as s:
5490            self.checkNonblock(s)
5491        socket.setdefaulttimeout(None)
5492        with socket.socket() as s:
5493            self.checkNonblock(s, False)
5494        socket.setdefaulttimeout(2.0)
5495        with socket.socket() as s:
5496            self.checkNonblock(s, timeout=2.0)
5497        socket.setdefaulttimeout(None)
5498        with socket.socket() as s:
5499            self.checkNonblock(s, False)
5500        socket.setdefaulttimeout(t)
5501
5502
5503@unittest.skipUnless(os.name == "nt", "Windows specific")
5504@unittest.skipUnless(multiprocessing, "need multiprocessing")
5505class TestSocketSharing(SocketTCPTest):
5506    # This must be classmethod and not staticmethod or multiprocessing
5507    # won't be able to bootstrap it.
5508    @classmethod
5509    def remoteProcessServer(cls, q):
5510        # Recreate socket from shared data
5511        sdata = q.get()
5512        message = q.get()
5513
5514        s = socket.fromshare(sdata)
5515        s2, c = s.accept()
5516
5517        # Send the message
5518        s2.sendall(message)
5519        s2.close()
5520        s.close()
5521
5522    def testShare(self):
5523        # Transfer the listening server socket to another process
5524        # and service it from there.
5525
5526        # Create process:
5527        q = multiprocessing.Queue()
5528        p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
5529        p.start()
5530
5531        # Get the shared socket data
5532        data = self.serv.share(p.pid)
5533
5534        # Pass the shared socket to the other process
5535        addr = self.serv.getsockname()
5536        self.serv.close()
5537        q.put(data)
5538
5539        # The data that the server will send us
5540        message = b"slapmahfro"
5541        q.put(message)
5542
5543        # Connect
5544        s = socket.create_connection(addr)
5545        #  listen for the data
5546        m = []
5547        while True:
5548            data = s.recv(100)
5549            if not data:
5550                break
5551            m.append(data)
5552        s.close()
5553        received = b"".join(m)
5554        self.assertEqual(received, message)
5555        p.join()
5556
5557    def testShareLength(self):
5558        data = self.serv.share(os.getpid())
5559        self.assertRaises(ValueError, socket.fromshare, data[:-1])
5560        self.assertRaises(ValueError, socket.fromshare, data+b"foo")
5561
5562    def compareSockets(self, org, other):
5563        # socket sharing is expected to work only for blocking socket
5564        # since the internal python timeout value isn't transferred.
5565        self.assertEqual(org.gettimeout(), None)
5566        self.assertEqual(org.gettimeout(), other.gettimeout())
5567
5568        self.assertEqual(org.family, other.family)
5569        self.assertEqual(org.type, other.type)
5570        # If the user specified "0" for proto, then
5571        # internally windows will have picked the correct value.
5572        # Python introspection on the socket however will still return
5573        # 0.  For the shared socket, the python value is recreated
5574        # from the actual value, so it may not compare correctly.
5575        if org.proto != 0:
5576            self.assertEqual(org.proto, other.proto)
5577
5578    def testShareLocal(self):
5579        data = self.serv.share(os.getpid())
5580        s = socket.fromshare(data)
5581        try:
5582            self.compareSockets(self.serv, s)
5583        finally:
5584            s.close()
5585
5586    def testTypes(self):
5587        families = [socket.AF_INET, socket.AF_INET6]
5588        types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
5589        for f in families:
5590            for t in types:
5591                try:
5592                    source = socket.socket(f, t)
5593                except OSError:
5594                    continue # This combination is not supported
5595                try:
5596                    data = source.share(os.getpid())
5597                    shared = socket.fromshare(data)
5598                    try:
5599                        self.compareSockets(source, shared)
5600                    finally:
5601                        shared.close()
5602                finally:
5603                    source.close()
5604
5605
5606class SendfileUsingSendTest(ThreadedTCPSocketTest):
5607    """
5608    Test the send() implementation of socket.sendfile().
5609    """
5610
5611    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
5612    BUFSIZE = 8192
5613    FILEDATA = b""
5614    TIMEOUT = 2
5615
5616    @classmethod
5617    def setUpClass(cls):
5618        def chunks(total, step):
5619            assert total >= step
5620            while total > step:
5621                yield step
5622                total -= step
5623            if total:
5624                yield total
5625
5626        chunk = b"".join([random.choice(string.ascii_letters).encode()
5627                          for i in range(cls.BUFSIZE)])
5628        with open(support.TESTFN, 'wb') as f:
5629            for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
5630                f.write(chunk)
5631        with open(support.TESTFN, 'rb') as f:
5632            cls.FILEDATA = f.read()
5633            assert len(cls.FILEDATA) == cls.FILESIZE
5634
5635    @classmethod
5636    def tearDownClass(cls):
5637        support.unlink(support.TESTFN)
5638
5639    def accept_conn(self):
5640        self.serv.settimeout(MAIN_TIMEOUT)
5641        conn, addr = self.serv.accept()
5642        conn.settimeout(self.TIMEOUT)
5643        self.addCleanup(conn.close)
5644        return conn
5645
5646    def recv_data(self, conn):
5647        received = []
5648        while True:
5649            chunk = conn.recv(self.BUFSIZE)
5650            if not chunk:
5651                break
5652            received.append(chunk)
5653        return b''.join(received)
5654
5655    def meth_from_sock(self, sock):
5656        # Depending on the mixin class being run return either send()
5657        # or sendfile() method implementation.
5658        return getattr(sock, "_sendfile_use_send")
5659
5660    # regular file
5661
5662    def _testRegularFile(self):
5663        address = self.serv.getsockname()
5664        file = open(support.TESTFN, 'rb')
5665        with socket.create_connection(address) as sock, file as file:
5666            meth = self.meth_from_sock(sock)
5667            sent = meth(file)
5668            self.assertEqual(sent, self.FILESIZE)
5669            self.assertEqual(file.tell(), self.FILESIZE)
5670
5671    def testRegularFile(self):
5672        conn = self.accept_conn()
5673        data = self.recv_data(conn)
5674        self.assertEqual(len(data), self.FILESIZE)
5675        self.assertEqual(data, self.FILEDATA)
5676
5677    # non regular file
5678
5679    def _testNonRegularFile(self):
5680        address = self.serv.getsockname()
5681        file = io.BytesIO(self.FILEDATA)
5682        with socket.create_connection(address) as sock, file as file:
5683            sent = sock.sendfile(file)
5684            self.assertEqual(sent, self.FILESIZE)
5685            self.assertEqual(file.tell(), self.FILESIZE)
5686            self.assertRaises(socket._GiveupOnSendfile,
5687                              sock._sendfile_use_sendfile, file)
5688
5689    def testNonRegularFile(self):
5690        conn = self.accept_conn()
5691        data = self.recv_data(conn)
5692        self.assertEqual(len(data), self.FILESIZE)
5693        self.assertEqual(data, self.FILEDATA)
5694
5695    # empty file
5696
5697    def _testEmptyFileSend(self):
5698        address = self.serv.getsockname()
5699        filename = support.TESTFN + "2"
5700        with open(filename, 'wb'):
5701            self.addCleanup(support.unlink, filename)
5702        file = open(filename, 'rb')
5703        with socket.create_connection(address) as sock, file as file:
5704            meth = self.meth_from_sock(sock)
5705            sent = meth(file)
5706            self.assertEqual(sent, 0)
5707            self.assertEqual(file.tell(), 0)
5708
5709    def testEmptyFileSend(self):
5710        conn = self.accept_conn()
5711        data = self.recv_data(conn)
5712        self.assertEqual(data, b"")
5713
5714    # offset
5715
5716    def _testOffset(self):
5717        address = self.serv.getsockname()
5718        file = open(support.TESTFN, 'rb')
5719        with socket.create_connection(address) as sock, file as file:
5720            meth = self.meth_from_sock(sock)
5721            sent = meth(file, offset=5000)
5722            self.assertEqual(sent, self.FILESIZE - 5000)
5723            self.assertEqual(file.tell(), self.FILESIZE)
5724
5725    def testOffset(self):
5726        conn = self.accept_conn()
5727        data = self.recv_data(conn)
5728        self.assertEqual(len(data), self.FILESIZE - 5000)
5729        self.assertEqual(data, self.FILEDATA[5000:])
5730
5731    # count
5732
5733    def _testCount(self):
5734        address = self.serv.getsockname()
5735        file = open(support.TESTFN, 'rb')
5736        with socket.create_connection(address, timeout=2) as sock, file as file:
5737            count = 5000007
5738            meth = self.meth_from_sock(sock)
5739            sent = meth(file, count=count)
5740            self.assertEqual(sent, count)
5741            self.assertEqual(file.tell(), count)
5742
5743    def testCount(self):
5744        count = 5000007
5745        conn = self.accept_conn()
5746        data = self.recv_data(conn)
5747        self.assertEqual(len(data), count)
5748        self.assertEqual(data, self.FILEDATA[:count])
5749
5750    # count small
5751
5752    def _testCountSmall(self):
5753        address = self.serv.getsockname()
5754        file = open(support.TESTFN, 'rb')
5755        with socket.create_connection(address, timeout=2) as sock, file as file:
5756            count = 1
5757            meth = self.meth_from_sock(sock)
5758            sent = meth(file, count=count)
5759            self.assertEqual(sent, count)
5760            self.assertEqual(file.tell(), count)
5761
5762    def testCountSmall(self):
5763        count = 1
5764        conn = self.accept_conn()
5765        data = self.recv_data(conn)
5766        self.assertEqual(len(data), count)
5767        self.assertEqual(data, self.FILEDATA[:count])
5768
5769    # count + offset
5770
5771    def _testCountWithOffset(self):
5772        address = self.serv.getsockname()
5773        file = open(support.TESTFN, 'rb')
5774        with socket.create_connection(address, timeout=2) as sock, file as file:
5775            count = 100007
5776            meth = self.meth_from_sock(sock)
5777            sent = meth(file, offset=2007, count=count)
5778            self.assertEqual(sent, count)
5779            self.assertEqual(file.tell(), count + 2007)
5780
5781    def testCountWithOffset(self):
5782        count = 100007
5783        conn = self.accept_conn()
5784        data = self.recv_data(conn)
5785        self.assertEqual(len(data), count)
5786        self.assertEqual(data, self.FILEDATA[2007:count+2007])
5787
5788    # non blocking sockets are not supposed to work
5789
5790    def _testNonBlocking(self):
5791        address = self.serv.getsockname()
5792        file = open(support.TESTFN, 'rb')
5793        with socket.create_connection(address) as sock, file as file:
5794            sock.setblocking(False)
5795            meth = self.meth_from_sock(sock)
5796            self.assertRaises(ValueError, meth, file)
5797            self.assertRaises(ValueError, sock.sendfile, file)
5798
5799    def testNonBlocking(self):
5800        conn = self.accept_conn()
5801        if conn.recv(8192):
5802            self.fail('was not supposed to receive any data')
5803
5804    # timeout (non-triggered)
5805
5806    def _testWithTimeout(self):
5807        address = self.serv.getsockname()
5808        file = open(support.TESTFN, 'rb')
5809        with socket.create_connection(address, timeout=2) as sock, file as file:
5810            meth = self.meth_from_sock(sock)
5811            sent = meth(file)
5812            self.assertEqual(sent, self.FILESIZE)
5813
5814    def testWithTimeout(self):
5815        conn = self.accept_conn()
5816        data = self.recv_data(conn)
5817        self.assertEqual(len(data), self.FILESIZE)
5818        self.assertEqual(data, self.FILEDATA)
5819
5820    # timeout (triggered)
5821
5822    def _testWithTimeoutTriggeredSend(self):
5823        address = self.serv.getsockname()
5824        with open(support.TESTFN, 'rb') as file:
5825            with socket.create_connection(address) as sock:
5826                sock.settimeout(0.01)
5827                meth = self.meth_from_sock(sock)
5828                self.assertRaises(socket.timeout, meth, file)
5829
5830    def testWithTimeoutTriggeredSend(self):
5831        conn = self.accept_conn()
5832        conn.recv(88192)
5833
5834    # errors
5835
5836    def _test_errors(self):
5837        pass
5838
5839    def test_errors(self):
5840        with open(support.TESTFN, 'rb') as file:
5841            with socket.socket(type=socket.SOCK_DGRAM) as s:
5842                meth = self.meth_from_sock(s)
5843                self.assertRaisesRegex(
5844                    ValueError, "SOCK_STREAM", meth, file)
5845        with open(support.TESTFN, 'rt') as file:
5846            with socket.socket() as s:
5847                meth = self.meth_from_sock(s)
5848                self.assertRaisesRegex(
5849                    ValueError, "binary mode", meth, file)
5850        with open(support.TESTFN, 'rb') as file:
5851            with socket.socket() as s:
5852                meth = self.meth_from_sock(s)
5853                self.assertRaisesRegex(TypeError, "positive integer",
5854                                       meth, file, count='2')
5855                self.assertRaisesRegex(TypeError, "positive integer",
5856                                       meth, file, count=0.1)
5857                self.assertRaisesRegex(ValueError, "positive integer",
5858                                       meth, file, count=0)
5859                self.assertRaisesRegex(ValueError, "positive integer",
5860                                       meth, file, count=-1)
5861
5862
5863@unittest.skipUnless(hasattr(os, "sendfile"),
5864                     'os.sendfile() required for this test.')
5865class SendfileUsingSendfileTest(SendfileUsingSendTest):
5866    """
5867    Test the sendfile() implementation of socket.sendfile().
5868    """
5869    def meth_from_sock(self, sock):
5870        return getattr(sock, "_sendfile_use_sendfile")
5871
5872
5873@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
5874class LinuxKernelCryptoAPI(unittest.TestCase):
5875    # tests for AF_ALG
5876    def create_alg(self, typ, name):
5877        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
5878        try:
5879            sock.bind((typ, name))
5880        except FileNotFoundError as e:
5881            # type / algorithm is not available
5882            sock.close()
5883            raise unittest.SkipTest(str(e), typ, name)
5884        else:
5885            return sock
5886
5887    # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY,
5888    # at least on ppc64le architecture
5889    @support.requires_linux_version(4, 5)
5890    def test_sha256(self):
5891        expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
5892                                 "177a9cb410ff61f20015ad")
5893        with self.create_alg('hash', 'sha256') as algo:
5894            op, _ = algo.accept()
5895            with op:
5896                op.sendall(b"abc")
5897                self.assertEqual(op.recv(512), expected)
5898
5899            op, _ = algo.accept()
5900            with op:
5901                op.send(b'a', socket.MSG_MORE)
5902                op.send(b'b', socket.MSG_MORE)
5903                op.send(b'c', socket.MSG_MORE)
5904                op.send(b'')
5905                self.assertEqual(op.recv(512), expected)
5906
5907    def test_hmac_sha1(self):
5908        expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
5909        with self.create_alg('hash', 'hmac(sha1)') as algo:
5910            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
5911            op, _ = algo.accept()
5912            with op:
5913                op.sendall(b"what do ya want for nothing?")
5914                self.assertEqual(op.recv(512), expected)
5915
5916    # Although it should work with 3.19 and newer the test blocks on
5917    # Ubuntu 15.10 with Kernel 4.2.0-19.
5918    @support.requires_linux_version(4, 3)
5919    def test_aes_cbc(self):
5920        key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
5921        iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
5922        msg = b"Single block msg"
5923        ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
5924        msglen = len(msg)
5925        with self.create_alg('skcipher', 'cbc(aes)') as algo:
5926            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
5927            op, _ = algo.accept()
5928            with op:
5929                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
5930                                 flags=socket.MSG_MORE)
5931                op.sendall(msg)
5932                self.assertEqual(op.recv(msglen), ciphertext)
5933
5934            op, _ = algo.accept()
5935            with op:
5936                op.sendmsg_afalg([ciphertext],
5937                                 op=socket.ALG_OP_DECRYPT, iv=iv)
5938                self.assertEqual(op.recv(msglen), msg)
5939
5940            # long message
5941            multiplier = 1024
5942            longmsg = [msg] * multiplier
5943            op, _ = algo.accept()
5944            with op:
5945                op.sendmsg_afalg(longmsg,
5946                                 op=socket.ALG_OP_ENCRYPT, iv=iv)
5947                enc = op.recv(msglen * multiplier)
5948            self.assertEqual(len(enc), msglen * multiplier)
5949            self.assertEqual(enc[:msglen], ciphertext)
5950
5951            op, _ = algo.accept()
5952            with op:
5953                op.sendmsg_afalg([enc],
5954                                 op=socket.ALG_OP_DECRYPT, iv=iv)
5955                dec = op.recv(msglen * multiplier)
5956            self.assertEqual(len(dec), msglen * multiplier)
5957            self.assertEqual(dec, msg * multiplier)
5958
5959    @support.requires_linux_version(4, 9)  # see issue29324
5960    def test_aead_aes_gcm(self):
5961        key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
5962        iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
5963        plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
5964        assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
5965        expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
5966        expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')
5967
5968        taglen = len(expected_tag)
5969        assoclen = len(assoc)
5970
5971        with self.create_alg('aead', 'gcm(aes)') as algo:
5972            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
5973            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
5974                            None, taglen)
5975
5976            # send assoc, plain and tag buffer in separate steps
5977            op, _ = algo.accept()
5978            with op:
5979                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
5980                                 assoclen=assoclen, flags=socket.MSG_MORE)
5981                op.sendall(assoc, socket.MSG_MORE)
5982                op.sendall(plain)
5983                res = op.recv(assoclen + len(plain) + taglen)
5984                self.assertEqual(expected_ct, res[assoclen:-taglen])
5985                self.assertEqual(expected_tag, res[-taglen:])
5986
5987            # now with msg
5988            op, _ = algo.accept()
5989            with op:
5990                msg = assoc + plain
5991                op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
5992                                 assoclen=assoclen)
5993                res = op.recv(assoclen + len(plain) + taglen)
5994                self.assertEqual(expected_ct, res[assoclen:-taglen])
5995                self.assertEqual(expected_tag, res[-taglen:])
5996
5997            # create anc data manually
5998            pack_uint32 = struct.Struct('I').pack
5999            op, _ = algo.accept()
6000            with op:
6001                msg = assoc + plain
6002                op.sendmsg(
6003                    [msg],
6004                    ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
6005                     [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
6006                     [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
6007                    )
6008                )
6009                res = op.recv(len(msg) + taglen)
6010                self.assertEqual(expected_ct, res[assoclen:-taglen])
6011                self.assertEqual(expected_tag, res[-taglen:])
6012
6013            # decrypt and verify
6014            op, _ = algo.accept()
6015            with op:
6016                msg = assoc + expected_ct + expected_tag
6017                op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
6018                                 assoclen=assoclen)
6019                res = op.recv(len(msg) - taglen)
6020                self.assertEqual(plain, res[assoclen:])
6021
6022    @support.requires_linux_version(4, 3)  # see test_aes_cbc
6023    def test_drbg_pr_sha256(self):
6024        # deterministic random bit generator, prediction resistance, sha256
6025        with self.create_alg('rng', 'drbg_pr_sha256') as algo:
6026            extra_seed = os.urandom(32)
6027            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
6028            op, _ = algo.accept()
6029            with op:
6030                rn = op.recv(32)
6031                self.assertEqual(len(rn), 32)
6032
6033    def test_sendmsg_afalg_args(self):
6034        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6035        with sock:
6036            with self.assertRaises(TypeError):
6037                sock.sendmsg_afalg()
6038
6039            with self.assertRaises(TypeError):
6040                sock.sendmsg_afalg(op=None)
6041
6042            with self.assertRaises(TypeError):
6043                sock.sendmsg_afalg(1)
6044
6045            with self.assertRaises(TypeError):
6046                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)
6047
6048            with self.assertRaises(TypeError):
6049                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
6050
6051    def test_length_restriction(self):
6052        # bpo-35050, off-by-one error in length check
6053        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6054        self.addCleanup(sock.close)
6055
6056        # salg_type[14]
6057        with self.assertRaises(FileNotFoundError):
6058            sock.bind(("t" * 13, "name"))
6059        with self.assertRaisesRegex(ValueError, "type too long"):
6060            sock.bind(("t" * 14, "name"))
6061
6062        # salg_name[64]
6063        with self.assertRaises(FileNotFoundError):
6064            sock.bind(("type", "n" * 63))
6065        with self.assertRaisesRegex(ValueError, "name too long"):
6066            sock.bind(("type", "n" * 64))
6067
6068
6069@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
6070class TestMSWindowsTCPFlags(unittest.TestCase):
6071    knownTCPFlags = {
6072                       # available since long time ago
6073                       'TCP_MAXSEG',
6074                       'TCP_NODELAY',
6075                       # available starting with Windows 10 1607
6076                       'TCP_FASTOPEN',
6077                       # available starting with Windows 10 1703
6078                       'TCP_KEEPCNT',
6079                       # available starting with Windows 10 1709
6080                       'TCP_KEEPIDLE',
6081                       'TCP_KEEPINTVL'
6082                       }
6083
6084    def test_new_tcp_flags(self):
6085        provided = [s for s in dir(socket) if s.startswith('TCP')]
6086        unknown = [s for s in provided if s not in self.knownTCPFlags]
6087
6088        self.assertEqual([], unknown,
6089            "New TCP flags were discovered. See bpo-32394 for more information")
6090
6091
6092class CreateServerTest(unittest.TestCase):
6093
6094    def test_address(self):
6095        port = support.find_unused_port()
6096        with socket.create_server(("127.0.0.1", port)) as sock:
6097            self.assertEqual(sock.getsockname()[0], "127.0.0.1")
6098            self.assertEqual(sock.getsockname()[1], port)
6099        if support.IPV6_ENABLED:
6100            with socket.create_server(("::1", port),
6101                                      family=socket.AF_INET6) as sock:
6102                self.assertEqual(sock.getsockname()[0], "::1")
6103                self.assertEqual(sock.getsockname()[1], port)
6104
6105    def test_family_and_type(self):
6106        with socket.create_server(("127.0.0.1", 0)) as sock:
6107            self.assertEqual(sock.family, socket.AF_INET)
6108            self.assertEqual(sock.type, socket.SOCK_STREAM)
6109        if support.IPV6_ENABLED:
6110            with socket.create_server(("::1", 0), family=socket.AF_INET6) as s:
6111                self.assertEqual(s.family, socket.AF_INET6)
6112                self.assertEqual(sock.type, socket.SOCK_STREAM)
6113
6114    def test_reuse_port(self):
6115        if not hasattr(socket, "SO_REUSEPORT"):
6116            with self.assertRaises(ValueError):
6117                socket.create_server(("localhost", 0), reuse_port=True)
6118        else:
6119            with socket.create_server(("localhost", 0)) as sock:
6120                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
6121                self.assertEqual(opt, 0)
6122            with socket.create_server(("localhost", 0), reuse_port=True) as sock:
6123                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
6124                self.assertNotEqual(opt, 0)
6125
6126    @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
6127                     not hasattr(_socket, 'IPV6_V6ONLY'),
6128                     "IPV6_V6ONLY option not supported")
6129    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
6130    def test_ipv6_only_default(self):
6131        with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock:
6132            assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
6133
6134    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6135                     "dualstack_ipv6 not supported")
6136    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
6137    def test_dualstack_ipv6_family(self):
6138        with socket.create_server(("::1", 0), family=socket.AF_INET6,
6139                                  dualstack_ipv6=True) as sock:
6140            self.assertEqual(sock.family, socket.AF_INET6)
6141
6142
6143class CreateServerFunctionalTest(unittest.TestCase):
6144    timeout = 3
6145
6146    def setUp(self):
6147        self.thread = None
6148
6149    def tearDown(self):
6150        if self.thread is not None:
6151            self.thread.join(self.timeout)
6152
6153    def echo_server(self, sock):
6154        def run(sock):
6155            with sock:
6156                conn, _ = sock.accept()
6157                with conn:
6158                    event.wait(self.timeout)
6159                    msg = conn.recv(1024)
6160                    if not msg:
6161                        return
6162                    conn.sendall(msg)
6163
6164        event = threading.Event()
6165        sock.settimeout(self.timeout)
6166        self.thread = threading.Thread(target=run, args=(sock, ))
6167        self.thread.start()
6168        event.set()
6169
6170    def echo_client(self, addr, family):
6171        with socket.socket(family=family) as sock:
6172            sock.settimeout(self.timeout)
6173            sock.connect(addr)
6174            sock.sendall(b'foo')
6175            self.assertEqual(sock.recv(1024), b'foo')
6176
6177    def test_tcp4(self):
6178        port = support.find_unused_port()
6179        with socket.create_server(("", port)) as sock:
6180            self.echo_server(sock)
6181            self.echo_client(("127.0.0.1", port), socket.AF_INET)
6182
6183    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
6184    def test_tcp6(self):
6185        port = support.find_unused_port()
6186        with socket.create_server(("", port),
6187                                  family=socket.AF_INET6) as sock:
6188            self.echo_server(sock)
6189            self.echo_client(("::1", port), socket.AF_INET6)
6190
6191    # --- dual stack tests
6192
6193    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6194                     "dualstack_ipv6 not supported")
6195    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
6196    def test_dual_stack_client_v4(self):
6197        port = support.find_unused_port()
6198        with socket.create_server(("", port), family=socket.AF_INET6,
6199                                  dualstack_ipv6=True) as sock:
6200            self.echo_server(sock)
6201            self.echo_client(("127.0.0.1", port), socket.AF_INET)
6202
6203    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6204                     "dualstack_ipv6 not supported")
6205    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test')
6206    def test_dual_stack_client_v6(self):
6207        port = support.find_unused_port()
6208        with socket.create_server(("", port), family=socket.AF_INET6,
6209                                  dualstack_ipv6=True) as sock:
6210            self.echo_server(sock)
6211            self.echo_client(("::1", port), socket.AF_INET6)
6212
6213
6214def test_main():
6215    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
6216             TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
6217             UDPTimeoutTest, CreateServerTest, CreateServerFunctionalTest]
6218
6219    tests.extend([
6220        NonBlockingTCPTests,
6221        FileObjectClassTestCase,
6222        UnbufferedFileObjectClassTestCase,
6223        LineBufferedFileObjectClassTestCase,
6224        SmallBufferedFileObjectClassTestCase,
6225        UnicodeReadFileObjectClassTestCase,
6226        UnicodeWriteFileObjectClassTestCase,
6227        UnicodeReadWriteFileObjectClassTestCase,
6228        NetworkConnectionNoServer,
6229        NetworkConnectionAttributesTest,
6230        NetworkConnectionBehaviourTest,
6231        ContextManagersTest,
6232        InheritanceTest,
6233        NonblockConstantTest
6234    ])
6235    tests.append(BasicSocketPairTest)
6236    tests.append(TestUnixDomain)
6237    tests.append(TestLinuxAbstractNamespace)
6238    tests.extend([TIPCTest, TIPCThreadableTest])
6239    tests.extend([BasicCANTest, CANTest])
6240    tests.extend([BasicRDSTest, RDSTest])
6241    tests.append(LinuxKernelCryptoAPI)
6242    tests.append(BasicQIPCRTRTest)
6243    tests.extend([
6244        BasicVSOCKTest,
6245        ThreadedVSOCKSocketStreamTest,
6246    ])
6247    tests.extend([
6248        CmsgMacroTests,
6249        SendmsgUDPTest,
6250        RecvmsgUDPTest,
6251        RecvmsgIntoUDPTest,
6252        SendmsgUDP6Test,
6253        RecvmsgUDP6Test,
6254        RecvmsgRFC3542AncillaryUDP6Test,
6255        RecvmsgIntoRFC3542AncillaryUDP6Test,
6256        RecvmsgIntoUDP6Test,
6257        SendmsgTCPTest,
6258        RecvmsgTCPTest,
6259        RecvmsgIntoTCPTest,
6260        SendmsgSCTPStreamTest,
6261        RecvmsgSCTPStreamTest,
6262        RecvmsgIntoSCTPStreamTest,
6263        SendmsgUnixStreamTest,
6264        RecvmsgUnixStreamTest,
6265        RecvmsgIntoUnixStreamTest,
6266        RecvmsgSCMRightsStreamTest,
6267        RecvmsgIntoSCMRightsStreamTest,
6268        # These are slow when setitimer() is not available
6269        InterruptedRecvTimeoutTest,
6270        InterruptedSendTimeoutTest,
6271        TestSocketSharing,
6272        SendfileUsingSendTest,
6273        SendfileUsingSendfileTest,
6274    ])
6275    tests.append(TestMSWindowsTCPFlags)
6276
6277    thread_info = support.threading_setup()
6278    support.run_unittest(*tests)
6279    support.threading_cleanup(*thread_info)
6280
6281if __name__ == "__main__":
6282    test_main()
6283