• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3from test.support import os_helper
4from test.support import socket_helper
5from test.support import threading_helper
6
7import errno
8import io
9import itertools
10import socket
11import select
12import tempfile
13import time
14import traceback
15import queue
16import sys
17import os
18import platform
19import array
20import contextlib
21from weakref import proxy
22import signal
23import math
24import pickle
25import struct
26import random
27import shutil
28import string
29import _thread as thread
30import threading
31try:
32    import multiprocessing
33except ImportError:
34    multiprocessing = False
35try:
36    import fcntl
37except ImportError:
38    fcntl = None
39
40support.requires_working_socket(module=True)
41
42HOST = socket_helper.HOST
43# test unicode string and carriage return
44MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')
45
46VSOCKPORT = 1234
47AIX = platform.system() == "AIX"
48
49try:
50    import _socket
51except ImportError:
52    _socket = None
53
54def get_cid():
55    if fcntl is None:
56        return None
57    if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'):
58        return None
59    try:
60        with open("/dev/vsock", "rb") as f:
61            r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, "    ")
62    except OSError:
63        return None
64    else:
65        return struct.unpack("I", r)[0]
66
67def _have_socket_can():
68    """Check whether CAN sockets are supported on this host."""
69    try:
70        s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
71    except (AttributeError, OSError):
72        return False
73    else:
74        s.close()
75    return True
76
77def _have_socket_can_isotp():
78    """Check whether CAN ISOTP sockets are supported on this host."""
79    try:
80        s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
81    except (AttributeError, OSError):
82        return False
83    else:
84        s.close()
85    return True
86
87def _have_socket_can_j1939():
88    """Check whether CAN J1939 sockets are supported on this host."""
89    try:
90        s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939)
91    except (AttributeError, OSError):
92        return False
93    else:
94        s.close()
95    return True
96
97def _have_socket_rds():
98    """Check whether RDS sockets are supported on this host."""
99    try:
100        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
101    except (AttributeError, OSError):
102        return False
103    else:
104        s.close()
105    return True
106
107def _have_socket_alg():
108    """Check whether AF_ALG sockets are supported on this host."""
109    try:
110        s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
111    except (AttributeError, OSError):
112        return False
113    else:
114        s.close()
115    return True
116
117def _have_socket_qipcrtr():
118    """Check whether AF_QIPCRTR sockets are supported on this host."""
119    try:
120        s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0)
121    except (AttributeError, OSError):
122        return False
123    else:
124        s.close()
125    return True
126
127def _have_socket_vsock():
128    """Check whether AF_VSOCK sockets are supported on this host."""
129    ret = get_cid() is not None
130    return ret
131
132
133def _have_socket_bluetooth():
134    """Check whether AF_BLUETOOTH sockets are supported on this host."""
135    try:
136        # RFCOMM is supported by all platforms with bluetooth support. Windows
137        # does not support omitting the protocol.
138        s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
139    except (AttributeError, OSError):
140        return False
141    else:
142        s.close()
143    return True
144
145
146@contextlib.contextmanager
147def socket_setdefaulttimeout(timeout):
148    old_timeout = socket.getdefaulttimeout()
149    try:
150        socket.setdefaulttimeout(timeout)
151        yield
152    finally:
153        socket.setdefaulttimeout(old_timeout)
154
155
156HAVE_SOCKET_CAN = _have_socket_can()
157
158HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()
159
160HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939()
161
162HAVE_SOCKET_RDS = _have_socket_rds()
163
164HAVE_SOCKET_ALG = _have_socket_alg()
165
166HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr()
167
168HAVE_SOCKET_VSOCK = _have_socket_vsock()
169
170HAVE_SOCKET_UDPLITE = hasattr(socket, "IPPROTO_UDPLITE")
171
172HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()
173
174# Size in bytes of the int type
175SIZEOF_INT = array.array("i").itemsize
176
177class SocketTCPTest(unittest.TestCase):
178
179    def setUp(self):
180        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
181        self.port = socket_helper.bind_port(self.serv)
182        self.serv.listen()
183
184    def tearDown(self):
185        self.serv.close()
186        self.serv = None
187
188class SocketUDPTest(unittest.TestCase):
189
190    def setUp(self):
191        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
192        self.port = socket_helper.bind_port(self.serv)
193
194    def tearDown(self):
195        self.serv.close()
196        self.serv = None
197
198class SocketUDPLITETest(SocketUDPTest):
199
200    def setUp(self):
201        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
202        self.port = socket_helper.bind_port(self.serv)
203
204class ThreadSafeCleanupTestCase:
205    """Subclass of unittest.TestCase with thread-safe cleanup methods.
206
207    This subclass protects the addCleanup() and doCleanups() methods
208    with a recursive lock.
209    """
210
211    def __init__(self, *args, **kwargs):
212        super().__init__(*args, **kwargs)
213        self._cleanup_lock = threading.RLock()
214
215    def addCleanup(self, *args, **kwargs):
216        with self._cleanup_lock:
217            return super().addCleanup(*args, **kwargs)
218
219    def doCleanups(self, *args, **kwargs):
220        with self._cleanup_lock:
221            return super().doCleanups(*args, **kwargs)
222
223class SocketCANTest(unittest.TestCase):
224
225    """To be able to run this test, a `vcan0` CAN interface can be created with
226    the following commands:
227    # modprobe vcan
228    # ip link add dev vcan0 type vcan
229    # ip link set up vcan0
230    """
231    interface = 'vcan0'
232    bufsize = 128
233
234    """The CAN frame structure is defined in <linux/can.h>:
235
236    struct can_frame {
237        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
238        __u8    can_dlc; /* data length code: 0 .. 8 */
239        __u8    data[8] __attribute__((aligned(8)));
240    };
241    """
242    can_frame_fmt = "=IB3x8s"
243    can_frame_size = struct.calcsize(can_frame_fmt)
244
245    """The Broadcast Management Command frame structure is defined
246    in <linux/can/bcm.h>:
247
248    struct bcm_msg_head {
249        __u32 opcode;
250        __u32 flags;
251        __u32 count;
252        struct timeval ival1, ival2;
253        canid_t can_id;
254        __u32 nframes;
255        struct can_frame frames[0];
256    }
257
258    `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
259    `struct can_frame` definition). Must use native not standard types for packing.
260    """
261    bcm_cmd_msg_fmt = "@3I4l2I"
262    bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)
263
264    def setUp(self):
265        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
266        self.addCleanup(self.s.close)
267        try:
268            self.s.bind((self.interface,))
269        except OSError:
270            self.skipTest('network interface `%s` does not exist' %
271                           self.interface)
272
273
274class SocketRDSTest(unittest.TestCase):
275
276    """To be able to run this test, the `rds` kernel module must be loaded:
277    # modprobe rds
278    """
279    bufsize = 8192
280
281    def setUp(self):
282        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
283        self.addCleanup(self.serv.close)
284        try:
285            self.port = socket_helper.bind_port(self.serv)
286        except OSError:
287            self.skipTest('unable to bind RDS socket')
288
289
290class ThreadableTest:
291    """Threadable Test class
292
293    The ThreadableTest class makes it easy to create a threaded
294    client/server pair from an existing unit test. To create a
295    new threaded class from an existing unit test, use multiple
296    inheritance:
297
298        class NewClass (OldClass, ThreadableTest):
299            pass
300
301    This class defines two new fixture functions with obvious
302    purposes for overriding:
303
304        clientSetUp ()
305        clientTearDown ()
306
307    Any new test functions within the class must then define
308    tests in pairs, where the test name is preceded with a
309    '_' to indicate the client portion of the test. Ex:
310
311        def testFoo(self):
312            # Server portion
313
314        def _testFoo(self):
315            # Client portion
316
317    Any exceptions raised by the clients during their tests
318    are caught and transferred to the main thread to alert
319    the testing framework.
320
321    Note, the server setup function cannot call any blocking
322    functions that rely on the client thread during setup,
323    unless serverExplicitReady() is called just before
324    the blocking call (such as in setting up a client/server
325    connection and performing the accept() in setUp().
326    """
327
328    def __init__(self):
329        # Swap the true setup function
330        self.__setUp = self.setUp
331        self.setUp = self._setUp
332
333    def serverExplicitReady(self):
334        """This method allows the server to explicitly indicate that
335        it wants the client thread to proceed. This is useful if the
336        server is about to execute a blocking routine that is
337        dependent upon the client thread during its setup routine."""
338        self.server_ready.set()
339
340    def _setUp(self):
341        self.enterContext(threading_helper.wait_threads_exit())
342
343        self.server_ready = threading.Event()
344        self.client_ready = threading.Event()
345        self.done = threading.Event()
346        self.queue = queue.Queue(1)
347        self.server_crashed = False
348
349        def raise_queued_exception():
350            if self.queue.qsize():
351                raise self.queue.get()
352        self.addCleanup(raise_queued_exception)
353
354        # Do some munging to start the client test.
355        methodname = self.id()
356        i = methodname.rfind('.')
357        methodname = methodname[i+1:]
358        test_method = getattr(self, '_' + methodname)
359        self.client_thread = thread.start_new_thread(
360            self.clientRun, (test_method,))
361
362        try:
363            self.__setUp()
364        except:
365            self.server_crashed = True
366            raise
367        finally:
368            self.server_ready.set()
369        self.client_ready.wait()
370        self.addCleanup(self.done.wait)
371
372    def clientRun(self, test_func):
373        self.server_ready.wait()
374        try:
375            self.clientSetUp()
376        except BaseException as e:
377            self.queue.put(e)
378            self.clientTearDown()
379            return
380        finally:
381            self.client_ready.set()
382        if self.server_crashed:
383            self.clientTearDown()
384            return
385        if not hasattr(test_func, '__call__'):
386            raise TypeError("test_func must be a callable function")
387        try:
388            test_func()
389        except BaseException as e:
390            self.queue.put(e)
391        finally:
392            self.clientTearDown()
393
394    def clientSetUp(self):
395        raise NotImplementedError("clientSetUp must be implemented.")
396
397    def clientTearDown(self):
398        self.done.set()
399        thread.exit()
400
401class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
402
403    def __init__(self, methodName='runTest'):
404        SocketTCPTest.__init__(self, methodName=methodName)
405        ThreadableTest.__init__(self)
406
407    def clientSetUp(self):
408        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
409
410    def clientTearDown(self):
411        self.cli.close()
412        self.cli = None
413        ThreadableTest.clientTearDown(self)
414
415class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
416
417    def __init__(self, methodName='runTest'):
418        SocketUDPTest.__init__(self, methodName=methodName)
419        ThreadableTest.__init__(self)
420
421    def clientSetUp(self):
422        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
423
424    def clientTearDown(self):
425        self.cli.close()
426        self.cli = None
427        ThreadableTest.clientTearDown(self)
428
429@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
430          'UDPLITE sockets required for this test.')
431class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest):
432
433    def __init__(self, methodName='runTest'):
434        SocketUDPLITETest.__init__(self, methodName=methodName)
435        ThreadableTest.__init__(self)
436
437    def clientSetUp(self):
438        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
439
440    def clientTearDown(self):
441        self.cli.close()
442        self.cli = None
443        ThreadableTest.clientTearDown(self)
444
445class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
446
447    def __init__(self, methodName='runTest'):
448        SocketCANTest.__init__(self, methodName=methodName)
449        ThreadableTest.__init__(self)
450
451    def clientSetUp(self):
452        self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
453        try:
454            self.cli.bind((self.interface,))
455        except OSError:
456            # skipTest should not be called here, and will be called in the
457            # server instead
458            pass
459
460    def clientTearDown(self):
461        self.cli.close()
462        self.cli = None
463        ThreadableTest.clientTearDown(self)
464
465class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
466
467    def __init__(self, methodName='runTest'):
468        SocketRDSTest.__init__(self, methodName=methodName)
469        ThreadableTest.__init__(self)
470
471    def clientSetUp(self):
472        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
473        try:
474            # RDS sockets must be bound explicitly to send or receive data
475            self.cli.bind((HOST, 0))
476            self.cli_addr = self.cli.getsockname()
477        except OSError:
478            # skipTest should not be called here, and will be called in the
479            # server instead
480            pass
481
482    def clientTearDown(self):
483        self.cli.close()
484        self.cli = None
485        ThreadableTest.clientTearDown(self)
486
487@unittest.skipIf(fcntl is None, "need fcntl")
488@unittest.skipUnless(HAVE_SOCKET_VSOCK,
489          'VSOCK sockets required for this test.')
490@unittest.skipUnless(get_cid() != 2,
491          "This test can only be run on a virtual guest.")
492class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest):
493
494    def __init__(self, methodName='runTest'):
495        unittest.TestCase.__init__(self, methodName=methodName)
496        ThreadableTest.__init__(self)
497
498    def setUp(self):
499        self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
500        self.addCleanup(self.serv.close)
501        self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT))
502        self.serv.listen()
503        self.serverExplicitReady()
504        self.conn, self.connaddr = self.serv.accept()
505        self.addCleanup(self.conn.close)
506
507    def clientSetUp(self):
508        time.sleep(0.1)
509        self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
510        self.addCleanup(self.cli.close)
511        cid = get_cid()
512        self.cli.connect((cid, VSOCKPORT))
513
514    def testStream(self):
515        msg = self.conn.recv(1024)
516        self.assertEqual(msg, MSG)
517
518    def _testStream(self):
519        self.cli.send(MSG)
520        self.cli.close()
521
522class SocketConnectedTest(ThreadedTCPSocketTest):
523    """Socket tests for client-server connection.
524
525    self.cli_conn is a client socket connected to the server.  The
526    setUp() method guarantees that it is connected to the server.
527    """
528
529    def __init__(self, methodName='runTest'):
530        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
531
532    def setUp(self):
533        ThreadedTCPSocketTest.setUp(self)
534        # Indicate explicitly we're ready for the client thread to
535        # proceed and then perform the blocking call to accept
536        self.serverExplicitReady()
537        conn, addr = self.serv.accept()
538        self.cli_conn = conn
539
540    def tearDown(self):
541        self.cli_conn.close()
542        self.cli_conn = None
543        ThreadedTCPSocketTest.tearDown(self)
544
545    def clientSetUp(self):
546        ThreadedTCPSocketTest.clientSetUp(self)
547        self.cli.connect((HOST, self.port))
548        self.serv_conn = self.cli
549
550    def clientTearDown(self):
551        self.serv_conn.close()
552        self.serv_conn = None
553        ThreadedTCPSocketTest.clientTearDown(self)
554
555class SocketPairTest(unittest.TestCase, ThreadableTest):
556
557    def __init__(self, methodName='runTest'):
558        unittest.TestCase.__init__(self, methodName=methodName)
559        ThreadableTest.__init__(self)
560        self.cli = None
561        self.serv = None
562
563    def socketpair(self):
564        # To be overridden by some child classes.
565        return socket.socketpair()
566
567    def setUp(self):
568        self.serv, self.cli = self.socketpair()
569
570    def tearDown(self):
571        if self.serv:
572            self.serv.close()
573        self.serv = None
574
575    def clientSetUp(self):
576        pass
577
578    def clientTearDown(self):
579        if self.cli:
580            self.cli.close()
581        self.cli = None
582        ThreadableTest.clientTearDown(self)
583
584
585# The following classes are used by the sendmsg()/recvmsg() tests.
586# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
587# gives a drop-in replacement for SocketConnectedTest, but different
588# address families can be used, and the attributes serv_addr and
589# cli_addr will be set to the addresses of the endpoints.
590
591class SocketTestBase(unittest.TestCase):
592    """A base class for socket tests.
593
594    Subclasses must provide methods newSocket() to return a new socket
595    and bindSock(sock) to bind it to an unused address.
596
597    Creates a socket self.serv and sets self.serv_addr to its address.
598    """
599
600    def setUp(self):
601        self.serv = self.newSocket()
602        self.bindServer()
603
604    def bindServer(self):
605        """Bind server socket and set self.serv_addr to its address."""
606        self.bindSock(self.serv)
607        self.serv_addr = self.serv.getsockname()
608
609    def tearDown(self):
610        self.serv.close()
611        self.serv = None
612
613
614class SocketListeningTestMixin(SocketTestBase):
615    """Mixin to listen on the server socket."""
616
617    def setUp(self):
618        super().setUp()
619        self.serv.listen()
620
621
622class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
623                              ThreadableTest):
624    """Mixin to add client socket and allow client/server tests.
625
626    Client socket is self.cli and its address is self.cli_addr.  See
627    ThreadableTest for usage information.
628    """
629
630    def __init__(self, *args, **kwargs):
631        super().__init__(*args, **kwargs)
632        ThreadableTest.__init__(self)
633
634    def clientSetUp(self):
635        self.cli = self.newClientSocket()
636        self.bindClient()
637
638    def newClientSocket(self):
639        """Return a new socket for use as client."""
640        return self.newSocket()
641
642    def bindClient(self):
643        """Bind client socket and set self.cli_addr to its address."""
644        self.bindSock(self.cli)
645        self.cli_addr = self.cli.getsockname()
646
647    def clientTearDown(self):
648        self.cli.close()
649        self.cli = None
650        ThreadableTest.clientTearDown(self)
651
652
653class ConnectedStreamTestMixin(SocketListeningTestMixin,
654                               ThreadedSocketTestMixin):
655    """Mixin to allow client/server stream tests with connected client.
656
657    Server's socket representing connection to client is self.cli_conn
658    and client's connection to server is self.serv_conn.  (Based on
659    SocketConnectedTest.)
660    """
661
662    def setUp(self):
663        super().setUp()
664        # Indicate explicitly we're ready for the client thread to
665        # proceed and then perform the blocking call to accept
666        self.serverExplicitReady()
667        conn, addr = self.serv.accept()
668        self.cli_conn = conn
669
670    def tearDown(self):
671        self.cli_conn.close()
672        self.cli_conn = None
673        super().tearDown()
674
675    def clientSetUp(self):
676        super().clientSetUp()
677        self.cli.connect(self.serv_addr)
678        self.serv_conn = self.cli
679
680    def clientTearDown(self):
681        try:
682            self.serv_conn.close()
683            self.serv_conn = None
684        except AttributeError:
685            pass
686        super().clientTearDown()
687
688
689class UnixSocketTestBase(SocketTestBase):
690    """Base class for Unix-domain socket tests."""
691
692    # This class is used for file descriptor passing tests, so we
693    # create the sockets in a private directory so that other users
694    # can't send anything that might be problematic for a privileged
695    # user running the tests.
696
697    def setUp(self):
698        self.dir_path = tempfile.mkdtemp()
699        self.addCleanup(os.rmdir, self.dir_path)
700        super().setUp()
701
702    def bindSock(self, sock):
703        path = tempfile.mktemp(dir=self.dir_path)
704        socket_helper.bind_unix_socket(sock, path)
705        self.addCleanup(os_helper.unlink, path)
706
707class UnixStreamBase(UnixSocketTestBase):
708    """Base class for Unix-domain SOCK_STREAM tests."""
709
710    def newSocket(self):
711        return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
712
713
714class InetTestBase(SocketTestBase):
715    """Base class for IPv4 socket tests."""
716
717    host = HOST
718
719    def setUp(self):
720        super().setUp()
721        self.port = self.serv_addr[1]
722
723    def bindSock(self, sock):
724        socket_helper.bind_port(sock, host=self.host)
725
726class TCPTestBase(InetTestBase):
727    """Base class for TCP-over-IPv4 tests."""
728
729    def newSocket(self):
730        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)
731
732class UDPTestBase(InetTestBase):
733    """Base class for UDP-over-IPv4 tests."""
734
735    def newSocket(self):
736        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
737
738class UDPLITETestBase(InetTestBase):
739    """Base class for UDPLITE-over-IPv4 tests."""
740
741    def newSocket(self):
742        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
743
744class SCTPStreamBase(InetTestBase):
745    """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""
746
747    def newSocket(self):
748        return socket.socket(socket.AF_INET, socket.SOCK_STREAM,
749                             socket.IPPROTO_SCTP)
750
751
752class Inet6TestBase(InetTestBase):
753    """Base class for IPv6 socket tests."""
754
755    host = socket_helper.HOSTv6
756
757class UDP6TestBase(Inet6TestBase):
758    """Base class for UDP-over-IPv6 tests."""
759
760    def newSocket(self):
761        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
762
763class UDPLITE6TestBase(Inet6TestBase):
764    """Base class for UDPLITE-over-IPv6 tests."""
765
766    def newSocket(self):
767        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
768
769
770# Test-skipping decorators for use with ThreadableTest.
771
772def skipWithClientIf(condition, reason):
773    """Skip decorated test if condition is true, add client_skip decorator.
774
775    If the decorated object is not a class, sets its attribute
776    "client_skip" to a decorator which will return an empty function
777    if the test is to be skipped, or the original function if it is
778    not.  This can be used to avoid running the client part of a
779    skipped test when using ThreadableTest.
780    """
781    def client_pass(*args, **kwargs):
782        pass
783    def skipdec(obj):
784        retval = unittest.skip(reason)(obj)
785        if not isinstance(obj, type):
786            retval.client_skip = lambda f: client_pass
787        return retval
788    def noskipdec(obj):
789        if not (isinstance(obj, type) or hasattr(obj, "client_skip")):
790            obj.client_skip = lambda f: f
791        return obj
792    return skipdec if condition else noskipdec
793
794
795def requireAttrs(obj, *attributes):
796    """Skip decorated test if obj is missing any of the given attributes.
797
798    Sets client_skip attribute as skipWithClientIf() does.
799    """
800    missing = [name for name in attributes if not hasattr(obj, name)]
801    return skipWithClientIf(
802        missing, "don't have " + ", ".join(name for name in missing))
803
804
805def requireSocket(*args):
806    """Skip decorated test if a socket cannot be created with given arguments.
807
808    When an argument is given as a string, will use the value of that
809    attribute of the socket module, or skip the test if it doesn't
810    exist.  Sets client_skip attribute as skipWithClientIf() does.
811    """
812    err = None
813    missing = [obj for obj in args if
814               isinstance(obj, str) and not hasattr(socket, obj)]
815    if missing:
816        err = "don't have " + ", ".join(name for name in missing)
817    else:
818        callargs = [getattr(socket, obj) if isinstance(obj, str) else obj
819                    for obj in args]
820        try:
821            s = socket.socket(*callargs)
822        except OSError as e:
823            # XXX: check errno?
824            err = str(e)
825        else:
826            s.close()
827    return skipWithClientIf(
828        err is not None,
829        "can't create socket({0}): {1}".format(
830            ", ".join(str(o) for o in args), err))
831
832
833#######################################################################
834## Begin Tests
835
836class GeneralModuleTests(unittest.TestCase):
837
838    def test_SocketType_is_socketobject(self):
839        import _socket
840        self.assertTrue(socket.SocketType is _socket.socket)
841        s = socket.socket()
842        self.assertIsInstance(s, socket.SocketType)
843        s.close()
844
845    def test_repr(self):
846        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
847        with s:
848            self.assertIn('fd=%i' % s.fileno(), repr(s))
849            self.assertIn('family=%s' % socket.AF_INET, repr(s))
850            self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
851            self.assertIn('proto=0', repr(s))
852            self.assertNotIn('raddr', repr(s))
853            s.bind(('127.0.0.1', 0))
854            self.assertIn('laddr', repr(s))
855            self.assertIn(str(s.getsockname()), repr(s))
856        self.assertIn('[closed]', repr(s))
857        self.assertNotIn('laddr', repr(s))
858
859    @unittest.skipUnless(_socket is not None, 'need _socket module')
860    def test_csocket_repr(self):
861        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
862        try:
863            expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
864                        % (s.fileno(), s.family, s.type, s.proto))
865            self.assertEqual(repr(s), expected)
866        finally:
867            s.close()
868        expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
869                    % (s.family, s.type, s.proto))
870        self.assertEqual(repr(s), expected)
871
872    def test_weakref(self):
873        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
874            p = proxy(s)
875            self.assertEqual(p.fileno(), s.fileno())
876        s = None
877        support.gc_collect()  # For PyPy or other GCs.
878        try:
879            p.fileno()
880        except ReferenceError:
881            pass
882        else:
883            self.fail('Socket proxy still exists')
884
885    def testSocketError(self):
886        # Testing socket module exceptions
887        msg = "Error raising socket exception (%s)."
888        with self.assertRaises(OSError, msg=msg % 'OSError'):
889            raise OSError
890        with self.assertRaises(OSError, msg=msg % 'socket.herror'):
891            raise socket.herror
892        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
893            raise socket.gaierror
894
895    def testSendtoErrors(self):
896        # Testing that sendto doesn't mask failures. See #10169.
897        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
898        self.addCleanup(s.close)
899        s.bind(('', 0))
900        sockname = s.getsockname()
901        # 2 args
902        with self.assertRaises(TypeError) as cm:
903            s.sendto('\u2620', sockname)
904        self.assertEqual(str(cm.exception),
905                         "a bytes-like object is required, not 'str'")
906        with self.assertRaises(TypeError) as cm:
907            s.sendto(5j, sockname)
908        self.assertEqual(str(cm.exception),
909                         "a bytes-like object is required, not 'complex'")
910        with self.assertRaises(TypeError) as cm:
911            s.sendto(b'foo', None)
912        self.assertIn('not NoneType',str(cm.exception))
913        # 3 args
914        with self.assertRaises(TypeError) as cm:
915            s.sendto('\u2620', 0, sockname)
916        self.assertEqual(str(cm.exception),
917                         "a bytes-like object is required, not 'str'")
918        with self.assertRaises(TypeError) as cm:
919            s.sendto(5j, 0, sockname)
920        self.assertEqual(str(cm.exception),
921                         "a bytes-like object is required, not 'complex'")
922        with self.assertRaises(TypeError) as cm:
923            s.sendto(b'foo', 0, None)
924        self.assertIn('not NoneType', str(cm.exception))
925        with self.assertRaises(TypeError) as cm:
926            s.sendto(b'foo', 'bar', sockname)
927        with self.assertRaises(TypeError) as cm:
928            s.sendto(b'foo', None, None)
929        # wrong number of args
930        with self.assertRaises(TypeError) as cm:
931            s.sendto(b'foo')
932        self.assertIn('(1 given)', str(cm.exception))
933        with self.assertRaises(TypeError) as cm:
934            s.sendto(b'foo', 0, sockname, 4)
935        self.assertIn('(4 given)', str(cm.exception))
936
937    def testCrucialConstants(self):
938        # Testing for mission critical constants
939        socket.AF_INET
940        if socket.has_ipv6:
941            socket.AF_INET6
942        socket.SOCK_STREAM
943        socket.SOCK_DGRAM
944        socket.SOCK_RAW
945        socket.SOCK_RDM
946        socket.SOCK_SEQPACKET
947        socket.SOL_SOCKET
948        socket.SO_REUSEADDR
949
950    def testCrucialIpProtoConstants(self):
951        socket.IPPROTO_TCP
952        socket.IPPROTO_UDP
953        if socket.has_ipv6:
954            socket.IPPROTO_IPV6
955
956    @unittest.skipUnless(os.name == "nt", "Windows specific")
957    def testWindowsSpecificConstants(self):
958        socket.IPPROTO_ICLFXBM
959        socket.IPPROTO_ST
960        socket.IPPROTO_CBT
961        socket.IPPROTO_IGP
962        socket.IPPROTO_RDP
963        socket.IPPROTO_PGM
964        socket.IPPROTO_L2TP
965        socket.IPPROTO_SCTP
966
967    @unittest.skipIf(support.is_wasi, "WASI is missing these methods")
968    def test_socket_methods(self):
969        # socket methods that depend on a configure HAVE_ check. They should
970        # be present on all platforms except WASI.
971        names = [
972            "_accept", "bind", "connect", "connect_ex", "getpeername",
973            "getsockname", "listen", "recvfrom", "recvfrom_into", "sendto",
974            "setsockopt", "shutdown"
975        ]
976        for name in names:
977            if not hasattr(socket.socket, name):
978                self.fail(f"socket method {name} is missing")
979
980    @unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
981    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
982    def test3542SocketOptions(self):
983        # Ref. issue #35569 and https://tools.ietf.org/html/rfc3542
984        opts = {
985            'IPV6_CHECKSUM',
986            'IPV6_DONTFRAG',
987            'IPV6_DSTOPTS',
988            'IPV6_HOPLIMIT',
989            'IPV6_HOPOPTS',
990            'IPV6_NEXTHOP',
991            'IPV6_PATHMTU',
992            'IPV6_PKTINFO',
993            'IPV6_RECVDSTOPTS',
994            'IPV6_RECVHOPLIMIT',
995            'IPV6_RECVHOPOPTS',
996            'IPV6_RECVPATHMTU',
997            'IPV6_RECVPKTINFO',
998            'IPV6_RECVRTHDR',
999            'IPV6_RECVTCLASS',
1000            'IPV6_RTHDR',
1001            'IPV6_RTHDRDSTOPTS',
1002            'IPV6_RTHDR_TYPE_0',
1003            'IPV6_TCLASS',
1004            'IPV6_USE_MIN_MTU',
1005        }
1006        for opt in opts:
1007            self.assertTrue(
1008                hasattr(socket, opt), f"Missing RFC3542 socket option '{opt}'"
1009            )
1010
1011    def testHostnameRes(self):
1012        # Testing hostname resolution mechanisms
1013        hostname = socket.gethostname()
1014        try:
1015            ip = socket.gethostbyname(hostname)
1016        except OSError:
1017            # Probably name lookup wasn't set up right; skip this test
1018            self.skipTest('name lookup failure')
1019        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
1020        try:
1021            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
1022        except OSError:
1023            # Probably a similar problem as above; skip this test
1024            self.skipTest('name lookup failure')
1025        all_host_names = [hostname, hname] + aliases
1026        fqhn = socket.getfqdn(ip)
1027        if not fqhn in all_host_names:
1028            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
1029
1030    def test_host_resolution(self):
1031        for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']:
1032            self.assertEqual(socket.gethostbyname(addr), addr)
1033
1034        # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have
1035        # a matching name entry (e.g. 'ip6-localhost')
1036        for host in [socket_helper.HOSTv4]:
1037            self.assertIn(host, socket.gethostbyaddr(host)[2])
1038
1039    def test_host_resolution_bad_address(self):
1040        # These are all malformed IP addresses and expected not to resolve to
1041        # any result.  But some ISPs, e.g. AWS and AT&T, may successfully
1042        # resolve these IPs. In particular, AT&T's DNS Error Assist service
1043        # will break this test.  See https://bugs.python.org/issue42092 for a
1044        # workaround.
1045        explanation = (
1046            "resolving an invalid IP address did not raise OSError; "
1047            "can be caused by a broken DNS server"
1048        )
1049        for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
1050                     '1:1:1:1:1:1:1:1:1']:
1051            with self.assertRaises(OSError, msg=addr):
1052                socket.gethostbyname(addr)
1053            with self.assertRaises(OSError, msg=explanation):
1054                socket.gethostbyaddr(addr)
1055
1056    @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
1057    @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
1058    def test_sethostname(self):
1059        oldhn = socket.gethostname()
1060        try:
1061            socket.sethostname('new')
1062        except OSError as e:
1063            if e.errno == errno.EPERM:
1064                self.skipTest("test should be run as root")
1065            else:
1066                raise
1067        try:
1068            # running test as root!
1069            self.assertEqual(socket.gethostname(), 'new')
1070            # Should work with bytes objects too
1071            socket.sethostname(b'bar')
1072            self.assertEqual(socket.gethostname(), 'bar')
1073        finally:
1074            socket.sethostname(oldhn)
1075
1076    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
1077                         'socket.if_nameindex() not available.')
1078    def testInterfaceNameIndex(self):
1079        interfaces = socket.if_nameindex()
1080        for index, name in interfaces:
1081            self.assertIsInstance(index, int)
1082            self.assertIsInstance(name, str)
1083            # interface indices are non-zero integers
1084            self.assertGreater(index, 0)
1085            _index = socket.if_nametoindex(name)
1086            self.assertIsInstance(_index, int)
1087            self.assertEqual(index, _index)
1088            _name = socket.if_indextoname(index)
1089            self.assertIsInstance(_name, str)
1090            self.assertEqual(name, _name)
1091
1092    @unittest.skipUnless(hasattr(socket, 'if_indextoname'),
1093                         'socket.if_indextoname() not available.')
1094    def testInvalidInterfaceIndexToName(self):
1095        self.assertRaises(OSError, socket.if_indextoname, 0)
1096        self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
1097
1098    @unittest.skipUnless(hasattr(socket, 'if_nametoindex'),
1099                         'socket.if_nametoindex() not available.')
1100    def testInvalidInterfaceNameToIndex(self):
1101        self.assertRaises(TypeError, socket.if_nametoindex, 0)
1102        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
1103
1104    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
1105                         'test needs sys.getrefcount()')
1106    def testRefCountGetNameInfo(self):
1107        # Testing reference count for getnameinfo
1108        try:
1109            # On some versions, this loses a reference
1110            orig = sys.getrefcount(__name__)
1111            socket.getnameinfo(__name__,0)
1112        except TypeError:
1113            if sys.getrefcount(__name__) != orig:
1114                self.fail("socket.getnameinfo loses a reference")
1115
1116    def testInterpreterCrash(self):
1117        # Making sure getnameinfo doesn't crash the interpreter
1118        try:
1119            # On some versions, this crashes the interpreter.
1120            socket.getnameinfo(('x', 0, 0, 0), 0)
1121        except OSError:
1122            pass
1123
1124    def testNtoH(self):
1125        # This just checks that htons etc. are their own inverse,
1126        # when looking at the lower 16 or 32 bits.
1127        sizes = {socket.htonl: 32, socket.ntohl: 32,
1128                 socket.htons: 16, socket.ntohs: 16}
1129        for func, size in sizes.items():
1130            mask = (1<<size) - 1
1131            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
1132                self.assertEqual(i & mask, func(func(i&mask)) & mask)
1133
1134            swapped = func(mask)
1135            self.assertEqual(swapped & mask, mask)
1136            self.assertRaises(OverflowError, func, 1<<34)
1137
1138    @support.cpython_only
1139    def testNtoHErrors(self):
1140        import _testcapi
1141        s_good_values = [0, 1, 2, 0xffff]
1142        l_good_values = s_good_values + [0xffffffff]
1143        l_bad_values = [-1, -2, 1<<32, 1<<1000]
1144        s_bad_values = (
1145            l_bad_values +
1146            [_testcapi.INT_MIN-1, _testcapi.INT_MAX+1] +
1147            [1 << 16, _testcapi.INT_MAX]
1148        )
1149        for k in s_good_values:
1150            socket.ntohs(k)
1151            socket.htons(k)
1152        for k in l_good_values:
1153            socket.ntohl(k)
1154            socket.htonl(k)
1155        for k in s_bad_values:
1156            self.assertRaises(OverflowError, socket.ntohs, k)
1157            self.assertRaises(OverflowError, socket.htons, k)
1158        for k in l_bad_values:
1159            self.assertRaises(OverflowError, socket.ntohl, k)
1160            self.assertRaises(OverflowError, socket.htonl, k)
1161
1162    def testGetServBy(self):
1163        eq = self.assertEqual
1164        # Find one service that exists, then check all the related interfaces.
1165        # I've ordered this by protocols that have both a tcp and udp
1166        # protocol, at least for modern Linuxes.
1167        if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
1168            or sys.platform in ('linux', 'darwin')):
1169            # avoid the 'echo' service on this platform, as there is an
1170            # assumption breaking non-standard port/protocol entry
1171            services = ('daytime', 'qotd', 'domain')
1172        else:
1173            services = ('echo', 'daytime', 'domain')
1174        for service in services:
1175            try:
1176                port = socket.getservbyname(service, 'tcp')
1177                break
1178            except OSError:
1179                pass
1180        else:
1181            raise OSError
1182        # Try same call with optional protocol omitted
1183        # Issue #26936: Android getservbyname() was broken before API 23.
1184        if (not hasattr(sys, 'getandroidapilevel') or
1185                sys.getandroidapilevel() >= 23):
1186            port2 = socket.getservbyname(service)
1187            eq(port, port2)
1188        # Try udp, but don't barf if it doesn't exist
1189        try:
1190            udpport = socket.getservbyname(service, 'udp')
1191        except OSError:
1192            udpport = None
1193        else:
1194            eq(udpport, port)
1195        # Now make sure the lookup by port returns the same service name
1196        # Issue #26936: Android getservbyport() is broken.
1197        if not support.is_android:
1198            eq(socket.getservbyport(port2), service)
1199        eq(socket.getservbyport(port, 'tcp'), service)
1200        if udpport is not None:
1201            eq(socket.getservbyport(udpport, 'udp'), service)
1202        # Make sure getservbyport does not accept out of range ports.
1203        self.assertRaises(OverflowError, socket.getservbyport, -1)
1204        self.assertRaises(OverflowError, socket.getservbyport, 65536)
1205
1206    def testDefaultTimeout(self):
1207        # Testing default timeout
1208        # The default timeout should initially be None
1209        self.assertEqual(socket.getdefaulttimeout(), None)
1210        with socket.socket() as s:
1211            self.assertEqual(s.gettimeout(), None)
1212
1213        # Set the default timeout to 10, and see if it propagates
1214        with socket_setdefaulttimeout(10):
1215            self.assertEqual(socket.getdefaulttimeout(), 10)
1216            with socket.socket() as sock:
1217                self.assertEqual(sock.gettimeout(), 10)
1218
1219            # Reset the default timeout to None, and see if it propagates
1220            socket.setdefaulttimeout(None)
1221            self.assertEqual(socket.getdefaulttimeout(), None)
1222            with socket.socket() as sock:
1223                self.assertEqual(sock.gettimeout(), None)
1224
1225        # Check that setting it to an invalid value raises ValueError
1226        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
1227
1228        # Check that setting it to an invalid type raises TypeError
1229        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
1230
1231    @unittest.skipUnless(hasattr(socket, 'inet_aton'),
1232                         'test needs socket.inet_aton()')
1233    def testIPv4_inet_aton_fourbytes(self):
1234        # Test that issue1008086 and issue767150 are fixed.
1235        # It must return 4 bytes.
1236        self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
1237        self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
1238
1239    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1240                         'test needs socket.inet_pton()')
1241    def testIPv4toString(self):
1242        from socket import inet_aton as f, inet_pton, AF_INET
1243        g = lambda a: inet_pton(AF_INET, a)
1244
1245        assertInvalid = lambda func,a: self.assertRaises(
1246            (OSError, ValueError), func, a
1247        )
1248
1249        self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
1250        self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
1251        self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
1252        self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
1253        self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
1254        # bpo-29972: inet_pton() doesn't fail on AIX
1255        if not AIX:
1256            assertInvalid(f, '0.0.0.')
1257        assertInvalid(f, '300.0.0.0')
1258        assertInvalid(f, 'a.0.0.0')
1259        assertInvalid(f, '1.2.3.4.5')
1260        assertInvalid(f, '::1')
1261
1262        self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
1263        self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
1264        self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
1265        self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
1266        assertInvalid(g, '0.0.0.')
1267        assertInvalid(g, '300.0.0.0')
1268        assertInvalid(g, 'a.0.0.0')
1269        assertInvalid(g, '1.2.3.4.5')
1270        assertInvalid(g, '::1')
1271
1272    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1273                         'test needs socket.inet_pton()')
1274    def testIPv6toString(self):
1275        try:
1276            from socket import inet_pton, AF_INET6, has_ipv6
1277            if not has_ipv6:
1278                self.skipTest('IPv6 not available')
1279        except ImportError:
1280            self.skipTest('could not import needed symbols from socket')
1281
1282        if sys.platform == "win32":
1283            try:
1284                inet_pton(AF_INET6, '::')
1285            except OSError as e:
1286                if e.winerror == 10022:
1287                    self.skipTest('IPv6 might not be supported')
1288
1289        f = lambda a: inet_pton(AF_INET6, a)
1290        assertInvalid = lambda a: self.assertRaises(
1291            (OSError, ValueError), f, a
1292        )
1293
1294        self.assertEqual(b'\x00' * 16, f('::'))
1295        self.assertEqual(b'\x00' * 16, f('0::0'))
1296        self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
1297        self.assertEqual(
1298            b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
1299            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
1300        )
1301        self.assertEqual(
1302            b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
1303            f('ad42:abc::127:0:254:2')
1304        )
1305        self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
1306        assertInvalid('0x20::')
1307        assertInvalid(':::')
1308        assertInvalid('::0::')
1309        assertInvalid('1::abc::')
1310        assertInvalid('1::abc::def')
1311        assertInvalid('1:2:3:4:5:6')
1312        assertInvalid('1:2:3:4:5:6:')
1313        assertInvalid('1:2:3:4:5:6:7:8:0')
1314        # bpo-29972: inet_pton() doesn't fail on AIX
1315        if not AIX:
1316            assertInvalid('1:2:3:4:5:6:7:8:')
1317
1318        self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
1319            f('::254.42.23.64')
1320        )
1321        self.assertEqual(
1322            b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
1323            f('42::a29b:254.42.23.64')
1324        )
1325        self.assertEqual(
1326            b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
1327            f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
1328        )
1329        assertInvalid('255.254.253.252')
1330        assertInvalid('1::260.2.3.0')
1331        assertInvalid('1::0.be.e.0')
1332        assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
1333        assertInvalid('::1.2.3.4:0')
1334        assertInvalid('0.100.200.0:3:4:5:6:7:8')
1335
1336    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1337                         'test needs socket.inet_ntop()')
1338    def testStringToIPv4(self):
1339        from socket import inet_ntoa as f, inet_ntop, AF_INET
1340        g = lambda a: inet_ntop(AF_INET, a)
1341        assertInvalid = lambda func,a: self.assertRaises(
1342            (OSError, ValueError), func, a
1343        )
1344
1345        self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
1346        self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
1347        self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
1348        self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
1349        assertInvalid(f, b'\x00' * 3)
1350        assertInvalid(f, b'\x00' * 5)
1351        assertInvalid(f, b'\x00' * 16)
1352        self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))
1353
1354        self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
1355        self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
1356        self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
1357        assertInvalid(g, b'\x00' * 3)
1358        assertInvalid(g, b'\x00' * 5)
1359        assertInvalid(g, b'\x00' * 16)
1360        self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))
1361
1362    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1363                         'test needs socket.inet_ntop()')
1364    def testStringToIPv6(self):
1365        try:
1366            from socket import inet_ntop, AF_INET6, has_ipv6
1367            if not has_ipv6:
1368                self.skipTest('IPv6 not available')
1369        except ImportError:
1370            self.skipTest('could not import needed symbols from socket')
1371
1372        if sys.platform == "win32":
1373            try:
1374                inet_ntop(AF_INET6, b'\x00' * 16)
1375            except OSError as e:
1376                if e.winerror == 10022:
1377                    self.skipTest('IPv6 might not be supported')
1378
1379        f = lambda a: inet_ntop(AF_INET6, a)
1380        assertInvalid = lambda a: self.assertRaises(
1381            (OSError, ValueError), f, a
1382        )
1383
1384        self.assertEqual('::', f(b'\x00' * 16))
1385        self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
1386        self.assertEqual(
1387            'aef:b01:506:1001:ffff:9997:55:170',
1388            f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
1389        )
1390        self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))
1391
1392        assertInvalid(b'\x12' * 15)
1393        assertInvalid(b'\x12' * 17)
1394        assertInvalid(b'\x12' * 4)
1395
1396    # XXX The following don't test module-level functionality...
1397
1398    def testSockName(self):
1399        # Testing getsockname()
1400        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1401        self.addCleanup(sock.close)
1402
1403        # Since find_unused_port() is inherently subject to race conditions, we
1404        # call it a couple times if necessary.
1405        for i in itertools.count():
1406            port = socket_helper.find_unused_port()
1407            try:
1408                sock.bind(("0.0.0.0", port))
1409            except OSError as e:
1410                if e.errno != errno.EADDRINUSE or i == 5:
1411                    raise
1412            else:
1413                break
1414
1415        name = sock.getsockname()
1416        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
1417        # it reasonable to get the host's addr in addition to 0.0.0.0.
1418        # At least for eCos.  This is required for the S/390 to pass.
1419        try:
1420            my_ip_addr = socket.gethostbyname(socket.gethostname())
1421        except OSError:
1422            # Probably name lookup wasn't set up right; skip this test
1423            self.skipTest('name lookup failure')
1424        self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
1425        self.assertEqual(name[1], port)
1426
1427    def testGetSockOpt(self):
1428        # Testing getsockopt()
1429        # We know a socket should start without reuse==0
1430        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1431        self.addCleanup(sock.close)
1432        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1433        self.assertFalse(reuse != 0, "initial mode is reuse")
1434
1435    def testSetSockOpt(self):
1436        # Testing setsockopt()
1437        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1438        self.addCleanup(sock.close)
1439        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1440        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1441        self.assertFalse(reuse == 0, "failed to set reuse mode")
1442
1443    def testSendAfterClose(self):
1444        # testing send() after close() with timeout
1445        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1446            sock.settimeout(1)
1447        self.assertRaises(OSError, sock.send, b"spam")
1448
1449    def testCloseException(self):
1450        sock = socket.socket()
1451        sock.bind((socket._LOCALHOST, 0))
1452        socket.socket(fileno=sock.fileno()).close()
1453        try:
1454            sock.close()
1455        except OSError as err:
1456            # Winsock apparently raises ENOTSOCK
1457            self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
1458        else:
1459            self.fail("close() should raise EBADF/ENOTSOCK")
1460
1461    def testNewAttributes(self):
1462        # testing .family, .type and .protocol
1463
1464        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1465            self.assertEqual(sock.family, socket.AF_INET)
1466            if hasattr(socket, 'SOCK_CLOEXEC'):
1467                self.assertIn(sock.type,
1468                              (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
1469                               socket.SOCK_STREAM))
1470            else:
1471                self.assertEqual(sock.type, socket.SOCK_STREAM)
1472            self.assertEqual(sock.proto, 0)
1473
1474    def test_getsockaddrarg(self):
1475        sock = socket.socket()
1476        self.addCleanup(sock.close)
1477        port = socket_helper.find_unused_port()
1478        big_port = port + 65536
1479        neg_port = port - 65536
1480        self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
1481        self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
1482        # Since find_unused_port() is inherently subject to race conditions, we
1483        # call it a couple times if necessary.
1484        for i in itertools.count():
1485            port = socket_helper.find_unused_port()
1486            try:
1487                sock.bind((HOST, port))
1488            except OSError as e:
1489                if e.errno != errno.EADDRINUSE or i == 5:
1490                    raise
1491            else:
1492                break
1493
1494    @unittest.skipUnless(os.name == "nt", "Windows specific")
1495    def test_sock_ioctl(self):
1496        self.assertTrue(hasattr(socket.socket, 'ioctl'))
1497        self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
1498        self.assertTrue(hasattr(socket, 'RCVALL_ON'))
1499        self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
1500        self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
1501        s = socket.socket()
1502        self.addCleanup(s.close)
1503        self.assertRaises(ValueError, s.ioctl, -1, None)
1504        s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
1505
1506    @unittest.skipUnless(os.name == "nt", "Windows specific")
1507    @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
1508                         'Loopback fast path support required for this test')
1509    def test_sio_loopback_fast_path(self):
1510        s = socket.socket()
1511        self.addCleanup(s.close)
1512        try:
1513            s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
1514        except OSError as exc:
1515            WSAEOPNOTSUPP = 10045
1516            if exc.winerror == WSAEOPNOTSUPP:
1517                self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
1518                              "doesn't implemented in this Windows version")
1519            raise
1520        self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
1521
1522    def testGetaddrinfo(self):
1523        try:
1524            socket.getaddrinfo('localhost', 80)
1525        except socket.gaierror as err:
1526            if err.errno == socket.EAI_SERVICE:
1527                # see http://bugs.python.org/issue1282647
1528                self.skipTest("buggy libc version")
1529            raise
1530        # len of every sequence is supposed to be == 5
1531        for info in socket.getaddrinfo(HOST, None):
1532            self.assertEqual(len(info), 5)
1533        # host can be a domain name, a string representation of an
1534        # IPv4/v6 address or None
1535        socket.getaddrinfo('localhost', 80)
1536        socket.getaddrinfo('127.0.0.1', 80)
1537        socket.getaddrinfo(None, 80)
1538        if socket_helper.IPV6_ENABLED:
1539            socket.getaddrinfo('::1', 80)
1540        # port can be a string service name such as "http", a numeric
1541        # port number or None
1542        # Issue #26936: Android getaddrinfo() was broken before API level 23.
1543        if (not hasattr(sys, 'getandroidapilevel') or
1544                sys.getandroidapilevel() >= 23):
1545            socket.getaddrinfo(HOST, "http")
1546        socket.getaddrinfo(HOST, 80)
1547        socket.getaddrinfo(HOST, None)
1548        # test family and socktype filters
1549        infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
1550        for family, type, _, _, _ in infos:
1551            self.assertEqual(family, socket.AF_INET)
1552            self.assertEqual(repr(family), '<AddressFamily.AF_INET: %r>' % family.value)
1553            self.assertEqual(str(family), str(family.value))
1554            self.assertEqual(type, socket.SOCK_STREAM)
1555            self.assertEqual(repr(type), '<SocketKind.SOCK_STREAM: %r>' % type.value)
1556            self.assertEqual(str(type), str(type.value))
1557        infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1558        for _, socktype, _, _, _ in infos:
1559            self.assertEqual(socktype, socket.SOCK_STREAM)
1560        # test proto and flags arguments
1561        socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1562        socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1563        # a server willing to support both IPv4 and IPv6 will
1564        # usually do this
1565        socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1566                           socket.AI_PASSIVE)
1567        # test keyword arguments
1568        a = socket.getaddrinfo(HOST, None)
1569        b = socket.getaddrinfo(host=HOST, port=None)
1570        self.assertEqual(a, b)
1571        a = socket.getaddrinfo(HOST, None, socket.AF_INET)
1572        b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
1573        self.assertEqual(a, b)
1574        a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1575        b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
1576        self.assertEqual(a, b)
1577        a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1578        b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
1579        self.assertEqual(a, b)
1580        a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1581        b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
1582        self.assertEqual(a, b)
1583        a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1584                               socket.AI_PASSIVE)
1585        b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
1586                               type=socket.SOCK_STREAM, proto=0,
1587                               flags=socket.AI_PASSIVE)
1588        self.assertEqual(a, b)
1589        # Issue #6697.
1590        self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
1591
1592        # Issue 17269: test workaround for OS X platform bug segfault
1593        if hasattr(socket, 'AI_NUMERICSERV'):
1594            try:
1595                # The arguments here are undefined and the call may succeed
1596                # or fail.  All we care here is that it doesn't segfault.
1597                socket.getaddrinfo("localhost", None, 0, 0, 0,
1598                                   socket.AI_NUMERICSERV)
1599            except socket.gaierror:
1600                pass
1601
1602    def test_getnameinfo(self):
1603        # only IP addresses are allowed
1604        self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
1605
1606    @unittest.skipUnless(support.is_resource_enabled('network'),
1607                         'network is not enabled')
1608    def test_idna(self):
1609        # Check for internet access before running test
1610        # (issue #12804, issue #25138).
1611        with socket_helper.transient_internet('python.org'):
1612            socket.gethostbyname('python.org')
1613
1614        # these should all be successful
1615        domain = 'испытание.pythontest.net'
1616        socket.gethostbyname(domain)
1617        socket.gethostbyname_ex(domain)
1618        socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
1619        # this may not work if the forward lookup chooses the IPv6 address, as that doesn't
1620        # have a reverse entry yet
1621        # socket.gethostbyaddr('испытание.python.org')
1622
1623    def check_sendall_interrupted(self, with_timeout):
1624        # socketpair() is not strictly required, but it makes things easier.
1625        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
1626            self.skipTest("signal.alarm and socket.socketpair required for this test")
1627        # Our signal handlers clobber the C errno by calling a math function
1628        # with an invalid domain value.
1629        def ok_handler(*args):
1630            self.assertRaises(ValueError, math.acosh, 0)
1631        def raising_handler(*args):
1632            self.assertRaises(ValueError, math.acosh, 0)
1633            1 // 0
1634        c, s = socket.socketpair()
1635        old_alarm = signal.signal(signal.SIGALRM, raising_handler)
1636        try:
1637            if with_timeout:
1638                # Just above the one second minimum for signal.alarm
1639                c.settimeout(1.5)
1640            with self.assertRaises(ZeroDivisionError):
1641                signal.alarm(1)
1642                c.sendall(b"x" * support.SOCK_MAX_SIZE)
1643            if with_timeout:
1644                signal.signal(signal.SIGALRM, ok_handler)
1645                signal.alarm(1)
1646                self.assertRaises(TimeoutError, c.sendall,
1647                                  b"x" * support.SOCK_MAX_SIZE)
1648        finally:
1649            signal.alarm(0)
1650            signal.signal(signal.SIGALRM, old_alarm)
1651            c.close()
1652            s.close()
1653
1654    def test_sendall_interrupted(self):
1655        self.check_sendall_interrupted(False)
1656
1657    def test_sendall_interrupted_with_timeout(self):
1658        self.check_sendall_interrupted(True)
1659
1660    def test_dealloc_warn(self):
1661        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1662        r = repr(sock)
1663        with self.assertWarns(ResourceWarning) as cm:
1664            sock = None
1665            support.gc_collect()
1666        self.assertIn(r, str(cm.warning.args[0]))
1667        # An open socket file object gets dereferenced after the socket
1668        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1669        f = sock.makefile('rb')
1670        r = repr(sock)
1671        sock = None
1672        support.gc_collect()
1673        with self.assertWarns(ResourceWarning):
1674            f = None
1675            support.gc_collect()
1676
1677    def test_name_closed_socketio(self):
1678        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1679            fp = sock.makefile("rb")
1680            fp.close()
1681            self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
1682
1683    def test_unusable_closed_socketio(self):
1684        with socket.socket() as sock:
1685            fp = sock.makefile("rb", buffering=0)
1686            self.assertTrue(fp.readable())
1687            self.assertFalse(fp.writable())
1688            self.assertFalse(fp.seekable())
1689            fp.close()
1690            self.assertRaises(ValueError, fp.readable)
1691            self.assertRaises(ValueError, fp.writable)
1692            self.assertRaises(ValueError, fp.seekable)
1693
1694    def test_socket_close(self):
1695        sock = socket.socket()
1696        try:
1697            sock.bind((HOST, 0))
1698            socket.close(sock.fileno())
1699            with self.assertRaises(OSError):
1700                sock.listen(1)
1701        finally:
1702            with self.assertRaises(OSError):
1703                # sock.close() fails with EBADF
1704                sock.close()
1705        with self.assertRaises(TypeError):
1706            socket.close(None)
1707        with self.assertRaises(OSError):
1708            socket.close(-1)
1709
1710    def test_makefile_mode(self):
1711        for mode in 'r', 'rb', 'rw', 'w', 'wb':
1712            with self.subTest(mode=mode):
1713                with socket.socket() as sock:
1714                    encoding = None if "b" in mode else "utf-8"
1715                    with sock.makefile(mode, encoding=encoding) as fp:
1716                        self.assertEqual(fp.mode, mode)
1717
1718    def test_makefile_invalid_mode(self):
1719        for mode in 'rt', 'x', '+', 'a':
1720            with self.subTest(mode=mode):
1721                with socket.socket() as sock:
1722                    with self.assertRaisesRegex(ValueError, 'invalid mode'):
1723                        sock.makefile(mode)
1724
1725    def test_pickle(self):
1726        sock = socket.socket()
1727        with sock:
1728            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1729                self.assertRaises(TypeError, pickle.dumps, sock, protocol)
1730        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1731            family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
1732            self.assertEqual(family, socket.AF_INET)
1733            type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
1734            self.assertEqual(type, socket.SOCK_STREAM)
1735
1736    def test_listen_backlog(self):
1737        for backlog in 0, -1:
1738            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1739                srv.bind((HOST, 0))
1740                srv.listen(backlog)
1741
1742        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1743            srv.bind((HOST, 0))
1744            srv.listen()
1745
1746    @support.cpython_only
1747    def test_listen_backlog_overflow(self):
1748        # Issue 15989
1749        import _testcapi
1750        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1751            srv.bind((HOST, 0))
1752            self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
1753
1754    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1755    def test_flowinfo(self):
1756        self.assertRaises(OverflowError, socket.getnameinfo,
1757                          (socket_helper.HOSTv6, 0, 0xffffffff), 0)
1758        with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1759            self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10))
1760
1761    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1762    def test_getaddrinfo_ipv6_basic(self):
1763        ((*_, sockaddr),) = socket.getaddrinfo(
1764            'ff02::1de:c0:face:8D',  # Note capital letter `D`.
1765            1234, socket.AF_INET6,
1766            socket.SOCK_DGRAM,
1767            socket.IPPROTO_UDP
1768        )
1769        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))
1770
1771    def test_getfqdn_filter_localhost(self):
1772        self.assertEqual(socket.getfqdn(), socket.getfqdn("0.0.0.0"))
1773        self.assertEqual(socket.getfqdn(), socket.getfqdn("::"))
1774
1775    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1776    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1777    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1778    @unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
1779    def test_getaddrinfo_ipv6_scopeid_symbolic(self):
1780        # Just pick up any network interface (Linux, Mac OS X)
1781        (ifindex, test_interface) = socket.if_nameindex()[0]
1782        ((*_, sockaddr),) = socket.getaddrinfo(
1783            'ff02::1de:c0:face:8D%' + test_interface,
1784            1234, socket.AF_INET6,
1785            socket.SOCK_DGRAM,
1786            socket.IPPROTO_UDP
1787        )
1788        # Note missing interface name part in IPv6 address
1789        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1790
1791    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1792    @unittest.skipUnless(
1793        sys.platform == 'win32',
1794        'Numeric scope id does not work or undocumented')
1795    def test_getaddrinfo_ipv6_scopeid_numeric(self):
1796        # Also works on Linux and Mac OS X, but is not documented (?)
1797        # Windows, Linux and Max OS X allow nonexistent interface numbers here.
1798        ifindex = 42
1799        ((*_, sockaddr),) = socket.getaddrinfo(
1800            'ff02::1de:c0:face:8D%' + str(ifindex),
1801            1234, socket.AF_INET6,
1802            socket.SOCK_DGRAM,
1803            socket.IPPROTO_UDP
1804        )
1805        # Note missing interface name part in IPv6 address
1806        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1807
1808    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1809    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1810    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1811    @unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
1812    def test_getnameinfo_ipv6_scopeid_symbolic(self):
1813        # Just pick up any network interface.
1814        (ifindex, test_interface) = socket.if_nameindex()[0]
1815        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1816        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1817        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))
1818
1819    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1820    @unittest.skipUnless( sys.platform == 'win32',
1821        'Numeric scope id does not work or undocumented')
1822    def test_getnameinfo_ipv6_scopeid_numeric(self):
1823        # Also works on Linux (undocumented), but does not work on Mac OS X
1824        # Windows and Linux allow nonexistent interface numbers here.
1825        ifindex = 42
1826        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1827        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1828        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234'))
1829
1830    def test_str_for_enums(self):
1831        # Make sure that the AF_* and SOCK_* constants have enum-like string
1832        # reprs.
1833        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1834            self.assertEqual(repr(s.family), '<AddressFamily.AF_INET: %r>' % s.family.value)
1835            self.assertEqual(repr(s.type), '<SocketKind.SOCK_STREAM: %r>' % s.type.value)
1836            self.assertEqual(str(s.family), str(s.family.value))
1837            self.assertEqual(str(s.type), str(s.type.value))
1838
1839    def test_socket_consistent_sock_type(self):
1840        SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)
1841        SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0)
1842        sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC
1843
1844        with socket.socket(socket.AF_INET, sock_type) as s:
1845            self.assertEqual(s.type, socket.SOCK_STREAM)
1846            s.settimeout(1)
1847            self.assertEqual(s.type, socket.SOCK_STREAM)
1848            s.settimeout(0)
1849            self.assertEqual(s.type, socket.SOCK_STREAM)
1850            s.setblocking(True)
1851            self.assertEqual(s.type, socket.SOCK_STREAM)
1852            s.setblocking(False)
1853            self.assertEqual(s.type, socket.SOCK_STREAM)
1854
1855    def test_unknown_socket_family_repr(self):
1856        # Test that when created with a family that's not one of the known
1857        # AF_*/SOCK_* constants, socket.family just returns the number.
1858        #
1859        # To do this we fool socket.socket into believing it already has an
1860        # open fd because on this path it doesn't actually verify the family and
1861        # type and populates the socket object.
1862        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1863        fd = sock.detach()
1864        unknown_family = max(socket.AddressFamily.__members__.values()) + 1
1865
1866        unknown_type = max(
1867            kind
1868            for name, kind in socket.SocketKind.__members__.items()
1869            if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'}
1870        ) + 1
1871
1872        with socket.socket(
1873                family=unknown_family, type=unknown_type, proto=23,
1874                fileno=fd) as s:
1875            self.assertEqual(s.family, unknown_family)
1876            self.assertEqual(s.type, unknown_type)
1877            # some OS like macOS ignore proto
1878            self.assertIn(s.proto, {0, 23})
1879
1880    @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
1881    def test__sendfile_use_sendfile(self):
1882        class File:
1883            def __init__(self, fd):
1884                self.fd = fd
1885
1886            def fileno(self):
1887                return self.fd
1888        with socket.socket() as sock:
1889            fd = os.open(os.curdir, os.O_RDONLY)
1890            os.close(fd)
1891            with self.assertRaises(socket._GiveupOnSendfile):
1892                sock._sendfile_use_sendfile(File(fd))
1893            with self.assertRaises(OverflowError):
1894                sock._sendfile_use_sendfile(File(2**1000))
1895            with self.assertRaises(TypeError):
1896                sock._sendfile_use_sendfile(File(None))
1897
1898    def _test_socket_fileno(self, s, family, stype):
1899        self.assertEqual(s.family, family)
1900        self.assertEqual(s.type, stype)
1901
1902        fd = s.fileno()
1903        s2 = socket.socket(fileno=fd)
1904        self.addCleanup(s2.close)
1905        # detach old fd to avoid double close
1906        s.detach()
1907        self.assertEqual(s2.family, family)
1908        self.assertEqual(s2.type, stype)
1909        self.assertEqual(s2.fileno(), fd)
1910
1911    def test_socket_fileno(self):
1912        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1913        self.addCleanup(s.close)
1914        s.bind((socket_helper.HOST, 0))
1915        self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)
1916
1917        if hasattr(socket, "SOCK_DGRAM"):
1918            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1919            self.addCleanup(s.close)
1920            s.bind((socket_helper.HOST, 0))
1921            self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)
1922
1923        if socket_helper.IPV6_ENABLED:
1924            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
1925            self.addCleanup(s.close)
1926            s.bind((socket_helper.HOSTv6, 0, 0, 0))
1927            self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)
1928
1929        if hasattr(socket, "AF_UNIX"):
1930            tmpdir = tempfile.mkdtemp()
1931            self.addCleanup(shutil.rmtree, tmpdir)
1932            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1933            self.addCleanup(s.close)
1934            try:
1935                s.bind(os.path.join(tmpdir, 'socket'))
1936            except PermissionError:
1937                pass
1938            else:
1939                self._test_socket_fileno(s, socket.AF_UNIX,
1940                                         socket.SOCK_STREAM)
1941
1942    def test_socket_fileno_rejects_float(self):
1943        with self.assertRaises(TypeError):
1944            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5)
1945
1946    def test_socket_fileno_rejects_other_types(self):
1947        with self.assertRaises(TypeError):
1948            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo")
1949
1950    def test_socket_fileno_rejects_invalid_socket(self):
1951        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1952            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1)
1953
1954    @unittest.skipIf(os.name == "nt", "Windows disallows -1 only")
1955    def test_socket_fileno_rejects_negative(self):
1956        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1957            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42)
1958
1959    def test_socket_fileno_requires_valid_fd(self):
1960        WSAENOTSOCK = 10038
1961        with self.assertRaises(OSError) as cm:
1962            socket.socket(fileno=os_helper.make_bad_fd())
1963        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1964
1965        with self.assertRaises(OSError) as cm:
1966            socket.socket(
1967                socket.AF_INET,
1968                socket.SOCK_STREAM,
1969                fileno=os_helper.make_bad_fd())
1970        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1971
1972    def test_socket_fileno_requires_socket_fd(self):
1973        with tempfile.NamedTemporaryFile() as afile:
1974            with self.assertRaises(OSError):
1975                socket.socket(fileno=afile.fileno())
1976
1977            with self.assertRaises(OSError) as cm:
1978                socket.socket(
1979                    socket.AF_INET,
1980                    socket.SOCK_STREAM,
1981                    fileno=afile.fileno())
1982            self.assertEqual(cm.exception.errno, errno.ENOTSOCK)
1983
1984    def test_addressfamily_enum(self):
1985        import _socket, enum
1986        CheckedAddressFamily = enum._old_convert_(
1987                enum.IntEnum, 'AddressFamily', 'socket',
1988                lambda C: C.isupper() and C.startswith('AF_'),
1989                source=_socket,
1990                )
1991        enum._test_simple_enum(CheckedAddressFamily, socket.AddressFamily)
1992
1993    def test_socketkind_enum(self):
1994        import _socket, enum
1995        CheckedSocketKind = enum._old_convert_(
1996                enum.IntEnum, 'SocketKind', 'socket',
1997                lambda C: C.isupper() and C.startswith('SOCK_'),
1998                source=_socket,
1999                )
2000        enum._test_simple_enum(CheckedSocketKind, socket.SocketKind)
2001
2002    def test_msgflag_enum(self):
2003        import _socket, enum
2004        CheckedMsgFlag = enum._old_convert_(
2005                enum.IntFlag, 'MsgFlag', 'socket',
2006                lambda C: C.isupper() and C.startswith('MSG_'),
2007                source=_socket,
2008                )
2009        enum._test_simple_enum(CheckedMsgFlag, socket.MsgFlag)
2010
2011    def test_addressinfo_enum(self):
2012        import _socket, enum
2013        CheckedAddressInfo = enum._old_convert_(
2014                enum.IntFlag, 'AddressInfo', 'socket',
2015                lambda C: C.isupper() and C.startswith('AI_'),
2016                source=_socket)
2017        enum._test_simple_enum(CheckedAddressInfo, socket.AddressInfo)
2018
2019
2020@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
2021class BasicCANTest(unittest.TestCase):
2022
2023    def testCrucialConstants(self):
2024        socket.AF_CAN
2025        socket.PF_CAN
2026        socket.CAN_RAW
2027
2028    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
2029                         'socket.CAN_BCM required for this test.')
2030    def testBCMConstants(self):
2031        socket.CAN_BCM
2032
2033        # opcodes
2034        socket.CAN_BCM_TX_SETUP     # create (cyclic) transmission task
2035        socket.CAN_BCM_TX_DELETE    # remove (cyclic) transmission task
2036        socket.CAN_BCM_TX_READ      # read properties of (cyclic) transmission task
2037        socket.CAN_BCM_TX_SEND      # send one CAN frame
2038        socket.CAN_BCM_RX_SETUP     # create RX content filter subscription
2039        socket.CAN_BCM_RX_DELETE    # remove RX content filter subscription
2040        socket.CAN_BCM_RX_READ      # read properties of RX content filter subscription
2041        socket.CAN_BCM_TX_STATUS    # reply to TX_READ request
2042        socket.CAN_BCM_TX_EXPIRED   # notification on performed transmissions (count=0)
2043        socket.CAN_BCM_RX_STATUS    # reply to RX_READ request
2044        socket.CAN_BCM_RX_TIMEOUT   # cyclic message is absent
2045        socket.CAN_BCM_RX_CHANGED   # updated CAN frame (detected content change)
2046
2047        # flags
2048        socket.CAN_BCM_SETTIMER
2049        socket.CAN_BCM_STARTTIMER
2050        socket.CAN_BCM_TX_COUNTEVT
2051        socket.CAN_BCM_TX_ANNOUNCE
2052        socket.CAN_BCM_TX_CP_CAN_ID
2053        socket.CAN_BCM_RX_FILTER_ID
2054        socket.CAN_BCM_RX_CHECK_DLC
2055        socket.CAN_BCM_RX_NO_AUTOTIMER
2056        socket.CAN_BCM_RX_ANNOUNCE_RESUME
2057        socket.CAN_BCM_TX_RESET_MULTI_IDX
2058        socket.CAN_BCM_RX_RTR_FRAME
2059
2060    def testCreateSocket(self):
2061        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2062            pass
2063
2064    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
2065                         'socket.CAN_BCM required for this test.')
2066    def testCreateBCMSocket(self):
2067        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s:
2068            pass
2069
2070    def testBindAny(self):
2071        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2072            address = ('', )
2073            s.bind(address)
2074            self.assertEqual(s.getsockname(), address)
2075
2076    def testTooLongInterfaceName(self):
2077        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
2078        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2079            self.assertRaisesRegex(OSError, 'interface name too long',
2080                                   s.bind, ('x' * 1024,))
2081
2082    @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
2083                         'socket.CAN_RAW_LOOPBACK required for this test.')
2084    def testLoopback(self):
2085        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2086            for loopback in (0, 1):
2087                s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
2088                             loopback)
2089                self.assertEqual(loopback,
2090                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
2091
2092    @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
2093                         'socket.CAN_RAW_FILTER required for this test.')
2094    def testFilter(self):
2095        can_id, can_mask = 0x200, 0x700
2096        can_filter = struct.pack("=II", can_id, can_mask)
2097        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2098            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
2099            self.assertEqual(can_filter,
2100                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
2101            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter))
2102
2103
2104@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
2105class CANTest(ThreadedCANSocketTest):
2106
2107    def __init__(self, methodName='runTest'):
2108        ThreadedCANSocketTest.__init__(self, methodName=methodName)
2109
2110    @classmethod
2111    def build_can_frame(cls, can_id, data):
2112        """Build a CAN frame."""
2113        can_dlc = len(data)
2114        data = data.ljust(8, b'\x00')
2115        return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
2116
2117    @classmethod
2118    def dissect_can_frame(cls, frame):
2119        """Dissect a CAN frame."""
2120        can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
2121        return (can_id, can_dlc, data[:can_dlc])
2122
2123    def testSendFrame(self):
2124        cf, addr = self.s.recvfrom(self.bufsize)
2125        self.assertEqual(self.cf, cf)
2126        self.assertEqual(addr[0], self.interface)
2127
2128    def _testSendFrame(self):
2129        self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
2130        self.cli.send(self.cf)
2131
2132    def testSendMaxFrame(self):
2133        cf, addr = self.s.recvfrom(self.bufsize)
2134        self.assertEqual(self.cf, cf)
2135
2136    def _testSendMaxFrame(self):
2137        self.cf = self.build_can_frame(0x00, b'\x07' * 8)
2138        self.cli.send(self.cf)
2139
2140    def testSendMultiFrames(self):
2141        cf, addr = self.s.recvfrom(self.bufsize)
2142        self.assertEqual(self.cf1, cf)
2143
2144        cf, addr = self.s.recvfrom(self.bufsize)
2145        self.assertEqual(self.cf2, cf)
2146
2147    def _testSendMultiFrames(self):
2148        self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
2149        self.cli.send(self.cf1)
2150
2151        self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
2152        self.cli.send(self.cf2)
2153
2154    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
2155                         'socket.CAN_BCM required for this test.')
2156    def _testBCM(self):
2157        cf, addr = self.cli.recvfrom(self.bufsize)
2158        self.assertEqual(self.cf, cf)
2159        can_id, can_dlc, data = self.dissect_can_frame(cf)
2160        self.assertEqual(self.can_id, can_id)
2161        self.assertEqual(self.data, data)
2162
2163    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
2164                         'socket.CAN_BCM required for this test.')
2165    def testBCM(self):
2166        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
2167        self.addCleanup(bcm.close)
2168        bcm.connect((self.interface,))
2169        self.can_id = 0x123
2170        self.data = bytes([0xc0, 0xff, 0xee])
2171        self.cf = self.build_can_frame(self.can_id, self.data)
2172        opcode = socket.CAN_BCM_TX_SEND
2173        flags = 0
2174        count = 0
2175        ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
2176        bcm_can_id = 0x0222
2177        nframes = 1
2178        assert len(self.cf) == 16
2179        header = struct.pack(self.bcm_cmd_msg_fmt,
2180                    opcode,
2181                    flags,
2182                    count,
2183                    ival1_seconds,
2184                    ival1_usec,
2185                    ival2_seconds,
2186                    ival2_usec,
2187                    bcm_can_id,
2188                    nframes,
2189                    )
2190        header_plus_frame = header + self.cf
2191        bytes_sent = bcm.send(header_plus_frame)
2192        self.assertEqual(bytes_sent, len(header_plus_frame))
2193
2194
2195@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
2196class ISOTPTest(unittest.TestCase):
2197
2198    def __init__(self, *args, **kwargs):
2199        super().__init__(*args, **kwargs)
2200        self.interface = "vcan0"
2201
2202    def testCrucialConstants(self):
2203        socket.AF_CAN
2204        socket.PF_CAN
2205        socket.CAN_ISOTP
2206        socket.SOCK_DGRAM
2207
2208    def testCreateSocket(self):
2209        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2210            pass
2211
2212    @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
2213                         'socket.CAN_ISOTP required for this test.')
2214    def testCreateISOTPSocket(self):
2215        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2216            pass
2217
2218    def testTooLongInterfaceName(self):
2219        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
2220        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2221            with self.assertRaisesRegex(OSError, 'interface name too long'):
2222                s.bind(('x' * 1024, 1, 2))
2223
2224    def testBind(self):
2225        try:
2226            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2227                addr = self.interface, 0x123, 0x456
2228                s.bind(addr)
2229                self.assertEqual(s.getsockname(), addr)
2230        except OSError as e:
2231            if e.errno == errno.ENODEV:
2232                self.skipTest('network interface `%s` does not exist' %
2233                           self.interface)
2234            else:
2235                raise
2236
2237
2238@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.')
2239class J1939Test(unittest.TestCase):
2240
2241    def __init__(self, *args, **kwargs):
2242        super().__init__(*args, **kwargs)
2243        self.interface = "vcan0"
2244
2245    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
2246                         'socket.CAN_J1939 required for this test.')
2247    def testJ1939Constants(self):
2248        socket.CAN_J1939
2249
2250        socket.J1939_MAX_UNICAST_ADDR
2251        socket.J1939_IDLE_ADDR
2252        socket.J1939_NO_ADDR
2253        socket.J1939_NO_NAME
2254        socket.J1939_PGN_REQUEST
2255        socket.J1939_PGN_ADDRESS_CLAIMED
2256        socket.J1939_PGN_ADDRESS_COMMANDED
2257        socket.J1939_PGN_PDU1_MAX
2258        socket.J1939_PGN_MAX
2259        socket.J1939_NO_PGN
2260
2261        # J1939 socket options
2262        socket.SO_J1939_FILTER
2263        socket.SO_J1939_PROMISC
2264        socket.SO_J1939_SEND_PRIO
2265        socket.SO_J1939_ERRQUEUE
2266
2267        socket.SCM_J1939_DEST_ADDR
2268        socket.SCM_J1939_DEST_NAME
2269        socket.SCM_J1939_PRIO
2270        socket.SCM_J1939_ERRQUEUE
2271
2272        socket.J1939_NLA_PAD
2273        socket.J1939_NLA_BYTES_ACKED
2274
2275        socket.J1939_EE_INFO_NONE
2276        socket.J1939_EE_INFO_TX_ABORT
2277
2278        socket.J1939_FILTER_MAX
2279
2280    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
2281                         'socket.CAN_J1939 required for this test.')
2282    def testCreateJ1939Socket(self):
2283        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
2284            pass
2285
2286    def testBind(self):
2287        try:
2288            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
2289                addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR
2290                s.bind(addr)
2291                self.assertEqual(s.getsockname(), addr)
2292        except OSError as e:
2293            if e.errno == errno.ENODEV:
2294                self.skipTest('network interface `%s` does not exist' %
2295                           self.interface)
2296            else:
2297                raise
2298
2299
2300@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
2301class BasicRDSTest(unittest.TestCase):
2302
2303    def testCrucialConstants(self):
2304        socket.AF_RDS
2305        socket.PF_RDS
2306
2307    def testCreateSocket(self):
2308        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
2309            pass
2310
2311    def testSocketBufferSize(self):
2312        bufsize = 16384
2313        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
2314            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
2315            s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
2316
2317
2318@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
2319class RDSTest(ThreadedRDSSocketTest):
2320
2321    def __init__(self, methodName='runTest'):
2322        ThreadedRDSSocketTest.__init__(self, methodName=methodName)
2323
2324    def setUp(self):
2325        super().setUp()
2326        self.evt = threading.Event()
2327
2328    def testSendAndRecv(self):
2329        data, addr = self.serv.recvfrom(self.bufsize)
2330        self.assertEqual(self.data, data)
2331        self.assertEqual(self.cli_addr, addr)
2332
2333    def _testSendAndRecv(self):
2334        self.data = b'spam'
2335        self.cli.sendto(self.data, 0, (HOST, self.port))
2336
2337    def testPeek(self):
2338        data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
2339        self.assertEqual(self.data, data)
2340        data, addr = self.serv.recvfrom(self.bufsize)
2341        self.assertEqual(self.data, data)
2342
2343    def _testPeek(self):
2344        self.data = b'spam'
2345        self.cli.sendto(self.data, 0, (HOST, self.port))
2346
2347    @requireAttrs(socket.socket, 'recvmsg')
2348    def testSendAndRecvMsg(self):
2349        data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
2350        self.assertEqual(self.data, data)
2351
2352    @requireAttrs(socket.socket, 'sendmsg')
2353    def _testSendAndRecvMsg(self):
2354        self.data = b'hello ' * 10
2355        self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
2356
2357    def testSendAndRecvMulti(self):
2358        data, addr = self.serv.recvfrom(self.bufsize)
2359        self.assertEqual(self.data1, data)
2360
2361        data, addr = self.serv.recvfrom(self.bufsize)
2362        self.assertEqual(self.data2, data)
2363
2364    def _testSendAndRecvMulti(self):
2365        self.data1 = b'bacon'
2366        self.cli.sendto(self.data1, 0, (HOST, self.port))
2367
2368        self.data2 = b'egg'
2369        self.cli.sendto(self.data2, 0, (HOST, self.port))
2370
2371    def testSelect(self):
2372        r, w, x = select.select([self.serv], [], [], 3.0)
2373        self.assertIn(self.serv, r)
2374        data, addr = self.serv.recvfrom(self.bufsize)
2375        self.assertEqual(self.data, data)
2376
2377    def _testSelect(self):
2378        self.data = b'select'
2379        self.cli.sendto(self.data, 0, (HOST, self.port))
2380
2381@unittest.skipUnless(HAVE_SOCKET_QIPCRTR,
2382          'QIPCRTR sockets required for this test.')
2383class BasicQIPCRTRTest(unittest.TestCase):
2384
2385    def testCrucialConstants(self):
2386        socket.AF_QIPCRTR
2387
2388    def testCreateSocket(self):
2389        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2390            pass
2391
2392    def testUnbound(self):
2393        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2394            self.assertEqual(s.getsockname()[1], 0)
2395
2396    def testBindSock(self):
2397        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2398            socket_helper.bind_port(s, host=s.getsockname()[0])
2399            self.assertNotEqual(s.getsockname()[1], 0)
2400
2401    def testInvalidBindSock(self):
2402        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2403            self.assertRaises(OSError, socket_helper.bind_port, s, host=-2)
2404
2405    def testAutoBindSock(self):
2406        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2407            s.connect((123, 123))
2408            self.assertNotEqual(s.getsockname()[1], 0)
2409
2410@unittest.skipIf(fcntl is None, "need fcntl")
2411@unittest.skipUnless(HAVE_SOCKET_VSOCK,
2412          'VSOCK sockets required for this test.')
2413class BasicVSOCKTest(unittest.TestCase):
2414
2415    def testCrucialConstants(self):
2416        socket.AF_VSOCK
2417
2418    def testVSOCKConstants(self):
2419        socket.SO_VM_SOCKETS_BUFFER_SIZE
2420        socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE
2421        socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE
2422        socket.VMADDR_CID_ANY
2423        socket.VMADDR_PORT_ANY
2424        socket.VMADDR_CID_HOST
2425        socket.VM_SOCKETS_INVALID_VERSION
2426        socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID
2427
2428    def testCreateSocket(self):
2429        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
2430            pass
2431
2432    def testSocketBufferSize(self):
2433        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
2434            orig_max = s.getsockopt(socket.AF_VSOCK,
2435                                    socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)
2436            orig = s.getsockopt(socket.AF_VSOCK,
2437                                socket.SO_VM_SOCKETS_BUFFER_SIZE)
2438            orig_min = s.getsockopt(socket.AF_VSOCK,
2439                                    socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)
2440
2441            s.setsockopt(socket.AF_VSOCK,
2442                         socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2)
2443            s.setsockopt(socket.AF_VSOCK,
2444                         socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2)
2445            s.setsockopt(socket.AF_VSOCK,
2446                         socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2)
2447
2448            self.assertEqual(orig_max * 2,
2449                             s.getsockopt(socket.AF_VSOCK,
2450                             socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE))
2451            self.assertEqual(orig * 2,
2452                             s.getsockopt(socket.AF_VSOCK,
2453                             socket.SO_VM_SOCKETS_BUFFER_SIZE))
2454            self.assertEqual(orig_min * 2,
2455                             s.getsockopt(socket.AF_VSOCK,
2456                             socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE))
2457
2458
2459@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH,
2460                     'Bluetooth sockets required for this test.')
2461class BasicBluetoothTest(unittest.TestCase):
2462
2463    def testBluetoothConstants(self):
2464        socket.BDADDR_ANY
2465        socket.BDADDR_LOCAL
2466        socket.AF_BLUETOOTH
2467        socket.BTPROTO_RFCOMM
2468
2469        if sys.platform != "win32":
2470            socket.BTPROTO_HCI
2471            socket.SOL_HCI
2472            socket.BTPROTO_L2CAP
2473
2474            if not sys.platform.startswith("freebsd"):
2475                socket.BTPROTO_SCO
2476
2477    def testCreateRfcommSocket(self):
2478        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
2479            pass
2480
2481    @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets")
2482    def testCreateL2capSocket(self):
2483        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as s:
2484            pass
2485
2486    @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets")
2487    def testCreateHciSocket(self):
2488        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
2489            pass
2490
2491    @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"),
2492                     "windows and freebsd do not support SCO sockets")
2493    def testCreateScoSocket(self):
2494        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
2495            pass
2496
2497
2498class BasicTCPTest(SocketConnectedTest):
2499
2500    def __init__(self, methodName='runTest'):
2501        SocketConnectedTest.__init__(self, methodName=methodName)
2502
2503    def testRecv(self):
2504        # Testing large receive over TCP
2505        msg = self.cli_conn.recv(1024)
2506        self.assertEqual(msg, MSG)
2507
2508    def _testRecv(self):
2509        self.serv_conn.send(MSG)
2510
2511    def testOverFlowRecv(self):
2512        # Testing receive in chunks over TCP
2513        seg1 = self.cli_conn.recv(len(MSG) - 3)
2514        seg2 = self.cli_conn.recv(1024)
2515        msg = seg1 + seg2
2516        self.assertEqual(msg, MSG)
2517
2518    def _testOverFlowRecv(self):
2519        self.serv_conn.send(MSG)
2520
2521    def testRecvFrom(self):
2522        # Testing large recvfrom() over TCP
2523        msg, addr = self.cli_conn.recvfrom(1024)
2524        self.assertEqual(msg, MSG)
2525
2526    def _testRecvFrom(self):
2527        self.serv_conn.send(MSG)
2528
2529    def testOverFlowRecvFrom(self):
2530        # Testing recvfrom() in chunks over TCP
2531        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
2532        seg2, addr = self.cli_conn.recvfrom(1024)
2533        msg = seg1 + seg2
2534        self.assertEqual(msg, MSG)
2535
2536    def _testOverFlowRecvFrom(self):
2537        self.serv_conn.send(MSG)
2538
2539    def testSendAll(self):
2540        # Testing sendall() with a 2048 byte string over TCP
2541        msg = b''
2542        while 1:
2543            read = self.cli_conn.recv(1024)
2544            if not read:
2545                break
2546            msg += read
2547        self.assertEqual(msg, b'f' * 2048)
2548
2549    def _testSendAll(self):
2550        big_chunk = b'f' * 2048
2551        self.serv_conn.sendall(big_chunk)
2552
2553    def testFromFd(self):
2554        # Testing fromfd()
2555        fd = self.cli_conn.fileno()
2556        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
2557        self.addCleanup(sock.close)
2558        self.assertIsInstance(sock, socket.socket)
2559        msg = sock.recv(1024)
2560        self.assertEqual(msg, MSG)
2561
2562    def _testFromFd(self):
2563        self.serv_conn.send(MSG)
2564
2565    def testDup(self):
2566        # Testing dup()
2567        sock = self.cli_conn.dup()
2568        self.addCleanup(sock.close)
2569        msg = sock.recv(1024)
2570        self.assertEqual(msg, MSG)
2571
2572    def _testDup(self):
2573        self.serv_conn.send(MSG)
2574
2575    def testShutdown(self):
2576        # Testing shutdown()
2577        msg = self.cli_conn.recv(1024)
2578        self.assertEqual(msg, MSG)
2579        # wait for _testShutdown to finish: on OS X, when the server
2580        # closes the connection the client also becomes disconnected,
2581        # and the client's shutdown call will fail. (Issue #4397.)
2582        self.done.wait()
2583
2584    def _testShutdown(self):
2585        self.serv_conn.send(MSG)
2586        self.serv_conn.shutdown(2)
2587
2588    testShutdown_overflow = support.cpython_only(testShutdown)
2589
2590    @support.cpython_only
2591    def _testShutdown_overflow(self):
2592        import _testcapi
2593        self.serv_conn.send(MSG)
2594        # Issue 15989
2595        self.assertRaises(OverflowError, self.serv_conn.shutdown,
2596                          _testcapi.INT_MAX + 1)
2597        self.assertRaises(OverflowError, self.serv_conn.shutdown,
2598                          2 + (_testcapi.UINT_MAX + 1))
2599        self.serv_conn.shutdown(2)
2600
2601    def testDetach(self):
2602        # Testing detach()
2603        fileno = self.cli_conn.fileno()
2604        f = self.cli_conn.detach()
2605        self.assertEqual(f, fileno)
2606        # cli_conn cannot be used anymore...
2607        self.assertTrue(self.cli_conn._closed)
2608        self.assertRaises(OSError, self.cli_conn.recv, 1024)
2609        self.cli_conn.close()
2610        # ...but we can create another socket using the (still open)
2611        # file descriptor
2612        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f)
2613        self.addCleanup(sock.close)
2614        msg = sock.recv(1024)
2615        self.assertEqual(msg, MSG)
2616
2617    def _testDetach(self):
2618        self.serv_conn.send(MSG)
2619
2620
2621class BasicUDPTest(ThreadedUDPSocketTest):
2622
2623    def __init__(self, methodName='runTest'):
2624        ThreadedUDPSocketTest.__init__(self, methodName=methodName)
2625
2626    def testSendtoAndRecv(self):
2627        # Testing sendto() and Recv() over UDP
2628        msg = self.serv.recv(len(MSG))
2629        self.assertEqual(msg, MSG)
2630
2631    def _testSendtoAndRecv(self):
2632        self.cli.sendto(MSG, 0, (HOST, self.port))
2633
2634    def testRecvFrom(self):
2635        # Testing recvfrom() over UDP
2636        msg, addr = self.serv.recvfrom(len(MSG))
2637        self.assertEqual(msg, MSG)
2638
2639    def _testRecvFrom(self):
2640        self.cli.sendto(MSG, 0, (HOST, self.port))
2641
2642    def testRecvFromNegative(self):
2643        # Negative lengths passed to recvfrom should give ValueError.
2644        self.assertRaises(ValueError, self.serv.recvfrom, -1)
2645
2646    def _testRecvFromNegative(self):
2647        self.cli.sendto(MSG, 0, (HOST, self.port))
2648
2649
2650@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
2651          'UDPLITE sockets required for this test.')
2652class BasicUDPLITETest(ThreadedUDPLITESocketTest):
2653
2654    def __init__(self, methodName='runTest'):
2655        ThreadedUDPLITESocketTest.__init__(self, methodName=methodName)
2656
2657    def testSendtoAndRecv(self):
2658        # Testing sendto() and Recv() over UDPLITE
2659        msg = self.serv.recv(len(MSG))
2660        self.assertEqual(msg, MSG)
2661
2662    def _testSendtoAndRecv(self):
2663        self.cli.sendto(MSG, 0, (HOST, self.port))
2664
2665    def testRecvFrom(self):
2666        # Testing recvfrom() over UDPLITE
2667        msg, addr = self.serv.recvfrom(len(MSG))
2668        self.assertEqual(msg, MSG)
2669
2670    def _testRecvFrom(self):
2671        self.cli.sendto(MSG, 0, (HOST, self.port))
2672
2673    def testRecvFromNegative(self):
2674        # Negative lengths passed to recvfrom should give ValueError.
2675        self.assertRaises(ValueError, self.serv.recvfrom, -1)
2676
2677    def _testRecvFromNegative(self):
2678        self.cli.sendto(MSG, 0, (HOST, self.port))
2679
2680# Tests for the sendmsg()/recvmsg() interface.  Where possible, the
2681# same test code is used with different families and types of socket
2682# (e.g. stream, datagram), and tests using recvmsg() are repeated
2683# using recvmsg_into().
2684#
2685# The generic test classes such as SendmsgTests and
2686# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
2687# supplied with sockets cli_sock and serv_sock representing the
2688# client's and the server's end of the connection respectively, and
2689# attributes cli_addr and serv_addr holding their (numeric where
2690# appropriate) addresses.
2691#
2692# The final concrete test classes combine these with subclasses of
2693# SocketTestBase which set up client and server sockets of a specific
2694# type, and with subclasses of SendrecvmsgBase such as
2695# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these
2696# sockets to cli_sock and serv_sock and override the methods and
2697# attributes of SendrecvmsgBase to fill in destination addresses if
2698# needed when sending, check for specific flags in msg_flags, etc.
2699#
2700# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using
2701# recvmsg_into().
2702
2703# XXX: like the other datagram (UDP) tests in this module, the code
2704# here assumes that datagram delivery on the local machine will be
2705# reliable.
2706
2707class SendrecvmsgBase(ThreadSafeCleanupTestCase):
2708    # Base class for sendmsg()/recvmsg() tests.
2709
2710    # Time in seconds to wait before considering a test failed, or
2711    # None for no timeout.  Not all tests actually set a timeout.
2712    fail_timeout = support.LOOPBACK_TIMEOUT
2713
2714    def setUp(self):
2715        self.misc_event = threading.Event()
2716        super().setUp()
2717
2718    def sendToServer(self, msg):
2719        # Send msg to the server.
2720        return self.cli_sock.send(msg)
2721
2722    # Tuple of alternative default arguments for sendmsg() when called
2723    # via sendmsgToServer() (e.g. to include a destination address).
2724    sendmsg_to_server_defaults = ()
2725
2726    def sendmsgToServer(self, *args):
2727        # Call sendmsg() on self.cli_sock with the given arguments,
2728        # filling in any arguments which are not supplied with the
2729        # corresponding items of self.sendmsg_to_server_defaults, if
2730        # any.
2731        return self.cli_sock.sendmsg(
2732            *(args + self.sendmsg_to_server_defaults[len(args):]))
2733
2734    def doRecvmsg(self, sock, bufsize, *args):
2735        # Call recvmsg() on sock with given arguments and return its
2736        # result.  Should be used for tests which can use either
2737        # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides
2738        # this method with one which emulates it using recvmsg_into(),
2739        # thus allowing the same test to be used for both methods.
2740        result = sock.recvmsg(bufsize, *args)
2741        self.registerRecvmsgResult(result)
2742        return result
2743
2744    def registerRecvmsgResult(self, result):
2745        # Called by doRecvmsg() with the return value of recvmsg() or
2746        # recvmsg_into().  Can be overridden to arrange cleanup based
2747        # on the returned ancillary data, for instance.
2748        pass
2749
2750    def checkRecvmsgAddress(self, addr1, addr2):
2751        # Called to compare the received address with the address of
2752        # the peer.
2753        self.assertEqual(addr1, addr2)
2754
2755    # Flags that are normally unset in msg_flags
2756    msg_flags_common_unset = 0
2757    for name in ("MSG_CTRUNC", "MSG_OOB"):
2758        msg_flags_common_unset |= getattr(socket, name, 0)
2759
2760    # Flags that are normally set
2761    msg_flags_common_set = 0
2762
2763    # Flags set when a complete record has been received (e.g. MSG_EOR
2764    # for SCTP)
2765    msg_flags_eor_indicator = 0
2766
2767    # Flags set when a complete record has not been received
2768    # (e.g. MSG_TRUNC for datagram sockets)
2769    msg_flags_non_eor_indicator = 0
2770
2771    def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0):
2772        # Method to check the value of msg_flags returned by recvmsg[_into]().
2773        #
2774        # Checks that all bits in msg_flags_common_set attribute are
2775        # set in "flags" and all bits in msg_flags_common_unset are
2776        # unset.
2777        #
2778        # The "eor" argument specifies whether the flags should
2779        # indicate that a full record (or datagram) has been received.
2780        # If "eor" is None, no checks are done; otherwise, checks
2781        # that:
2782        #
2783        #  * if "eor" is true, all bits in msg_flags_eor_indicator are
2784        #    set and all bits in msg_flags_non_eor_indicator are unset
2785        #
2786        #  * if "eor" is false, all bits in msg_flags_non_eor_indicator
2787        #    are set and all bits in msg_flags_eor_indicator are unset
2788        #
2789        # If "checkset" and/or "checkunset" are supplied, they require
2790        # the given bits to be set or unset respectively, overriding
2791        # what the attributes require for those bits.
2792        #
2793        # If any bits are set in "ignore", they will not be checked,
2794        # regardless of the other inputs.
2795        #
2796        # Will raise Exception if the inputs require a bit to be both
2797        # set and unset, and it is not ignored.
2798
2799        defaultset = self.msg_flags_common_set
2800        defaultunset = self.msg_flags_common_unset
2801
2802        if eor:
2803            defaultset |= self.msg_flags_eor_indicator
2804            defaultunset |= self.msg_flags_non_eor_indicator
2805        elif eor is not None:
2806            defaultset |= self.msg_flags_non_eor_indicator
2807            defaultunset |= self.msg_flags_eor_indicator
2808
2809        # Function arguments override defaults
2810        defaultset &= ~checkunset
2811        defaultunset &= ~checkset
2812
2813        # Merge arguments with remaining defaults, and check for conflicts
2814        checkset |= defaultset
2815        checkunset |= defaultunset
2816        inboth = checkset & checkunset & ~ignore
2817        if inboth:
2818            raise Exception("contradictory set, unset requirements for flags "
2819                            "{0:#x}".format(inboth))
2820
2821        # Compare with given msg_flags value
2822        mask = (checkset | checkunset) & ~ignore
2823        self.assertEqual(flags & mask, checkset & mask)
2824
2825
2826class RecvmsgIntoMixin(SendrecvmsgBase):
2827    # Mixin to implement doRecvmsg() using recvmsg_into().
2828
2829    def doRecvmsg(self, sock, bufsize, *args):
2830        buf = bytearray(bufsize)
2831        result = sock.recvmsg_into([buf], *args)
2832        self.registerRecvmsgResult(result)
2833        self.assertGreaterEqual(result[0], 0)
2834        self.assertLessEqual(result[0], bufsize)
2835        return (bytes(buf[:result[0]]),) + result[1:]
2836
2837
2838class SendrecvmsgDgramFlagsBase(SendrecvmsgBase):
2839    # Defines flags to be checked in msg_flags for datagram sockets.
2840
2841    @property
2842    def msg_flags_non_eor_indicator(self):
2843        return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC
2844
2845
2846class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase):
2847    # Defines flags to be checked in msg_flags for SCTP sockets.
2848
2849    @property
2850    def msg_flags_eor_indicator(self):
2851        return super().msg_flags_eor_indicator | socket.MSG_EOR
2852
2853
2854class SendrecvmsgConnectionlessBase(SendrecvmsgBase):
2855    # Base class for tests on connectionless-mode sockets.  Users must
2856    # supply sockets on attributes cli and serv to be mapped to
2857    # cli_sock and serv_sock respectively.
2858
2859    @property
2860    def serv_sock(self):
2861        return self.serv
2862
2863    @property
2864    def cli_sock(self):
2865        return self.cli
2866
2867    @property
2868    def sendmsg_to_server_defaults(self):
2869        return ([], [], 0, self.serv_addr)
2870
2871    def sendToServer(self, msg):
2872        return self.cli_sock.sendto(msg, self.serv_addr)
2873
2874
2875class SendrecvmsgConnectedBase(SendrecvmsgBase):
2876    # Base class for tests on connected sockets.  Users must supply
2877    # sockets on attributes serv_conn and cli_conn (representing the
2878    # connections *to* the server and the client), to be mapped to
2879    # cli_sock and serv_sock respectively.
2880
2881    @property
2882    def serv_sock(self):
2883        return self.cli_conn
2884
2885    @property
2886    def cli_sock(self):
2887        return self.serv_conn
2888
2889    def checkRecvmsgAddress(self, addr1, addr2):
2890        # Address is currently "unspecified" for a connected socket,
2891        # so we don't examine it
2892        pass
2893
2894
2895class SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
2896    # Base class to set a timeout on server's socket.
2897
2898    def setUp(self):
2899        super().setUp()
2900        self.serv_sock.settimeout(self.fail_timeout)
2901
2902
2903class SendmsgTests(SendrecvmsgServerTimeoutBase):
2904    # Tests for sendmsg() which can use any socket type and do not
2905    # involve recvmsg() or recvmsg_into().
2906
2907    def testSendmsg(self):
2908        # Send a simple message with sendmsg().
2909        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2910
2911    def _testSendmsg(self):
2912        self.assertEqual(self.sendmsgToServer([MSG]), len(MSG))
2913
2914    def testSendmsgDataGenerator(self):
2915        # Send from buffer obtained from a generator (not a sequence).
2916        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2917
2918    def _testSendmsgDataGenerator(self):
2919        self.assertEqual(self.sendmsgToServer((o for o in [MSG])),
2920                         len(MSG))
2921
2922    def testSendmsgAncillaryGenerator(self):
2923        # Gather (empty) ancillary data from a generator.
2924        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2925
2926    def _testSendmsgAncillaryGenerator(self):
2927        self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])),
2928                         len(MSG))
2929
2930    def testSendmsgArray(self):
2931        # Send data from an array instead of the usual bytes object.
2932        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2933
2934    def _testSendmsgArray(self):
2935        self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]),
2936                         len(MSG))
2937
2938    def testSendmsgGather(self):
2939        # Send message data from more than one buffer (gather write).
2940        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2941
2942    def _testSendmsgGather(self):
2943        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
2944
2945    def testSendmsgBadArgs(self):
2946        # Check that sendmsg() rejects invalid arguments.
2947        self.assertEqual(self.serv_sock.recv(1000), b"done")
2948
2949    def _testSendmsgBadArgs(self):
2950        self.assertRaises(TypeError, self.cli_sock.sendmsg)
2951        self.assertRaises(TypeError, self.sendmsgToServer,
2952                          b"not in an iterable")
2953        self.assertRaises(TypeError, self.sendmsgToServer,
2954                          object())
2955        self.assertRaises(TypeError, self.sendmsgToServer,
2956                          [object()])
2957        self.assertRaises(TypeError, self.sendmsgToServer,
2958                          [MSG, object()])
2959        self.assertRaises(TypeError, self.sendmsgToServer,
2960                          [MSG], object())
2961        self.assertRaises(TypeError, self.sendmsgToServer,
2962                          [MSG], [], object())
2963        self.assertRaises(TypeError, self.sendmsgToServer,
2964                          [MSG], [], 0, object())
2965        self.sendToServer(b"done")
2966
2967    def testSendmsgBadCmsg(self):
2968        # Check that invalid ancillary data items are rejected.
2969        self.assertEqual(self.serv_sock.recv(1000), b"done")
2970
2971    def _testSendmsgBadCmsg(self):
2972        self.assertRaises(TypeError, self.sendmsgToServer,
2973                          [MSG], [object()])
2974        self.assertRaises(TypeError, self.sendmsgToServer,
2975                          [MSG], [(object(), 0, b"data")])
2976        self.assertRaises(TypeError, self.sendmsgToServer,
2977                          [MSG], [(0, object(), b"data")])
2978        self.assertRaises(TypeError, self.sendmsgToServer,
2979                          [MSG], [(0, 0, object())])
2980        self.assertRaises(TypeError, self.sendmsgToServer,
2981                          [MSG], [(0, 0)])
2982        self.assertRaises(TypeError, self.sendmsgToServer,
2983                          [MSG], [(0, 0, b"data", 42)])
2984        self.sendToServer(b"done")
2985
2986    @requireAttrs(socket, "CMSG_SPACE")
2987    def testSendmsgBadMultiCmsg(self):
2988        # Check that invalid ancillary data items are rejected when
2989        # more than one item is present.
2990        self.assertEqual(self.serv_sock.recv(1000), b"done")
2991
2992    @testSendmsgBadMultiCmsg.client_skip
2993    def _testSendmsgBadMultiCmsg(self):
2994        self.assertRaises(TypeError, self.sendmsgToServer,
2995                          [MSG], [0, 0, b""])
2996        self.assertRaises(TypeError, self.sendmsgToServer,
2997                          [MSG], [(0, 0, b""), object()])
2998        self.sendToServer(b"done")
2999
3000    def testSendmsgExcessCmsgReject(self):
3001        # Check that sendmsg() rejects excess ancillary data items
3002        # when the number that can be sent is limited.
3003        self.assertEqual(self.serv_sock.recv(1000), b"done")
3004
3005    def _testSendmsgExcessCmsgReject(self):
3006        if not hasattr(socket, "CMSG_SPACE"):
3007            # Can only send one item
3008            with self.assertRaises(OSError) as cm:
3009                self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
3010            self.assertIsNone(cm.exception.errno)
3011        self.sendToServer(b"done")
3012
3013    def testSendmsgAfterClose(self):
3014        # Check that sendmsg() fails on a closed socket.
3015        pass
3016
3017    def _testSendmsgAfterClose(self):
3018        self.cli_sock.close()
3019        self.assertRaises(OSError, self.sendmsgToServer, [MSG])
3020
3021
3022class SendmsgStreamTests(SendmsgTests):
3023    # Tests for sendmsg() which require a stream socket and do not
3024    # involve recvmsg() or recvmsg_into().
3025
3026    def testSendmsgExplicitNoneAddr(self):
3027        # Check that peer address can be specified as None.
3028        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
3029
3030    def _testSendmsgExplicitNoneAddr(self):
3031        self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG))
3032
3033    def testSendmsgTimeout(self):
3034        # Check that timeout works with sendmsg().
3035        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
3036        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3037
3038    def _testSendmsgTimeout(self):
3039        try:
3040            self.cli_sock.settimeout(0.03)
3041            try:
3042                while True:
3043                    self.sendmsgToServer([b"a"*512])
3044            except TimeoutError:
3045                pass
3046            except OSError as exc:
3047                if exc.errno != errno.ENOMEM:
3048                    raise
3049                # bpo-33937 the test randomly fails on Travis CI with
3050                # "OSError: [Errno 12] Cannot allocate memory"
3051            else:
3052                self.fail("TimeoutError not raised")
3053        finally:
3054            self.misc_event.set()
3055
3056    # XXX: would be nice to have more tests for sendmsg flags argument.
3057
3058    # Linux supports MSG_DONTWAIT when sending, but in general, it
3059    # only works when receiving.  Could add other platforms if they
3060    # support it too.
3061    @skipWithClientIf(sys.platform not in {"linux"},
3062                      "MSG_DONTWAIT not known to work on this platform when "
3063                      "sending")
3064    def testSendmsgDontWait(self):
3065        # Check that MSG_DONTWAIT in flags causes non-blocking behaviour.
3066        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
3067        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3068
3069    @testSendmsgDontWait.client_skip
3070    def _testSendmsgDontWait(self):
3071        try:
3072            with self.assertRaises(OSError) as cm:
3073                while True:
3074                    self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
3075            # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI
3076            # with "OSError: [Errno 12] Cannot allocate memory"
3077            self.assertIn(cm.exception.errno,
3078                          (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM))
3079        finally:
3080            self.misc_event.set()
3081
3082
3083class SendmsgConnectionlessTests(SendmsgTests):
3084    # Tests for sendmsg() which require a connectionless-mode
3085    # (e.g. datagram) socket, and do not involve recvmsg() or
3086    # recvmsg_into().
3087
3088    def testSendmsgNoDestAddr(self):
3089        # Check that sendmsg() fails when no destination address is
3090        # given for unconnected socket.
3091        pass
3092
3093    def _testSendmsgNoDestAddr(self):
3094        self.assertRaises(OSError, self.cli_sock.sendmsg,
3095                          [MSG])
3096        self.assertRaises(OSError, self.cli_sock.sendmsg,
3097                          [MSG], [], 0, None)
3098
3099
3100class RecvmsgGenericTests(SendrecvmsgBase):
3101    # Tests for recvmsg() which can also be emulated using
3102    # recvmsg_into(), and can use any socket type.
3103
3104    def testRecvmsg(self):
3105        # Receive a simple message with recvmsg[_into]().
3106        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3107        self.assertEqual(msg, MSG)
3108        self.checkRecvmsgAddress(addr, self.cli_addr)
3109        self.assertEqual(ancdata, [])
3110        self.checkFlags(flags, eor=True)
3111
3112    def _testRecvmsg(self):
3113        self.sendToServer(MSG)
3114
3115    def testRecvmsgExplicitDefaults(self):
3116        # Test recvmsg[_into]() with default arguments provided explicitly.
3117        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3118                                                   len(MSG), 0, 0)
3119        self.assertEqual(msg, MSG)
3120        self.checkRecvmsgAddress(addr, self.cli_addr)
3121        self.assertEqual(ancdata, [])
3122        self.checkFlags(flags, eor=True)
3123
3124    def _testRecvmsgExplicitDefaults(self):
3125        self.sendToServer(MSG)
3126
3127    def testRecvmsgShorter(self):
3128        # Receive a message smaller than buffer.
3129        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3130                                                   len(MSG) + 42)
3131        self.assertEqual(msg, MSG)
3132        self.checkRecvmsgAddress(addr, self.cli_addr)
3133        self.assertEqual(ancdata, [])
3134        self.checkFlags(flags, eor=True)
3135
3136    def _testRecvmsgShorter(self):
3137        self.sendToServer(MSG)
3138
3139    def testRecvmsgTrunc(self):
3140        # Receive part of message, check for truncation indicators.
3141        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3142                                                   len(MSG) - 3)
3143        self.assertEqual(msg, MSG[:-3])
3144        self.checkRecvmsgAddress(addr, self.cli_addr)
3145        self.assertEqual(ancdata, [])
3146        self.checkFlags(flags, eor=False)
3147
3148    def _testRecvmsgTrunc(self):
3149        self.sendToServer(MSG)
3150
3151    def testRecvmsgShortAncillaryBuf(self):
3152        # Test ancillary data buffer too small to hold any ancillary data.
3153        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3154                                                   len(MSG), 1)
3155        self.assertEqual(msg, MSG)
3156        self.checkRecvmsgAddress(addr, self.cli_addr)
3157        self.assertEqual(ancdata, [])
3158        self.checkFlags(flags, eor=True)
3159
3160    def _testRecvmsgShortAncillaryBuf(self):
3161        self.sendToServer(MSG)
3162
3163    def testRecvmsgLongAncillaryBuf(self):
3164        # Test large ancillary data buffer.
3165        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3166                                                   len(MSG), 10240)
3167        self.assertEqual(msg, MSG)
3168        self.checkRecvmsgAddress(addr, self.cli_addr)
3169        self.assertEqual(ancdata, [])
3170        self.checkFlags(flags, eor=True)
3171
3172    def _testRecvmsgLongAncillaryBuf(self):
3173        self.sendToServer(MSG)
3174
3175    def testRecvmsgAfterClose(self):
3176        # Check that recvmsg[_into]() fails on a closed socket.
3177        self.serv_sock.close()
3178        self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)
3179
3180    def _testRecvmsgAfterClose(self):
3181        pass
3182
3183    def testRecvmsgTimeout(self):
3184        # Check that timeout works.
3185        try:
3186            self.serv_sock.settimeout(0.03)
3187            self.assertRaises(TimeoutError,
3188                              self.doRecvmsg, self.serv_sock, len(MSG))
3189        finally:
3190            self.misc_event.set()
3191
3192    def _testRecvmsgTimeout(self):
3193        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3194
3195    @requireAttrs(socket, "MSG_PEEK")
3196    def testRecvmsgPeek(self):
3197        # Check that MSG_PEEK in flags enables examination of pending
3198        # data without consuming it.
3199
3200        # Receive part of data with MSG_PEEK.
3201        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3202                                                   len(MSG) - 3, 0,
3203                                                   socket.MSG_PEEK)
3204        self.assertEqual(msg, MSG[:-3])
3205        self.checkRecvmsgAddress(addr, self.cli_addr)
3206        self.assertEqual(ancdata, [])
3207        # Ignoring MSG_TRUNC here (so this test is the same for stream
3208        # and datagram sockets).  Some wording in POSIX seems to
3209        # suggest that it needn't be set when peeking, but that may
3210        # just be a slip.
3211        self.checkFlags(flags, eor=False,
3212                        ignore=getattr(socket, "MSG_TRUNC", 0))
3213
3214        # Receive all data with MSG_PEEK.
3215        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3216                                                   len(MSG), 0,
3217                                                   socket.MSG_PEEK)
3218        self.assertEqual(msg, MSG)
3219        self.checkRecvmsgAddress(addr, self.cli_addr)
3220        self.assertEqual(ancdata, [])
3221        self.checkFlags(flags, eor=True)
3222
3223        # Check that the same data can still be received normally.
3224        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3225        self.assertEqual(msg, MSG)
3226        self.checkRecvmsgAddress(addr, self.cli_addr)
3227        self.assertEqual(ancdata, [])
3228        self.checkFlags(flags, eor=True)
3229
3230    @testRecvmsgPeek.client_skip
3231    def _testRecvmsgPeek(self):
3232        self.sendToServer(MSG)
3233
3234    @requireAttrs(socket.socket, "sendmsg")
3235    def testRecvmsgFromSendmsg(self):
3236        # Test receiving with recvmsg[_into]() when message is sent
3237        # using sendmsg().
3238        self.serv_sock.settimeout(self.fail_timeout)
3239        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3240        self.assertEqual(msg, MSG)
3241        self.checkRecvmsgAddress(addr, self.cli_addr)
3242        self.assertEqual(ancdata, [])
3243        self.checkFlags(flags, eor=True)
3244
3245    @testRecvmsgFromSendmsg.client_skip
3246    def _testRecvmsgFromSendmsg(self):
3247        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
3248
3249
3250class RecvmsgGenericStreamTests(RecvmsgGenericTests):
3251    # Tests which require a stream socket and can use either recvmsg()
3252    # or recvmsg_into().
3253
3254    def testRecvmsgEOF(self):
3255        # Receive end-of-stream indicator (b"", peer socket closed).
3256        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
3257        self.assertEqual(msg, b"")
3258        self.checkRecvmsgAddress(addr, self.cli_addr)
3259        self.assertEqual(ancdata, [])
3260        self.checkFlags(flags, eor=None) # Might not have end-of-record marker
3261
3262    def _testRecvmsgEOF(self):
3263        self.cli_sock.close()
3264
3265    def testRecvmsgOverflow(self):
3266        # Receive a message in more than one chunk.
3267        seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3268                                                    len(MSG) - 3)
3269        self.checkRecvmsgAddress(addr, self.cli_addr)
3270        self.assertEqual(ancdata, [])
3271        self.checkFlags(flags, eor=False)
3272
3273        seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
3274        self.checkRecvmsgAddress(addr, self.cli_addr)
3275        self.assertEqual(ancdata, [])
3276        self.checkFlags(flags, eor=True)
3277
3278        msg = seg1 + seg2
3279        self.assertEqual(msg, MSG)
3280
3281    def _testRecvmsgOverflow(self):
3282        self.sendToServer(MSG)
3283
3284
3285class RecvmsgTests(RecvmsgGenericTests):
3286    # Tests for recvmsg() which can use any socket type.
3287
3288    def testRecvmsgBadArgs(self):
3289        # Check that recvmsg() rejects invalid arguments.
3290        self.assertRaises(TypeError, self.serv_sock.recvmsg)
3291        self.assertRaises(ValueError, self.serv_sock.recvmsg,
3292                          -1, 0, 0)
3293        self.assertRaises(ValueError, self.serv_sock.recvmsg,
3294                          len(MSG), -1, 0)
3295        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3296                          [bytearray(10)], 0, 0)
3297        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3298                          object(), 0, 0)
3299        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3300                          len(MSG), object(), 0)
3301        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3302                          len(MSG), 0, object())
3303
3304        msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0)
3305        self.assertEqual(msg, MSG)
3306        self.checkRecvmsgAddress(addr, self.cli_addr)
3307        self.assertEqual(ancdata, [])
3308        self.checkFlags(flags, eor=True)
3309
3310    def _testRecvmsgBadArgs(self):
3311        self.sendToServer(MSG)
3312
3313
3314class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
3315    # Tests for recvmsg_into() which can use any socket type.
3316
3317    def testRecvmsgIntoBadArgs(self):
3318        # Check that recvmsg_into() rejects invalid arguments.
3319        buf = bytearray(len(MSG))
3320        self.assertRaises(TypeError, self.serv_sock.recvmsg_into)
3321        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3322                          len(MSG), 0, 0)
3323        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3324                          buf, 0, 0)
3325        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3326                          [object()], 0, 0)
3327        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3328                          [b"I'm not writable"], 0, 0)
3329        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3330                          [buf, object()], 0, 0)
3331        self.assertRaises(ValueError, self.serv_sock.recvmsg_into,
3332                          [buf], -1, 0)
3333        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3334                          [buf], object(), 0)
3335        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3336                          [buf], 0, object())
3337
3338        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0)
3339        self.assertEqual(nbytes, len(MSG))
3340        self.assertEqual(buf, bytearray(MSG))
3341        self.checkRecvmsgAddress(addr, self.cli_addr)
3342        self.assertEqual(ancdata, [])
3343        self.checkFlags(flags, eor=True)
3344
3345    def _testRecvmsgIntoBadArgs(self):
3346        self.sendToServer(MSG)
3347
3348    def testRecvmsgIntoGenerator(self):
3349        # Receive into buffer obtained from a generator (not a sequence).
3350        buf = bytearray(len(MSG))
3351        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
3352            (o for o in [buf]))
3353        self.assertEqual(nbytes, len(MSG))
3354        self.assertEqual(buf, bytearray(MSG))
3355        self.checkRecvmsgAddress(addr, self.cli_addr)
3356        self.assertEqual(ancdata, [])
3357        self.checkFlags(flags, eor=True)
3358
3359    def _testRecvmsgIntoGenerator(self):
3360        self.sendToServer(MSG)
3361
3362    def testRecvmsgIntoArray(self):
3363        # Receive into an array rather than the usual bytearray.
3364        buf = array.array("B", [0] * len(MSG))
3365        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf])
3366        self.assertEqual(nbytes, len(MSG))
3367        self.assertEqual(buf.tobytes(), MSG)
3368        self.checkRecvmsgAddress(addr, self.cli_addr)
3369        self.assertEqual(ancdata, [])
3370        self.checkFlags(flags, eor=True)
3371
3372    def _testRecvmsgIntoArray(self):
3373        self.sendToServer(MSG)
3374
3375    def testRecvmsgIntoScatter(self):
3376        # Receive into multiple buffers (scatter write).
3377        b1 = bytearray(b"----")
3378        b2 = bytearray(b"0123456789")
3379        b3 = bytearray(b"--------------")
3380        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
3381            [b1, memoryview(b2)[2:9], b3])
3382        self.assertEqual(nbytes, len(b"Mary had a little lamb"))
3383        self.assertEqual(b1, bytearray(b"Mary"))
3384        self.assertEqual(b2, bytearray(b"01 had a 9"))
3385        self.assertEqual(b3, bytearray(b"little lamb---"))
3386        self.checkRecvmsgAddress(addr, self.cli_addr)
3387        self.assertEqual(ancdata, [])
3388        self.checkFlags(flags, eor=True)
3389
3390    def _testRecvmsgIntoScatter(self):
3391        self.sendToServer(b"Mary had a little lamb")
3392
3393
3394class CmsgMacroTests(unittest.TestCase):
3395    # Test the functions CMSG_LEN() and CMSG_SPACE().  Tests
3396    # assumptions used by sendmsg() and recvmsg[_into](), which share
3397    # code with these functions.
3398
3399    # Match the definition in socketmodule.c
3400    try:
3401        import _testcapi
3402    except ImportError:
3403        socklen_t_limit = 0x7fffffff
3404    else:
3405        socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)
3406
3407    @requireAttrs(socket, "CMSG_LEN")
3408    def testCMSG_LEN(self):
3409        # Test CMSG_LEN() with various valid and invalid values,
3410        # checking the assumptions used by recvmsg() and sendmsg().
3411        toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1
3412        values = list(range(257)) + list(range(toobig - 257, toobig))
3413
3414        # struct cmsghdr has at least three members, two of which are ints
3415        self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2)
3416        for n in values:
3417            ret = socket.CMSG_LEN(n)
3418            # This is how recvmsg() calculates the data size
3419            self.assertEqual(ret - socket.CMSG_LEN(0), n)
3420            self.assertLessEqual(ret, self.socklen_t_limit)
3421
3422        self.assertRaises(OverflowError, socket.CMSG_LEN, -1)
3423        # sendmsg() shares code with these functions, and requires
3424        # that it reject values over the limit.
3425        self.assertRaises(OverflowError, socket.CMSG_LEN, toobig)
3426        self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize)
3427
3428    @requireAttrs(socket, "CMSG_SPACE")
3429    def testCMSG_SPACE(self):
3430        # Test CMSG_SPACE() with various valid and invalid values,
3431        # checking the assumptions used by sendmsg().
3432        toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1
3433        values = list(range(257)) + list(range(toobig - 257, toobig))
3434
3435        last = socket.CMSG_SPACE(0)
3436        # struct cmsghdr has at least three members, two of which are ints
3437        self.assertGreater(last, array.array("i").itemsize * 2)
3438        for n in values:
3439            ret = socket.CMSG_SPACE(n)
3440            self.assertGreaterEqual(ret, last)
3441            self.assertGreaterEqual(ret, socket.CMSG_LEN(n))
3442            self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0))
3443            self.assertLessEqual(ret, self.socklen_t_limit)
3444            last = ret
3445
3446        self.assertRaises(OverflowError, socket.CMSG_SPACE, -1)
3447        # sendmsg() shares code with these functions, and requires
3448        # that it reject values over the limit.
3449        self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig)
3450        self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize)
3451
3452
3453class SCMRightsTest(SendrecvmsgServerTimeoutBase):
3454    # Tests for file descriptor passing on Unix-domain sockets.
3455
3456    # Invalid file descriptor value that's unlikely to evaluate to a
3457    # real FD even if one of its bytes is replaced with a different
3458    # value (which shouldn't actually happen).
3459    badfd = -0x5555
3460
3461    def newFDs(self, n):
3462        # Return a list of n file descriptors for newly-created files
3463        # containing their list indices as ASCII numbers.
3464        fds = []
3465        for i in range(n):
3466            fd, path = tempfile.mkstemp()
3467            self.addCleanup(os.unlink, path)
3468            self.addCleanup(os.close, fd)
3469            os.write(fd, str(i).encode())
3470            fds.append(fd)
3471        return fds
3472
3473    def checkFDs(self, fds):
3474        # Check that the file descriptors in the given list contain
3475        # their correct list indices as ASCII numbers.
3476        for n, fd in enumerate(fds):
3477            os.lseek(fd, 0, os.SEEK_SET)
3478            self.assertEqual(os.read(fd, 1024), str(n).encode())
3479
3480    def registerRecvmsgResult(self, result):
3481        self.addCleanup(self.closeRecvmsgFDs, result)
3482
3483    def closeRecvmsgFDs(self, recvmsg_result):
3484        # Close all file descriptors specified in the ancillary data
3485        # of the given return value from recvmsg() or recvmsg_into().
3486        for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]:
3487            if (cmsg_level == socket.SOL_SOCKET and
3488                    cmsg_type == socket.SCM_RIGHTS):
3489                fds = array.array("i")
3490                fds.frombytes(cmsg_data[:
3491                        len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3492                for fd in fds:
3493                    os.close(fd)
3494
3495    def createAndSendFDs(self, n):
3496        # Send n new file descriptors created by newFDs() to the
3497        # server, with the constant MSG as the non-ancillary data.
3498        self.assertEqual(
3499            self.sendmsgToServer([MSG],
3500                                 [(socket.SOL_SOCKET,
3501                                   socket.SCM_RIGHTS,
3502                                   array.array("i", self.newFDs(n)))]),
3503            len(MSG))
3504
3505    def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
3506        # Check that constant MSG was received with numfds file
3507        # descriptors in a maximum of maxcmsgs control messages (which
3508        # must contain only complete integers).  By default, check
3509        # that MSG_CTRUNC is unset, but ignore any flags in
3510        # ignoreflags.
3511        msg, ancdata, flags, addr = result
3512        self.assertEqual(msg, MSG)
3513        self.checkRecvmsgAddress(addr, self.cli_addr)
3514        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3515                        ignore=ignoreflags)
3516
3517        self.assertIsInstance(ancdata, list)
3518        self.assertLessEqual(len(ancdata), maxcmsgs)
3519        fds = array.array("i")
3520        for item in ancdata:
3521            self.assertIsInstance(item, tuple)
3522            cmsg_level, cmsg_type, cmsg_data = item
3523            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3524            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3525            self.assertIsInstance(cmsg_data, bytes)
3526            self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0)
3527            fds.frombytes(cmsg_data)
3528
3529        self.assertEqual(len(fds), numfds)
3530        self.checkFDs(fds)
3531
3532    def testFDPassSimple(self):
3533        # Pass a single FD (array read from bytes object).
3534        self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock,
3535                                               len(MSG), 10240))
3536
3537    def _testFDPassSimple(self):
3538        self.assertEqual(
3539            self.sendmsgToServer(
3540                [MSG],
3541                [(socket.SOL_SOCKET,
3542                  socket.SCM_RIGHTS,
3543                  array.array("i", self.newFDs(1)).tobytes())]),
3544            len(MSG))
3545
3546    def testMultipleFDPass(self):
3547        # Pass multiple FDs in a single array.
3548        self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock,
3549                                               len(MSG), 10240))
3550
3551    def _testMultipleFDPass(self):
3552        self.createAndSendFDs(4)
3553
3554    @requireAttrs(socket, "CMSG_SPACE")
3555    def testFDPassCMSG_SPACE(self):
3556        # Test using CMSG_SPACE() to calculate ancillary buffer size.
3557        self.checkRecvmsgFDs(
3558            4, self.doRecvmsg(self.serv_sock, len(MSG),
3559                              socket.CMSG_SPACE(4 * SIZEOF_INT)))
3560
3561    @testFDPassCMSG_SPACE.client_skip
3562    def _testFDPassCMSG_SPACE(self):
3563        self.createAndSendFDs(4)
3564
3565    def testFDPassCMSG_LEN(self):
3566        # Test using CMSG_LEN() to calculate ancillary buffer size.
3567        self.checkRecvmsgFDs(1,
3568                             self.doRecvmsg(self.serv_sock, len(MSG),
3569                                            socket.CMSG_LEN(4 * SIZEOF_INT)),
3570                             # RFC 3542 says implementations may set
3571                             # MSG_CTRUNC if there isn't enough space
3572                             # for trailing padding.
3573                             ignoreflags=socket.MSG_CTRUNC)
3574
3575    def _testFDPassCMSG_LEN(self):
3576        self.createAndSendFDs(1)
3577
3578    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3579    @unittest.skipIf(AIX, "skipping, see issue #22397")
3580    @requireAttrs(socket, "CMSG_SPACE")
3581    def testFDPassSeparate(self):
3582        # Pass two FDs in two separate arrays.  Arrays may be combined
3583        # into a single control message by the OS.
3584        self.checkRecvmsgFDs(2,
3585                             self.doRecvmsg(self.serv_sock, len(MSG), 10240),
3586                             maxcmsgs=2)
3587
3588    @testFDPassSeparate.client_skip
3589    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3590    @unittest.skipIf(AIX, "skipping, see issue #22397")
3591    def _testFDPassSeparate(self):
3592        fd0, fd1 = self.newFDs(2)
3593        self.assertEqual(
3594            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
3595                                          socket.SCM_RIGHTS,
3596                                          array.array("i", [fd0])),
3597                                         (socket.SOL_SOCKET,
3598                                          socket.SCM_RIGHTS,
3599                                          array.array("i", [fd1]))]),
3600            len(MSG))
3601
3602    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3603    @unittest.skipIf(AIX, "skipping, see issue #22397")
3604    @requireAttrs(socket, "CMSG_SPACE")
3605    def testFDPassSeparateMinSpace(self):
3606        # Pass two FDs in two separate arrays, receiving them into the
3607        # minimum space for two arrays.
3608        num_fds = 2
3609        self.checkRecvmsgFDs(num_fds,
3610                             self.doRecvmsg(self.serv_sock, len(MSG),
3611                                            socket.CMSG_SPACE(SIZEOF_INT) +
3612                                            socket.CMSG_LEN(SIZEOF_INT * num_fds)),
3613                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
3614
3615    @testFDPassSeparateMinSpace.client_skip
3616    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3617    @unittest.skipIf(AIX, "skipping, see issue #22397")
3618    def _testFDPassSeparateMinSpace(self):
3619        fd0, fd1 = self.newFDs(2)
3620        self.assertEqual(
3621            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
3622                                          socket.SCM_RIGHTS,
3623                                          array.array("i", [fd0])),
3624                                         (socket.SOL_SOCKET,
3625                                          socket.SCM_RIGHTS,
3626                                          array.array("i", [fd1]))]),
3627            len(MSG))
3628
3629    def sendAncillaryIfPossible(self, msg, ancdata):
3630        # Try to send msg and ancdata to server, but if the system
3631        # call fails, just send msg with no ancillary data.
3632        try:
3633            nbytes = self.sendmsgToServer([msg], ancdata)
3634        except OSError as e:
3635            # Check that it was the system call that failed
3636            self.assertIsInstance(e.errno, int)
3637            nbytes = self.sendmsgToServer([msg])
3638        self.assertEqual(nbytes, len(msg))
3639
3640    @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
3641    def testFDPassEmpty(self):
3642        # Try to pass an empty FD array.  Can receive either no array
3643        # or an empty array.
3644        self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock,
3645                                               len(MSG), 10240),
3646                             ignoreflags=socket.MSG_CTRUNC)
3647
3648    def _testFDPassEmpty(self):
3649        self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET,
3650                                            socket.SCM_RIGHTS,
3651                                            b"")])
3652
3653    def testFDPassPartialInt(self):
3654        # Try to pass a truncated FD array.
3655        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3656                                                   len(MSG), 10240)
3657        self.assertEqual(msg, MSG)
3658        self.checkRecvmsgAddress(addr, self.cli_addr)
3659        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
3660        self.assertLessEqual(len(ancdata), 1)
3661        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3662            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3663            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3664            self.assertLess(len(cmsg_data), SIZEOF_INT)
3665
3666    def _testFDPassPartialInt(self):
3667        self.sendAncillaryIfPossible(
3668            MSG,
3669            [(socket.SOL_SOCKET,
3670              socket.SCM_RIGHTS,
3671              array.array("i", [self.badfd]).tobytes()[:-1])])
3672
3673    @requireAttrs(socket, "CMSG_SPACE")
3674    def testFDPassPartialIntInMiddle(self):
3675        # Try to pass two FD arrays, the first of which is truncated.
3676        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3677                                                   len(MSG), 10240)
3678        self.assertEqual(msg, MSG)
3679        self.checkRecvmsgAddress(addr, self.cli_addr)
3680        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
3681        self.assertLessEqual(len(ancdata), 2)
3682        fds = array.array("i")
3683        # Arrays may have been combined in a single control message
3684        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3685            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3686            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3687            fds.frombytes(cmsg_data[:
3688                    len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3689        self.assertLessEqual(len(fds), 2)
3690        self.checkFDs(fds)
3691
3692    @testFDPassPartialIntInMiddle.client_skip
3693    def _testFDPassPartialIntInMiddle(self):
3694        fd0, fd1 = self.newFDs(2)
3695        self.sendAncillaryIfPossible(
3696            MSG,
3697            [(socket.SOL_SOCKET,
3698              socket.SCM_RIGHTS,
3699              array.array("i", [fd0, self.badfd]).tobytes()[:-1]),
3700             (socket.SOL_SOCKET,
3701              socket.SCM_RIGHTS,
3702              array.array("i", [fd1]))])
3703
3704    def checkTruncatedHeader(self, result, ignoreflags=0):
3705        # Check that no ancillary data items are returned when data is
3706        # truncated inside the cmsghdr structure.
3707        msg, ancdata, flags, addr = result
3708        self.assertEqual(msg, MSG)
3709        self.checkRecvmsgAddress(addr, self.cli_addr)
3710        self.assertEqual(ancdata, [])
3711        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3712                        ignore=ignoreflags)
3713
3714    def testCmsgTruncNoBufSize(self):
3715        # Check that no ancillary data is received when no buffer size
3716        # is specified.
3717        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)),
3718                                  # BSD seems to set MSG_CTRUNC only
3719                                  # if an item has been partially
3720                                  # received.
3721                                  ignoreflags=socket.MSG_CTRUNC)
3722
3723    def _testCmsgTruncNoBufSize(self):
3724        self.createAndSendFDs(1)
3725
3726    def testCmsgTrunc0(self):
3727        # Check that no ancillary data is received when buffer size is 0.
3728        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0),
3729                                  ignoreflags=socket.MSG_CTRUNC)
3730
3731    def _testCmsgTrunc0(self):
3732        self.createAndSendFDs(1)
3733
3734    # Check that no ancillary data is returned for various non-zero
3735    # (but still too small) buffer sizes.
3736
3737    def testCmsgTrunc1(self):
3738        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1))
3739
3740    def _testCmsgTrunc1(self):
3741        self.createAndSendFDs(1)
3742
3743    def testCmsgTrunc2Int(self):
3744        # The cmsghdr structure has at least three members, two of
3745        # which are ints, so we still shouldn't see any ancillary
3746        # data.
3747        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3748                                                 SIZEOF_INT * 2))
3749
3750    def _testCmsgTrunc2Int(self):
3751        self.createAndSendFDs(1)
3752
3753    def testCmsgTruncLen0Minus1(self):
3754        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3755                                                 socket.CMSG_LEN(0) - 1))
3756
3757    def _testCmsgTruncLen0Minus1(self):
3758        self.createAndSendFDs(1)
3759
3760    # The following tests try to truncate the control message in the
3761    # middle of the FD array.
3762
3763    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
3764        # Check that file descriptor data is truncated to between
3765        # mindata and maxdata bytes when received with buffer size
3766        # ancbuf, and that any complete file descriptor numbers are
3767        # valid.
3768        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3769                                                   len(MSG), ancbuf)
3770        self.assertEqual(msg, MSG)
3771        self.checkRecvmsgAddress(addr, self.cli_addr)
3772        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3773
3774        if mindata == 0 and ancdata == []:
3775            return
3776        self.assertEqual(len(ancdata), 1)
3777        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3778        self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3779        self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3780        self.assertGreaterEqual(len(cmsg_data), mindata)
3781        self.assertLessEqual(len(cmsg_data), maxdata)
3782        fds = array.array("i")
3783        fds.frombytes(cmsg_data[:
3784                len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3785        self.checkFDs(fds)
3786
3787    def testCmsgTruncLen0(self):
3788        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0)
3789
3790    def _testCmsgTruncLen0(self):
3791        self.createAndSendFDs(1)
3792
3793    def testCmsgTruncLen0Plus1(self):
3794        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1)
3795
3796    def _testCmsgTruncLen0Plus1(self):
3797        self.createAndSendFDs(2)
3798
3799    def testCmsgTruncLen1(self):
3800        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT),
3801                                 maxdata=SIZEOF_INT)
3802
3803    def _testCmsgTruncLen1(self):
3804        self.createAndSendFDs(2)
3805
3806    def testCmsgTruncLen2Minus1(self):
3807        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1,
3808                                 maxdata=(2 * SIZEOF_INT) - 1)
3809
3810    def _testCmsgTruncLen2Minus1(self):
3811        self.createAndSendFDs(2)
3812
3813
3814class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
3815    # Test sendmsg() and recvmsg[_into]() using the ancillary data
3816    # features of the RFC 3542 Advanced Sockets API for IPv6.
3817    # Currently we can only handle certain data items (e.g. traffic
3818    # class, hop limit, MTU discovery and fragmentation settings)
3819    # without resorting to unportable means such as the struct module,
3820    # but the tests here are aimed at testing the ancillary data
3821    # handling in sendmsg() and recvmsg() rather than the IPv6 API
3822    # itself.
3823
3824    # Test value to use when setting hop limit of packet
3825    hop_limit = 2
3826
3827    # Test value to use when setting traffic class of packet.
3828    # -1 means "use kernel default".
3829    traffic_class = -1
3830
3831    def ancillaryMapping(self, ancdata):
3832        # Given ancillary data list ancdata, return a mapping from
3833        # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
3834        # Check that no (level, type) pair appears more than once.
3835        d = {}
3836        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3837            self.assertNotIn((cmsg_level, cmsg_type), d)
3838            d[(cmsg_level, cmsg_type)] = cmsg_data
3839        return d
3840
3841    def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
3842        # Receive hop limit into ancbufsize bytes of ancillary data
3843        # space.  Check that data is MSG, ancillary data is not
3844        # truncated (but ignore any flags in ignoreflags), and hop
3845        # limit is between 0 and maxhop inclusive.
3846        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3847                                  socket.IPV6_RECVHOPLIMIT, 1)
3848        self.misc_event.set()
3849        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3850                                                   len(MSG), ancbufsize)
3851
3852        self.assertEqual(msg, MSG)
3853        self.checkRecvmsgAddress(addr, self.cli_addr)
3854        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3855                        ignore=ignoreflags)
3856
3857        self.assertEqual(len(ancdata), 1)
3858        self.assertIsInstance(ancdata[0], tuple)
3859        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3860        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3861        self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
3862        self.assertIsInstance(cmsg_data, bytes)
3863        self.assertEqual(len(cmsg_data), SIZEOF_INT)
3864        a = array.array("i")
3865        a.frombytes(cmsg_data)
3866        self.assertGreaterEqual(a[0], 0)
3867        self.assertLessEqual(a[0], maxhop)
3868
3869    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3870    def testRecvHopLimit(self):
3871        # Test receiving the packet hop limit as ancillary data.
3872        self.checkHopLimit(ancbufsize=10240)
3873
3874    @testRecvHopLimit.client_skip
3875    def _testRecvHopLimit(self):
3876        # Need to wait until server has asked to receive ancillary
3877        # data, as implementations are not required to buffer it
3878        # otherwise.
3879        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3880        self.sendToServer(MSG)
3881
3882    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3883    def testRecvHopLimitCMSG_SPACE(self):
3884        # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
3885        self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))
3886
3887    @testRecvHopLimitCMSG_SPACE.client_skip
3888    def _testRecvHopLimitCMSG_SPACE(self):
3889        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3890        self.sendToServer(MSG)
3891
3892    # Could test receiving into buffer sized using CMSG_LEN, but RFC
3893    # 3542 says portable applications must provide space for trailing
3894    # padding.  Implementations may set MSG_CTRUNC if there isn't
3895    # enough space for the padding.
3896
3897    @requireAttrs(socket.socket, "sendmsg")
3898    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3899    def testSetHopLimit(self):
3900        # Test setting hop limit on outgoing packet and receiving it
3901        # at the other end.
3902        self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)
3903
3904    @testSetHopLimit.client_skip
3905    def _testSetHopLimit(self):
3906        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3907        self.assertEqual(
3908            self.sendmsgToServer([MSG],
3909                                 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3910                                   array.array("i", [self.hop_limit]))]),
3911            len(MSG))
3912
3913    def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
3914                                     ignoreflags=0):
3915        # Receive traffic class and hop limit into ancbufsize bytes of
3916        # ancillary data space.  Check that data is MSG, ancillary
3917        # data is not truncated (but ignore any flags in ignoreflags),
3918        # and traffic class and hop limit are in range (hop limit no
3919        # more than maxhop).
3920        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3921                                  socket.IPV6_RECVHOPLIMIT, 1)
3922        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3923                                  socket.IPV6_RECVTCLASS, 1)
3924        self.misc_event.set()
3925        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3926                                                   len(MSG), ancbufsize)
3927
3928        self.assertEqual(msg, MSG)
3929        self.checkRecvmsgAddress(addr, self.cli_addr)
3930        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3931                        ignore=ignoreflags)
3932        self.assertEqual(len(ancdata), 2)
3933        ancmap = self.ancillaryMapping(ancdata)
3934
3935        tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
3936        self.assertEqual(len(tcdata), SIZEOF_INT)
3937        a = array.array("i")
3938        a.frombytes(tcdata)
3939        self.assertGreaterEqual(a[0], 0)
3940        self.assertLessEqual(a[0], 255)
3941
3942        hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
3943        self.assertEqual(len(hldata), SIZEOF_INT)
3944        a = array.array("i")
3945        a.frombytes(hldata)
3946        self.assertGreaterEqual(a[0], 0)
3947        self.assertLessEqual(a[0], maxhop)
3948
3949    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3950                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3951    def testRecvTrafficClassAndHopLimit(self):
3952        # Test receiving traffic class and hop limit as ancillary data.
3953        self.checkTrafficClassAndHopLimit(ancbufsize=10240)
3954
3955    @testRecvTrafficClassAndHopLimit.client_skip
3956    def _testRecvTrafficClassAndHopLimit(self):
3957        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3958        self.sendToServer(MSG)
3959
3960    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3961                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3962    def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3963        # Test receiving traffic class and hop limit, using
3964        # CMSG_SPACE() to calculate buffer size.
3965        self.checkTrafficClassAndHopLimit(
3966            ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)
3967
3968    @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
3969    def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3970        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3971        self.sendToServer(MSG)
3972
3973    @requireAttrs(socket.socket, "sendmsg")
3974    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3975                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3976    def testSetTrafficClassAndHopLimit(self):
3977        # Test setting traffic class and hop limit on outgoing packet,
3978        # and receiving them at the other end.
3979        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3980                                          maxhop=self.hop_limit)
3981
3982    @testSetTrafficClassAndHopLimit.client_skip
3983    def _testSetTrafficClassAndHopLimit(self):
3984        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3985        self.assertEqual(
3986            self.sendmsgToServer([MSG],
3987                                 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3988                                   array.array("i", [self.traffic_class])),
3989                                  (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3990                                   array.array("i", [self.hop_limit]))]),
3991            len(MSG))
3992
3993    @requireAttrs(socket.socket, "sendmsg")
3994    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3995                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3996    def testOddCmsgSize(self):
3997        # Try to send ancillary data with first item one byte too
3998        # long.  Fall back to sending with correct size if this fails,
3999        # and check that second item was handled correctly.
4000        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
4001                                          maxhop=self.hop_limit)
4002
4003    @testOddCmsgSize.client_skip
4004    def _testOddCmsgSize(self):
4005        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4006        try:
4007            nbytes = self.sendmsgToServer(
4008                [MSG],
4009                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
4010                  array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
4011                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
4012                  array.array("i", [self.hop_limit]))])
4013        except OSError as e:
4014            self.assertIsInstance(e.errno, int)
4015            nbytes = self.sendmsgToServer(
4016                [MSG],
4017                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
4018                  array.array("i", [self.traffic_class])),
4019                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
4020                  array.array("i", [self.hop_limit]))])
4021            self.assertEqual(nbytes, len(MSG))
4022
4023    # Tests for proper handling of truncated ancillary data
4024
4025    def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
4026        # Receive hop limit into ancbufsize bytes of ancillary data
4027        # space, which should be too small to contain the ancillary
4028        # data header (if ancbufsize is None, pass no second argument
4029        # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
4030        # (unless included in ignoreflags), and no ancillary data is
4031        # returned.
4032        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4033                                  socket.IPV6_RECVHOPLIMIT, 1)
4034        self.misc_event.set()
4035        args = () if ancbufsize is None else (ancbufsize,)
4036        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
4037                                                   len(MSG), *args)
4038
4039        self.assertEqual(msg, MSG)
4040        self.checkRecvmsgAddress(addr, self.cli_addr)
4041        self.assertEqual(ancdata, [])
4042        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
4043                        ignore=ignoreflags)
4044
4045    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4046    def testCmsgTruncNoBufSize(self):
4047        # Check that no ancillary data is received when no ancillary
4048        # buffer size is provided.
4049        self.checkHopLimitTruncatedHeader(ancbufsize=None,
4050                                          # BSD seems to set
4051                                          # MSG_CTRUNC only if an item
4052                                          # has been partially
4053                                          # received.
4054                                          ignoreflags=socket.MSG_CTRUNC)
4055
4056    @testCmsgTruncNoBufSize.client_skip
4057    def _testCmsgTruncNoBufSize(self):
4058        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4059        self.sendToServer(MSG)
4060
4061    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4062    def testSingleCmsgTrunc0(self):
4063        # Check that no ancillary data is received when ancillary
4064        # buffer size is zero.
4065        self.checkHopLimitTruncatedHeader(ancbufsize=0,
4066                                          ignoreflags=socket.MSG_CTRUNC)
4067
4068    @testSingleCmsgTrunc0.client_skip
4069    def _testSingleCmsgTrunc0(self):
4070        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4071        self.sendToServer(MSG)
4072
4073    # Check that no ancillary data is returned for various non-zero
4074    # (but still too small) buffer sizes.
4075
4076    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4077    def testSingleCmsgTrunc1(self):
4078        self.checkHopLimitTruncatedHeader(ancbufsize=1)
4079
4080    @testSingleCmsgTrunc1.client_skip
4081    def _testSingleCmsgTrunc1(self):
4082        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4083        self.sendToServer(MSG)
4084
4085    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4086    def testSingleCmsgTrunc2Int(self):
4087        self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)
4088
4089    @testSingleCmsgTrunc2Int.client_skip
4090    def _testSingleCmsgTrunc2Int(self):
4091        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4092        self.sendToServer(MSG)
4093
4094    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4095    def testSingleCmsgTruncLen0Minus1(self):
4096        self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)
4097
4098    @testSingleCmsgTruncLen0Minus1.client_skip
4099    def _testSingleCmsgTruncLen0Minus1(self):
4100        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4101        self.sendToServer(MSG)
4102
4103    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4104    def testSingleCmsgTruncInData(self):
4105        # Test truncation of a control message inside its associated
4106        # data.  The message may be returned with its data truncated,
4107        # or not returned at all.
4108        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4109                                  socket.IPV6_RECVHOPLIMIT, 1)
4110        self.misc_event.set()
4111        msg, ancdata, flags, addr = self.doRecvmsg(
4112            self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)
4113
4114        self.assertEqual(msg, MSG)
4115        self.checkRecvmsgAddress(addr, self.cli_addr)
4116        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
4117
4118        self.assertLessEqual(len(ancdata), 1)
4119        if ancdata:
4120            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
4121            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4122            self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
4123            self.assertLess(len(cmsg_data), SIZEOF_INT)
4124
4125    @testSingleCmsgTruncInData.client_skip
4126    def _testSingleCmsgTruncInData(self):
4127        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4128        self.sendToServer(MSG)
4129
4130    def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
4131        # Receive traffic class and hop limit into ancbufsize bytes of
4132        # ancillary data space, which should be large enough to
4133        # contain the first item, but too small to contain the header
4134        # of the second.  Check that data is MSG, MSG_CTRUNC is set
4135        # (unless included in ignoreflags), and only one ancillary
4136        # data item is returned.
4137        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4138                                  socket.IPV6_RECVHOPLIMIT, 1)
4139        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4140                                  socket.IPV6_RECVTCLASS, 1)
4141        self.misc_event.set()
4142        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
4143                                                   len(MSG), ancbufsize)
4144
4145        self.assertEqual(msg, MSG)
4146        self.checkRecvmsgAddress(addr, self.cli_addr)
4147        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
4148                        ignore=ignoreflags)
4149
4150        self.assertEqual(len(ancdata), 1)
4151        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
4152        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4153        self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
4154        self.assertEqual(len(cmsg_data), SIZEOF_INT)
4155        a = array.array("i")
4156        a.frombytes(cmsg_data)
4157        self.assertGreaterEqual(a[0], 0)
4158        self.assertLessEqual(a[0], 255)
4159
4160    # Try the above test with various buffer sizes.
4161
4162    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4163                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4164    def testSecondCmsgTrunc0(self):
4165        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
4166                                        ignoreflags=socket.MSG_CTRUNC)
4167
4168    @testSecondCmsgTrunc0.client_skip
4169    def _testSecondCmsgTrunc0(self):
4170        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4171        self.sendToServer(MSG)
4172
4173    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4174                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4175    def testSecondCmsgTrunc1(self):
4176        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)
4177
4178    @testSecondCmsgTrunc1.client_skip
4179    def _testSecondCmsgTrunc1(self):
4180        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4181        self.sendToServer(MSG)
4182
4183    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4184                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4185    def testSecondCmsgTrunc2Int(self):
4186        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
4187                                        2 * SIZEOF_INT)
4188
4189    @testSecondCmsgTrunc2Int.client_skip
4190    def _testSecondCmsgTrunc2Int(self):
4191        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4192        self.sendToServer(MSG)
4193
4194    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4195                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4196    def testSecondCmsgTruncLen0Minus1(self):
4197        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
4198                                        socket.CMSG_LEN(0) - 1)
4199
4200    @testSecondCmsgTruncLen0Minus1.client_skip
4201    def _testSecondCmsgTruncLen0Minus1(self):
4202        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4203        self.sendToServer(MSG)
4204
4205    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4206                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4207    def testSecondCmsgTruncInData(self):
4208        # Test truncation of the second of two control messages inside
4209        # its associated data.
4210        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4211                                  socket.IPV6_RECVHOPLIMIT, 1)
4212        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4213                                  socket.IPV6_RECVTCLASS, 1)
4214        self.misc_event.set()
4215        msg, ancdata, flags, addr = self.doRecvmsg(
4216            self.serv_sock, len(MSG),
4217            socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)
4218
4219        self.assertEqual(msg, MSG)
4220        self.checkRecvmsgAddress(addr, self.cli_addr)
4221        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
4222
4223        cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}
4224
4225        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
4226        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4227        cmsg_types.remove(cmsg_type)
4228        self.assertEqual(len(cmsg_data), SIZEOF_INT)
4229        a = array.array("i")
4230        a.frombytes(cmsg_data)
4231        self.assertGreaterEqual(a[0], 0)
4232        self.assertLessEqual(a[0], 255)
4233
4234        if ancdata:
4235            cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
4236            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4237            cmsg_types.remove(cmsg_type)
4238            self.assertLess(len(cmsg_data), SIZEOF_INT)
4239
4240        self.assertEqual(ancdata, [])
4241
4242    @testSecondCmsgTruncInData.client_skip
4243    def _testSecondCmsgTruncInData(self):
4244        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4245        self.sendToServer(MSG)
4246
4247
4248# Derive concrete test classes for different socket types.
4249
4250class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase,
4251                             SendrecvmsgConnectionlessBase,
4252                             ThreadedSocketTestMixin, UDPTestBase):
4253    pass
4254
4255@requireAttrs(socket.socket, "sendmsg")
4256class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase):
4257    pass
4258
4259@requireAttrs(socket.socket, "recvmsg")
4260class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase):
4261    pass
4262
4263@requireAttrs(socket.socket, "recvmsg_into")
4264class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase):
4265    pass
4266
4267
4268class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
4269                              SendrecvmsgConnectionlessBase,
4270                              ThreadedSocketTestMixin, UDP6TestBase):
4271
4272    def checkRecvmsgAddress(self, addr1, addr2):
4273        # Called to compare the received address with the address of
4274        # the peer, ignoring scope ID
4275        self.assertEqual(addr1[:-1], addr2[:-1])
4276
4277@requireAttrs(socket.socket, "sendmsg")
4278@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4279@requireSocket("AF_INET6", "SOCK_DGRAM")
4280class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
4281    pass
4282
4283@requireAttrs(socket.socket, "recvmsg")
4284@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4285@requireSocket("AF_INET6", "SOCK_DGRAM")
4286class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
4287    pass
4288
4289@requireAttrs(socket.socket, "recvmsg_into")
4290@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4291@requireSocket("AF_INET6", "SOCK_DGRAM")
4292class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
4293    pass
4294
4295@requireAttrs(socket.socket, "recvmsg")
4296@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4297@requireAttrs(socket, "IPPROTO_IPV6")
4298@requireSocket("AF_INET6", "SOCK_DGRAM")
4299class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
4300                                      SendrecvmsgUDP6TestBase):
4301    pass
4302
4303@requireAttrs(socket.socket, "recvmsg_into")
4304@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4305@requireAttrs(socket, "IPPROTO_IPV6")
4306@requireSocket("AF_INET6", "SOCK_DGRAM")
4307class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
4308                                          RFC3542AncillaryTest,
4309                                          SendrecvmsgUDP6TestBase):
4310    pass
4311
4312
4313@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4314          'UDPLITE sockets required for this test.')
4315class SendrecvmsgUDPLITETestBase(SendrecvmsgDgramFlagsBase,
4316                             SendrecvmsgConnectionlessBase,
4317                             ThreadedSocketTestMixin, UDPLITETestBase):
4318    pass
4319
4320@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4321          'UDPLITE sockets required for this test.')
4322@requireAttrs(socket.socket, "sendmsg")
4323class SendmsgUDPLITETest(SendmsgConnectionlessTests, SendrecvmsgUDPLITETestBase):
4324    pass
4325
4326@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4327          'UDPLITE sockets required for this test.')
4328@requireAttrs(socket.socket, "recvmsg")
4329class RecvmsgUDPLITETest(RecvmsgTests, SendrecvmsgUDPLITETestBase):
4330    pass
4331
4332@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4333          'UDPLITE sockets required for this test.')
4334@requireAttrs(socket.socket, "recvmsg_into")
4335class RecvmsgIntoUDPLITETest(RecvmsgIntoTests, SendrecvmsgUDPLITETestBase):
4336    pass
4337
4338
4339@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4340          'UDPLITE sockets required for this test.')
4341class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase,
4342                              SendrecvmsgConnectionlessBase,
4343                              ThreadedSocketTestMixin, UDPLITE6TestBase):
4344
4345    def checkRecvmsgAddress(self, addr1, addr2):
4346        # Called to compare the received address with the address of
4347        # the peer, ignoring scope ID
4348        self.assertEqual(addr1[:-1], addr2[:-1])
4349
4350@requireAttrs(socket.socket, "sendmsg")
4351@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4352@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4353          'UDPLITE sockets required for this test.')
4354@requireSocket("AF_INET6", "SOCK_DGRAM")
4355class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBase):
4356    pass
4357
4358@requireAttrs(socket.socket, "recvmsg")
4359@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4360@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4361          'UDPLITE sockets required for this test.')
4362@requireSocket("AF_INET6", "SOCK_DGRAM")
4363class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase):
4364    pass
4365
4366@requireAttrs(socket.socket, "recvmsg_into")
4367@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4368@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4369          'UDPLITE sockets required for this test.')
4370@requireSocket("AF_INET6", "SOCK_DGRAM")
4371class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase):
4372    pass
4373
4374@requireAttrs(socket.socket, "recvmsg")
4375@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4376@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4377          'UDPLITE sockets required for this test.')
4378@requireAttrs(socket, "IPPROTO_IPV6")
4379@requireSocket("AF_INET6", "SOCK_DGRAM")
4380class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest,
4381                                      SendrecvmsgUDPLITE6TestBase):
4382    pass
4383
4384@requireAttrs(socket.socket, "recvmsg_into")
4385@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4386@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4387          'UDPLITE sockets required for this test.')
4388@requireAttrs(socket, "IPPROTO_IPV6")
4389@requireSocket("AF_INET6", "SOCK_DGRAM")
4390class RecvmsgIntoRFC3542AncillaryUDPLITE6Test(RecvmsgIntoMixin,
4391                                          RFC3542AncillaryTest,
4392                                          SendrecvmsgUDPLITE6TestBase):
4393    pass
4394
4395
4396class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase,
4397                             ConnectedStreamTestMixin, TCPTestBase):
4398    pass
4399
4400@requireAttrs(socket.socket, "sendmsg")
4401class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase):
4402    pass
4403
4404@requireAttrs(socket.socket, "recvmsg")
4405class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests,
4406                     SendrecvmsgTCPTestBase):
4407    pass
4408
4409@requireAttrs(socket.socket, "recvmsg_into")
4410class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4411                         SendrecvmsgTCPTestBase):
4412    pass
4413
4414
4415class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase,
4416                                    SendrecvmsgConnectedBase,
4417                                    ConnectedStreamTestMixin, SCTPStreamBase):
4418    pass
4419
4420@requireAttrs(socket.socket, "sendmsg")
4421@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4422@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4423class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase):
4424    pass
4425
4426@requireAttrs(socket.socket, "recvmsg")
4427@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4428@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4429class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
4430                            SendrecvmsgSCTPStreamTestBase):
4431
4432    def testRecvmsgEOF(self):
4433        try:
4434            super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF()
4435        except OSError as e:
4436            if e.errno != errno.ENOTCONN:
4437                raise
4438            self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
4439
4440@requireAttrs(socket.socket, "recvmsg_into")
4441@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4442@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4443class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4444                                SendrecvmsgSCTPStreamTestBase):
4445
4446    def testRecvmsgEOF(self):
4447        try:
4448            super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF()
4449        except OSError as e:
4450            if e.errno != errno.ENOTCONN:
4451                raise
4452            self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
4453
4454
4455class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase,
4456                                    ConnectedStreamTestMixin, UnixStreamBase):
4457    pass
4458
4459@requireAttrs(socket.socket, "sendmsg")
4460@requireAttrs(socket, "AF_UNIX")
4461class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase):
4462    pass
4463
4464@requireAttrs(socket.socket, "recvmsg")
4465@requireAttrs(socket, "AF_UNIX")
4466class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
4467                            SendrecvmsgUnixStreamTestBase):
4468    pass
4469
4470@requireAttrs(socket.socket, "recvmsg_into")
4471@requireAttrs(socket, "AF_UNIX")
4472class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4473                                SendrecvmsgUnixStreamTestBase):
4474    pass
4475
4476@requireAttrs(socket.socket, "sendmsg", "recvmsg")
4477@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
4478class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase):
4479    pass
4480
4481@requireAttrs(socket.socket, "sendmsg", "recvmsg_into")
4482@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
4483class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest,
4484                                     SendrecvmsgUnixStreamTestBase):
4485    pass
4486
4487
4488# Test interrupting the interruptible send/receive methods with a
4489# signal when a timeout is set.  These tests avoid having multiple
4490# threads alive during the test so that the OS cannot deliver the
4491# signal to the wrong one.
4492
4493class InterruptedTimeoutBase:
4494    # Base class for interrupted send/receive tests.  Installs an
4495    # empty handler for SIGALRM and removes it on teardown, along with
4496    # any scheduled alarms.
4497
4498    def setUp(self):
4499        super().setUp()
4500        orig_alrm_handler = signal.signal(signal.SIGALRM,
4501                                          lambda signum, frame: 1 / 0)
4502        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
4503
4504    # Timeout for socket operations
4505    timeout = support.LOOPBACK_TIMEOUT
4506
4507    # Provide setAlarm() method to schedule delivery of SIGALRM after
4508    # given number of seconds, or cancel it if zero, and an
4509    # appropriate time value to use.  Use setitimer() if available.
4510    if hasattr(signal, "setitimer"):
4511        alarm_time = 0.05
4512
4513        def setAlarm(self, seconds):
4514            signal.setitimer(signal.ITIMER_REAL, seconds)
4515    else:
4516        # Old systems may deliver the alarm up to one second early
4517        alarm_time = 2
4518
4519        def setAlarm(self, seconds):
4520            signal.alarm(seconds)
4521
4522
4523# Require siginterrupt() in order to ensure that system calls are
4524# interrupted by default.
4525@requireAttrs(signal, "siginterrupt")
4526@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
4527                     "Don't have signal.alarm or signal.setitimer")
4528class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
4529    # Test interrupting the recv*() methods with signals when a
4530    # timeout is set.
4531
4532    def setUp(self):
4533        super().setUp()
4534        self.serv.settimeout(self.timeout)
4535
4536    def checkInterruptedRecv(self, func, *args, **kwargs):
4537        # Check that func(*args, **kwargs) raises
4538        # errno of EINTR when interrupted by a signal.
4539        try:
4540            self.setAlarm(self.alarm_time)
4541            with self.assertRaises(ZeroDivisionError) as cm:
4542                func(*args, **kwargs)
4543        finally:
4544            self.setAlarm(0)
4545
4546    def testInterruptedRecvTimeout(self):
4547        self.checkInterruptedRecv(self.serv.recv, 1024)
4548
4549    def testInterruptedRecvIntoTimeout(self):
4550        self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))
4551
4552    def testInterruptedRecvfromTimeout(self):
4553        self.checkInterruptedRecv(self.serv.recvfrom, 1024)
4554
4555    def testInterruptedRecvfromIntoTimeout(self):
4556        self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))
4557
4558    @requireAttrs(socket.socket, "recvmsg")
4559    def testInterruptedRecvmsgTimeout(self):
4560        self.checkInterruptedRecv(self.serv.recvmsg, 1024)
4561
4562    @requireAttrs(socket.socket, "recvmsg_into")
4563    def testInterruptedRecvmsgIntoTimeout(self):
4564        self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])
4565
4566
4567# Require siginterrupt() in order to ensure that system calls are
4568# interrupted by default.
4569@requireAttrs(signal, "siginterrupt")
4570@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
4571                     "Don't have signal.alarm or signal.setitimer")
4572class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
4573                                 ThreadSafeCleanupTestCase,
4574                                 SocketListeningTestMixin, TCPTestBase):
4575    # Test interrupting the interruptible send*() methods with signals
4576    # when a timeout is set.
4577
4578    def setUp(self):
4579        super().setUp()
4580        self.serv_conn = self.newSocket()
4581        self.addCleanup(self.serv_conn.close)
4582        # Use a thread to complete the connection, but wait for it to
4583        # terminate before running the test, so that there is only one
4584        # thread to accept the signal.
4585        cli_thread = threading.Thread(target=self.doConnect)
4586        cli_thread.start()
4587        self.cli_conn, addr = self.serv.accept()
4588        self.addCleanup(self.cli_conn.close)
4589        cli_thread.join()
4590        self.serv_conn.settimeout(self.timeout)
4591
4592    def doConnect(self):
4593        self.serv_conn.connect(self.serv_addr)
4594
4595    def checkInterruptedSend(self, func, *args, **kwargs):
4596        # Check that func(*args, **kwargs), run in a loop, raises
4597        # OSError with an errno of EINTR when interrupted by a
4598        # signal.
4599        try:
4600            with self.assertRaises(ZeroDivisionError) as cm:
4601                while True:
4602                    self.setAlarm(self.alarm_time)
4603                    func(*args, **kwargs)
4604        finally:
4605            self.setAlarm(0)
4606
4607    # Issue #12958: The following tests have problems on OS X prior to 10.7
4608    @support.requires_mac_ver(10, 7)
4609    def testInterruptedSendTimeout(self):
4610        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)
4611
4612    @support.requires_mac_ver(10, 7)
4613    def testInterruptedSendtoTimeout(self):
4614        # Passing an actual address here as Python's wrapper for
4615        # sendto() doesn't allow passing a zero-length one; POSIX
4616        # requires that the address is ignored since the socket is
4617        # connection-mode, however.
4618        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
4619                                  self.serv_addr)
4620
4621    @support.requires_mac_ver(10, 7)
4622    @requireAttrs(socket.socket, "sendmsg")
4623    def testInterruptedSendmsgTimeout(self):
4624        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
4625
4626
4627class TCPCloserTest(ThreadedTCPSocketTest):
4628
4629    def testClose(self):
4630        conn, addr = self.serv.accept()
4631        conn.close()
4632
4633        sd = self.cli
4634        read, write, err = select.select([sd], [], [], 1.0)
4635        self.assertEqual(read, [sd])
4636        self.assertEqual(sd.recv(1), b'')
4637
4638        # Calling close() many times should be safe.
4639        conn.close()
4640        conn.close()
4641
4642    def _testClose(self):
4643        self.cli.connect((HOST, self.port))
4644        time.sleep(1.0)
4645
4646
4647class BasicSocketPairTest(SocketPairTest):
4648
4649    def __init__(self, methodName='runTest'):
4650        SocketPairTest.__init__(self, methodName=methodName)
4651
4652    def _check_defaults(self, sock):
4653        self.assertIsInstance(sock, socket.socket)
4654        if hasattr(socket, 'AF_UNIX'):
4655            self.assertEqual(sock.family, socket.AF_UNIX)
4656        else:
4657            self.assertEqual(sock.family, socket.AF_INET)
4658        self.assertEqual(sock.type, socket.SOCK_STREAM)
4659        self.assertEqual(sock.proto, 0)
4660
4661    def _testDefaults(self):
4662        self._check_defaults(self.cli)
4663
4664    def testDefaults(self):
4665        self._check_defaults(self.serv)
4666
4667    def testRecv(self):
4668        msg = self.serv.recv(1024)
4669        self.assertEqual(msg, MSG)
4670
4671    def _testRecv(self):
4672        self.cli.send(MSG)
4673
4674    def testSend(self):
4675        self.serv.send(MSG)
4676
4677    def _testSend(self):
4678        msg = self.cli.recv(1024)
4679        self.assertEqual(msg, MSG)
4680
4681
4682class PurePythonSocketPairTest(SocketPairTest):
4683
4684    # Explicitly use socketpair AF_INET or AF_INET6 to ensure that is the
4685    # code path we're using regardless platform is the pure python one where
4686    # `_socket.socketpair` does not exist.  (AF_INET does not work with
4687    # _socket.socketpair on many platforms).
4688    def socketpair(self):
4689        # called by super().setUp().
4690        try:
4691            return socket.socketpair(socket.AF_INET6)
4692        except OSError:
4693            return socket.socketpair(socket.AF_INET)
4694    # Local imports in this class make for easy security fix backporting.
4695
4696    def setUp(self):
4697        if hasattr(_socket, "socketpair"):
4698            self._orig_sp = socket.socketpair
4699            # This forces the version using the non-OS provided socketpair
4700            # emulation via an AF_INET socket in Lib/socket.py.
4701            socket.socketpair = socket._fallback_socketpair
4702        else:
4703            # This platform already uses the non-OS provided version.
4704            # This platform already uses the non-OS provided version.
4705            self._orig_sp = None
4706        super().setUp()
4707
4708    def tearDown(self):
4709        super().tearDown()
4710
4711        if self._orig_sp is not None:
4712            # Restore the default socket.socketpair definition.
4713            socket.socketpair = self._orig_sp
4714
4715    def test_recv(self):
4716        msg = self.serv.recv(1024)
4717        self.assertEqual(msg, MSG)
4718    def _test_recv(self):
4719        self.cli.send(MSG)
4720    def test_send(self):
4721        self.serv.send(MSG)
4722    def _test_send(self):
4723        msg = self.cli.recv(1024)
4724        self.assertEqual(msg, MSG)
4725    def test_ipv4(self):
4726        cli, srv = socket.socketpair(socket.AF_INET)
4727        cli.close()
4728        srv.close()
4729    def _test_ipv4(self):
4730        pass
4731    @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
4732                     not hasattr(_socket, 'IPV6_V6ONLY'),
4733                     "IPV6_V6ONLY option not supported")
4734    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
4735    def test_ipv6(self):
4736        cli, srv = socket.socketpair(socket.AF_INET6)
4737        cli.close()
4738        srv.close()
4739    def _test_ipv6(self):
4740        pass
4741    def test_injected_authentication_failure(self):
4742        orig_getsockname = socket.socket.getsockname
4743        inject_sock = None
4744        def inject_getsocketname(self):
4745            nonlocal inject_sock
4746            sockname = orig_getsockname(self)
4747            # Connect to the listening socket ahead of the
4748            # client socket.
4749            if inject_sock is None:
4750                inject_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
4751                inject_sock.setblocking(False)
4752                try:
4753                    inject_sock.connect(sockname[:2])
4754                except (BlockingIOError, InterruptedError):
4755                    pass
4756                inject_sock.setblocking(True)
4757            return sockname
4758        sock1 = sock2 = None
4759        try:
4760            socket.socket.getsockname = inject_getsocketname
4761            with self.assertRaises(OSError):
4762                sock1, sock2 = socket.socketpair()
4763        finally:
4764            socket.socket.getsockname = orig_getsockname
4765            if inject_sock:
4766                inject_sock.close()
4767            if sock1:  # This cleanup isn't needed on a successful test.
4768                sock1.close()
4769            if sock2:
4770                sock2.close()
4771    def _test_injected_authentication_failure(self):
4772        # No-op.  Exists for base class threading infrastructure to call.
4773        # We could refactor this test into its own lesser class along with the
4774        # setUp and tearDown code to construct an ideal; it is simpler to keep
4775        # it here and live with extra overhead one this _one_ failure test.
4776        pass
4777
4778
4779class NonBlockingTCPTests(ThreadedTCPSocketTest):
4780
4781    def __init__(self, methodName='runTest'):
4782        self.event = threading.Event()
4783        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
4784
4785    def assert_sock_timeout(self, sock, timeout):
4786        self.assertEqual(self.serv.gettimeout(), timeout)
4787
4788        blocking = (timeout != 0.0)
4789        self.assertEqual(sock.getblocking(), blocking)
4790
4791        if fcntl is not None:
4792            # When a Python socket has a non-zero timeout, it's switched
4793            # internally to a non-blocking mode. Later, sock.sendall(),
4794            # sock.recv(), and other socket operations use a select() call and
4795            # handle EWOULDBLOCK/EGAIN on all socket operations. That's how
4796            # timeouts are enforced.
4797            fd_blocking = (timeout is None)
4798
4799            flag = fcntl.fcntl(sock, fcntl.F_GETFL, os.O_NONBLOCK)
4800            self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking)
4801
4802    def testSetBlocking(self):
4803        # Test setblocking() and settimeout() methods
4804        self.serv.setblocking(True)
4805        self.assert_sock_timeout(self.serv, None)
4806
4807        self.serv.setblocking(False)
4808        self.assert_sock_timeout(self.serv, 0.0)
4809
4810        self.serv.settimeout(None)
4811        self.assert_sock_timeout(self.serv, None)
4812
4813        self.serv.settimeout(0)
4814        self.assert_sock_timeout(self.serv, 0)
4815
4816        self.serv.settimeout(10)
4817        self.assert_sock_timeout(self.serv, 10)
4818
4819        self.serv.settimeout(0)
4820        self.assert_sock_timeout(self.serv, 0)
4821
4822    def _testSetBlocking(self):
4823        pass
4824
4825    @support.cpython_only
4826    def testSetBlocking_overflow(self):
4827        # Issue 15989
4828        import _testcapi
4829        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
4830            self.skipTest('needs UINT_MAX < ULONG_MAX')
4831
4832        self.serv.setblocking(False)
4833        self.assertEqual(self.serv.gettimeout(), 0.0)
4834
4835        self.serv.setblocking(_testcapi.UINT_MAX + 1)
4836        self.assertIsNone(self.serv.gettimeout())
4837
4838    _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)
4839
4840    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
4841                         'test needs socket.SOCK_NONBLOCK')
4842    @support.requires_linux_version(2, 6, 28)
4843    def testInitNonBlocking(self):
4844        # create a socket with SOCK_NONBLOCK
4845        self.serv.close()
4846        self.serv = socket.socket(socket.AF_INET,
4847                                  socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
4848        self.assert_sock_timeout(self.serv, 0)
4849
4850    def _testInitNonBlocking(self):
4851        pass
4852
4853    def testInheritFlagsBlocking(self):
4854        # bpo-7995: accept() on a listening socket with a timeout and the
4855        # default timeout is None, the resulting socket must be blocking.
4856        with socket_setdefaulttimeout(None):
4857            self.serv.settimeout(10)
4858            conn, addr = self.serv.accept()
4859            self.addCleanup(conn.close)
4860            self.assertIsNone(conn.gettimeout())
4861
4862    def _testInheritFlagsBlocking(self):
4863        self.cli.connect((HOST, self.port))
4864
4865    def testInheritFlagsTimeout(self):
4866        # bpo-7995: accept() on a listening socket with a timeout and the
4867        # default timeout is None, the resulting socket must inherit
4868        # the default timeout.
4869        default_timeout = 20.0
4870        with socket_setdefaulttimeout(default_timeout):
4871            self.serv.settimeout(10)
4872            conn, addr = self.serv.accept()
4873            self.addCleanup(conn.close)
4874            self.assertEqual(conn.gettimeout(), default_timeout)
4875
4876    def _testInheritFlagsTimeout(self):
4877        self.cli.connect((HOST, self.port))
4878
4879    def testAccept(self):
4880        # Testing non-blocking accept
4881        self.serv.setblocking(False)
4882
4883        # connect() didn't start: non-blocking accept() fails
4884        start_time = time.monotonic()
4885        with self.assertRaises(BlockingIOError):
4886            conn, addr = self.serv.accept()
4887        dt = time.monotonic() - start_time
4888        self.assertLess(dt, 1.0)
4889
4890        self.event.set()
4891
4892        read, write, err = select.select([self.serv], [], [], support.LONG_TIMEOUT)
4893        if self.serv not in read:
4894            self.fail("Error trying to do accept after select.")
4895
4896        # connect() completed: non-blocking accept() doesn't block
4897        conn, addr = self.serv.accept()
4898        self.addCleanup(conn.close)
4899        self.assertIsNone(conn.gettimeout())
4900
4901    def _testAccept(self):
4902        # don't connect before event is set to check
4903        # that non-blocking accept() raises BlockingIOError
4904        self.event.wait()
4905
4906        self.cli.connect((HOST, self.port))
4907
4908    def testRecv(self):
4909        # Testing non-blocking recv
4910        conn, addr = self.serv.accept()
4911        self.addCleanup(conn.close)
4912        conn.setblocking(False)
4913
4914        # the server didn't send data yet: non-blocking recv() fails
4915        with self.assertRaises(BlockingIOError):
4916            msg = conn.recv(len(MSG))
4917
4918        self.event.set()
4919
4920        read, write, err = select.select([conn], [], [], support.LONG_TIMEOUT)
4921        if conn not in read:
4922            self.fail("Error during select call to non-blocking socket.")
4923
4924        # the server sent data yet: non-blocking recv() doesn't block
4925        msg = conn.recv(len(MSG))
4926        self.assertEqual(msg, MSG)
4927
4928    def _testRecv(self):
4929        self.cli.connect((HOST, self.port))
4930
4931        # don't send anything before event is set to check
4932        # that non-blocking recv() raises BlockingIOError
4933        self.event.wait()
4934
4935        # send data: recv() will no longer block
4936        self.cli.sendall(MSG)
4937
4938
4939class FileObjectClassTestCase(SocketConnectedTest):
4940    """Unit tests for the object returned by socket.makefile()
4941
4942    self.read_file is the io object returned by makefile() on
4943    the client connection.  You can read from this file to
4944    get output from the server.
4945
4946    self.write_file is the io object returned by makefile() on the
4947    server connection.  You can write to this file to send output
4948    to the client.
4949    """
4950
4951    bufsize = -1 # Use default buffer size
4952    encoding = 'utf-8'
4953    errors = 'strict'
4954    newline = None
4955
4956    read_mode = 'rb'
4957    read_msg = MSG
4958    write_mode = 'wb'
4959    write_msg = MSG
4960
4961    def __init__(self, methodName='runTest'):
4962        SocketConnectedTest.__init__(self, methodName=methodName)
4963
4964    def setUp(self):
4965        self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
4966            threading.Event() for i in range(4)]
4967        SocketConnectedTest.setUp(self)
4968        self.read_file = self.cli_conn.makefile(
4969            self.read_mode, self.bufsize,
4970            encoding = self.encoding,
4971            errors = self.errors,
4972            newline = self.newline)
4973
4974    def tearDown(self):
4975        self.serv_finished.set()
4976        self.read_file.close()
4977        self.assertTrue(self.read_file.closed)
4978        self.read_file = None
4979        SocketConnectedTest.tearDown(self)
4980
4981    def clientSetUp(self):
4982        SocketConnectedTest.clientSetUp(self)
4983        self.write_file = self.serv_conn.makefile(
4984            self.write_mode, self.bufsize,
4985            encoding = self.encoding,
4986            errors = self.errors,
4987            newline = self.newline)
4988
4989    def clientTearDown(self):
4990        self.cli_finished.set()
4991        self.write_file.close()
4992        self.assertTrue(self.write_file.closed)
4993        self.write_file = None
4994        SocketConnectedTest.clientTearDown(self)
4995
4996    def testReadAfterTimeout(self):
4997        # Issue #7322: A file object must disallow further reads
4998        # after a timeout has occurred.
4999        self.cli_conn.settimeout(1)
5000        self.read_file.read(3)
5001        # First read raises a timeout
5002        self.assertRaises(TimeoutError, self.read_file.read, 1)
5003        # Second read is disallowed
5004        with self.assertRaises(OSError) as ctx:
5005            self.read_file.read(1)
5006        self.assertIn("cannot read from timed out object", str(ctx.exception))
5007
5008    def _testReadAfterTimeout(self):
5009        self.write_file.write(self.write_msg[0:3])
5010        self.write_file.flush()
5011        self.serv_finished.wait()
5012
5013    def testSmallRead(self):
5014        # Performing small file read test
5015        first_seg = self.read_file.read(len(self.read_msg)-3)
5016        second_seg = self.read_file.read(3)
5017        msg = first_seg + second_seg
5018        self.assertEqual(msg, self.read_msg)
5019
5020    def _testSmallRead(self):
5021        self.write_file.write(self.write_msg)
5022        self.write_file.flush()
5023
5024    def testFullRead(self):
5025        # read until EOF
5026        msg = self.read_file.read()
5027        self.assertEqual(msg, self.read_msg)
5028
5029    def _testFullRead(self):
5030        self.write_file.write(self.write_msg)
5031        self.write_file.close()
5032
5033    def testUnbufferedRead(self):
5034        # Performing unbuffered file read test
5035        buf = type(self.read_msg)()
5036        while 1:
5037            char = self.read_file.read(1)
5038            if not char:
5039                break
5040            buf += char
5041        self.assertEqual(buf, self.read_msg)
5042
5043    def _testUnbufferedRead(self):
5044        self.write_file.write(self.write_msg)
5045        self.write_file.flush()
5046
5047    def testReadline(self):
5048        # Performing file readline test
5049        line = self.read_file.readline()
5050        self.assertEqual(line, self.read_msg)
5051
5052    def _testReadline(self):
5053        self.write_file.write(self.write_msg)
5054        self.write_file.flush()
5055
5056    def testCloseAfterMakefile(self):
5057        # The file returned by makefile should keep the socket open.
5058        self.cli_conn.close()
5059        # read until EOF
5060        msg = self.read_file.read()
5061        self.assertEqual(msg, self.read_msg)
5062
5063    def _testCloseAfterMakefile(self):
5064        self.write_file.write(self.write_msg)
5065        self.write_file.flush()
5066
5067    def testMakefileAfterMakefileClose(self):
5068        self.read_file.close()
5069        msg = self.cli_conn.recv(len(MSG))
5070        if isinstance(self.read_msg, str):
5071            msg = msg.decode()
5072        self.assertEqual(msg, self.read_msg)
5073
5074    def _testMakefileAfterMakefileClose(self):
5075        self.write_file.write(self.write_msg)
5076        self.write_file.flush()
5077
5078    def testClosedAttr(self):
5079        self.assertTrue(not self.read_file.closed)
5080
5081    def _testClosedAttr(self):
5082        self.assertTrue(not self.write_file.closed)
5083
5084    def testAttributes(self):
5085        self.assertEqual(self.read_file.mode, self.read_mode)
5086        self.assertEqual(self.read_file.name, self.cli_conn.fileno())
5087
5088    def _testAttributes(self):
5089        self.assertEqual(self.write_file.mode, self.write_mode)
5090        self.assertEqual(self.write_file.name, self.serv_conn.fileno())
5091
5092    def testRealClose(self):
5093        self.read_file.close()
5094        self.assertRaises(ValueError, self.read_file.fileno)
5095        self.cli_conn.close()
5096        self.assertRaises(OSError, self.cli_conn.getsockname)
5097
5098    def _testRealClose(self):
5099        pass
5100
5101
5102class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
5103
5104    """Repeat the tests from FileObjectClassTestCase with bufsize==0.
5105
5106    In this case (and in this case only), it should be possible to
5107    create a file object, read a line from it, create another file
5108    object, read another line from it, without loss of data in the
5109    first file object's buffer.  Note that http.client relies on this
5110    when reading multiple requests from the same socket."""
5111
5112    bufsize = 0 # Use unbuffered mode
5113
5114    def testUnbufferedReadline(self):
5115        # Read a line, create a new file object, read another line with it
5116        line = self.read_file.readline() # first line
5117        self.assertEqual(line, b"A. " + self.write_msg) # first line
5118        self.read_file = self.cli_conn.makefile('rb', 0)
5119        line = self.read_file.readline() # second line
5120        self.assertEqual(line, b"B. " + self.write_msg) # second line
5121
5122    def _testUnbufferedReadline(self):
5123        self.write_file.write(b"A. " + self.write_msg)
5124        self.write_file.write(b"B. " + self.write_msg)
5125        self.write_file.flush()
5126
5127    def testMakefileClose(self):
5128        # The file returned by makefile should keep the socket open...
5129        self.cli_conn.close()
5130        msg = self.cli_conn.recv(1024)
5131        self.assertEqual(msg, self.read_msg)
5132        # ...until the file is itself closed
5133        self.read_file.close()
5134        self.assertRaises(OSError, self.cli_conn.recv, 1024)
5135
5136    def _testMakefileClose(self):
5137        self.write_file.write(self.write_msg)
5138        self.write_file.flush()
5139
5140    def testMakefileCloseSocketDestroy(self):
5141        refcount_before = sys.getrefcount(self.cli_conn)
5142        self.read_file.close()
5143        refcount_after = sys.getrefcount(self.cli_conn)
5144        self.assertEqual(refcount_before - 1, refcount_after)
5145
5146    def _testMakefileCloseSocketDestroy(self):
5147        pass
5148
5149    # Non-blocking ops
5150    # NOTE: to set `read_file` as non-blocking, we must call
5151    # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp).
5152
5153    def testSmallReadNonBlocking(self):
5154        self.cli_conn.setblocking(False)
5155        self.assertEqual(self.read_file.readinto(bytearray(10)), None)
5156        self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None)
5157        self.evt1.set()
5158        self.evt2.wait(1.0)
5159        first_seg = self.read_file.read(len(self.read_msg) - 3)
5160        if first_seg is None:
5161            # Data not arrived (can happen under Windows), wait a bit
5162            time.sleep(0.5)
5163            first_seg = self.read_file.read(len(self.read_msg) - 3)
5164        buf = bytearray(10)
5165        n = self.read_file.readinto(buf)
5166        self.assertEqual(n, 3)
5167        msg = first_seg + buf[:n]
5168        self.assertEqual(msg, self.read_msg)
5169        self.assertEqual(self.read_file.readinto(bytearray(16)), None)
5170        self.assertEqual(self.read_file.read(1), None)
5171
5172    def _testSmallReadNonBlocking(self):
5173        self.evt1.wait(1.0)
5174        self.write_file.write(self.write_msg)
5175        self.write_file.flush()
5176        self.evt2.set()
5177        # Avoid closing the socket before the server test has finished,
5178        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
5179        self.serv_finished.wait(5.0)
5180
5181    def testWriteNonBlocking(self):
5182        self.cli_finished.wait(5.0)
5183        # The client thread can't skip directly - the SkipTest exception
5184        # would appear as a failure.
5185        if self.serv_skipped:
5186            self.skipTest(self.serv_skipped)
5187
5188    def _testWriteNonBlocking(self):
5189        self.serv_skipped = None
5190        self.serv_conn.setblocking(False)
5191        # Try to saturate the socket buffer pipe with repeated large writes.
5192        BIG = b"x" * support.SOCK_MAX_SIZE
5193        LIMIT = 10
5194        # The first write() succeeds since a chunk of data can be buffered
5195        n = self.write_file.write(BIG)
5196        self.assertGreater(n, 0)
5197        for i in range(LIMIT):
5198            n = self.write_file.write(BIG)
5199            if n is None:
5200                # Succeeded
5201                break
5202            self.assertGreater(n, 0)
5203        else:
5204            # Let us know that this test didn't manage to establish
5205            # the expected conditions. This is not a failure in itself but,
5206            # if it happens repeatedly, the test should be fixed.
5207            self.serv_skipped = "failed to saturate the socket buffer"
5208
5209
5210class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
5211
5212    bufsize = 1 # Default-buffered for reading; line-buffered for writing
5213
5214
5215class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
5216
5217    bufsize = 2 # Exercise the buffering code
5218
5219
5220class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
5221    """Tests for socket.makefile() in text mode (rather than binary)"""
5222
5223    read_mode = 'r'
5224    read_msg = MSG.decode('utf-8')
5225    write_mode = 'wb'
5226    write_msg = MSG
5227    newline = ''
5228
5229
5230class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
5231    """Tests for socket.makefile() in text mode (rather than binary)"""
5232
5233    read_mode = 'rb'
5234    read_msg = MSG
5235    write_mode = 'w'
5236    write_msg = MSG.decode('utf-8')
5237    newline = ''
5238
5239
5240class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
5241    """Tests for socket.makefile() in text mode (rather than binary)"""
5242
5243    read_mode = 'r'
5244    read_msg = MSG.decode('utf-8')
5245    write_mode = 'w'
5246    write_msg = MSG.decode('utf-8')
5247    newline = ''
5248
5249
5250class NetworkConnectionTest(object):
5251    """Prove network connection."""
5252
5253    def clientSetUp(self):
5254        # We're inherited below by BasicTCPTest2, which also inherits
5255        # BasicTCPTest, which defines self.port referenced below.
5256        self.cli = socket.create_connection((HOST, self.port))
5257        self.serv_conn = self.cli
5258
5259class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
5260    """Tests that NetworkConnection does not break existing TCP functionality.
5261    """
5262
5263class NetworkConnectionNoServer(unittest.TestCase):
5264
5265    class MockSocket(socket.socket):
5266        def connect(self, *args):
5267            raise TimeoutError('timed out')
5268
5269    @contextlib.contextmanager
5270    def mocked_socket_module(self):
5271        """Return a socket which times out on connect"""
5272        old_socket = socket.socket
5273        socket.socket = self.MockSocket
5274        try:
5275            yield
5276        finally:
5277            socket.socket = old_socket
5278
5279    def test_connect(self):
5280        port = socket_helper.find_unused_port()
5281        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
5282        self.addCleanup(cli.close)
5283        with self.assertRaises(OSError) as cm:
5284            cli.connect((HOST, port))
5285        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
5286
5287    def test_create_connection(self):
5288        # Issue #9792: errors raised by create_connection() should have
5289        # a proper errno attribute.
5290        port = socket_helper.find_unused_port()
5291        with self.assertRaises(OSError) as cm:
5292            socket.create_connection((HOST, port))
5293
5294        # Issue #16257: create_connection() calls getaddrinfo() against
5295        # 'localhost'.  This may result in an IPV6 addr being returned
5296        # as well as an IPV4 one:
5297        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
5298        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
5299        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
5300        #
5301        # create_connection() enumerates through all the addresses returned
5302        # and if it doesn't successfully bind to any of them, it propagates
5303        # the last exception it encountered.
5304        #
5305        # On Solaris, ENETUNREACH is returned in this circumstance instead
5306        # of ECONNREFUSED.  So, if that errno exists, add it to our list of
5307        # expected errnos.
5308        expected_errnos = socket_helper.get_socket_conn_refused_errs()
5309        self.assertIn(cm.exception.errno, expected_errnos)
5310
5311    def test_create_connection_all_errors(self):
5312        port = socket_helper.find_unused_port()
5313        try:
5314            socket.create_connection((HOST, port), all_errors=True)
5315        except ExceptionGroup as e:
5316            eg = e
5317        else:
5318            self.fail('expected connection to fail')
5319
5320        self.assertIsInstance(eg, ExceptionGroup)
5321        for e in eg.exceptions:
5322            self.assertIsInstance(e, OSError)
5323
5324        addresses = socket.getaddrinfo(
5325            'localhost', port, 0, socket.SOCK_STREAM)
5326        # assert that we got an exception for each address
5327        self.assertEqual(len(addresses), len(eg.exceptions))
5328
5329    def test_create_connection_timeout(self):
5330        # Issue #9792: create_connection() should not recast timeout errors
5331        # as generic socket errors.
5332        with self.mocked_socket_module():
5333            try:
5334                socket.create_connection((HOST, 1234))
5335            except TimeoutError:
5336                pass
5337            except OSError as exc:
5338                if socket_helper.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT:
5339                    raise
5340            else:
5341                self.fail('TimeoutError not raised')
5342
5343
5344class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
5345
5346    def __init__(self, methodName='runTest'):
5347        SocketTCPTest.__init__(self, methodName=methodName)
5348        ThreadableTest.__init__(self)
5349
5350    def clientSetUp(self):
5351        self.source_port = socket_helper.find_unused_port()
5352
5353    def clientTearDown(self):
5354        self.cli.close()
5355        self.cli = None
5356        ThreadableTest.clientTearDown(self)
5357
5358    def _justAccept(self):
5359        conn, addr = self.serv.accept()
5360        conn.close()
5361
5362    testFamily = _justAccept
5363    def _testFamily(self):
5364        self.cli = socket.create_connection((HOST, self.port),
5365                            timeout=support.LOOPBACK_TIMEOUT)
5366        self.addCleanup(self.cli.close)
5367        self.assertEqual(self.cli.family, 2)
5368
5369    testSourceAddress = _justAccept
5370    def _testSourceAddress(self):
5371        self.cli = socket.create_connection((HOST, self.port),
5372                            timeout=support.LOOPBACK_TIMEOUT,
5373                            source_address=('', self.source_port))
5374        self.addCleanup(self.cli.close)
5375        self.assertEqual(self.cli.getsockname()[1], self.source_port)
5376        # The port number being used is sufficient to show that the bind()
5377        # call happened.
5378
5379    testTimeoutDefault = _justAccept
5380    def _testTimeoutDefault(self):
5381        # passing no explicit timeout uses socket's global default
5382        self.assertTrue(socket.getdefaulttimeout() is None)
5383        socket.setdefaulttimeout(42)
5384        try:
5385            self.cli = socket.create_connection((HOST, self.port))
5386            self.addCleanup(self.cli.close)
5387        finally:
5388            socket.setdefaulttimeout(None)
5389        self.assertEqual(self.cli.gettimeout(), 42)
5390
5391    testTimeoutNone = _justAccept
5392    def _testTimeoutNone(self):
5393        # None timeout means the same as sock.settimeout(None)
5394        self.assertTrue(socket.getdefaulttimeout() is None)
5395        socket.setdefaulttimeout(30)
5396        try:
5397            self.cli = socket.create_connection((HOST, self.port), timeout=None)
5398            self.addCleanup(self.cli.close)
5399        finally:
5400            socket.setdefaulttimeout(None)
5401        self.assertEqual(self.cli.gettimeout(), None)
5402
5403    testTimeoutValueNamed = _justAccept
5404    def _testTimeoutValueNamed(self):
5405        self.cli = socket.create_connection((HOST, self.port), timeout=30)
5406        self.assertEqual(self.cli.gettimeout(), 30)
5407
5408    testTimeoutValueNonamed = _justAccept
5409    def _testTimeoutValueNonamed(self):
5410        self.cli = socket.create_connection((HOST, self.port), 30)
5411        self.addCleanup(self.cli.close)
5412        self.assertEqual(self.cli.gettimeout(), 30)
5413
5414
5415class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
5416
5417    def __init__(self, methodName='runTest'):
5418        SocketTCPTest.__init__(self, methodName=methodName)
5419        ThreadableTest.__init__(self)
5420
5421    def clientSetUp(self):
5422        pass
5423
5424    def clientTearDown(self):
5425        self.cli.close()
5426        self.cli = None
5427        ThreadableTest.clientTearDown(self)
5428
5429    def testInsideTimeout(self):
5430        conn, addr = self.serv.accept()
5431        self.addCleanup(conn.close)
5432        time.sleep(3)
5433        conn.send(b"done!")
5434    testOutsideTimeout = testInsideTimeout
5435
5436    def _testInsideTimeout(self):
5437        self.cli = sock = socket.create_connection((HOST, self.port))
5438        data = sock.recv(5)
5439        self.assertEqual(data, b"done!")
5440
5441    def _testOutsideTimeout(self):
5442        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
5443        self.assertRaises(TimeoutError, lambda: sock.recv(5))
5444
5445
5446class TCPTimeoutTest(SocketTCPTest):
5447
5448    def testTCPTimeout(self):
5449        def raise_timeout(*args, **kwargs):
5450            self.serv.settimeout(1.0)
5451            self.serv.accept()
5452        self.assertRaises(TimeoutError, raise_timeout,
5453                              "Error generating a timeout exception (TCP)")
5454
5455    def testTimeoutZero(self):
5456        ok = False
5457        try:
5458            self.serv.settimeout(0.0)
5459            foo = self.serv.accept()
5460        except TimeoutError:
5461            self.fail("caught timeout instead of error (TCP)")
5462        except OSError:
5463            ok = True
5464        except:
5465            self.fail("caught unexpected exception (TCP)")
5466        if not ok:
5467            self.fail("accept() returned success when we did not expect it")
5468
5469    @unittest.skipUnless(hasattr(signal, 'alarm'),
5470                         'test needs signal.alarm()')
5471    def testInterruptedTimeout(self):
5472        # XXX I don't know how to do this test on MSWindows or any other
5473        # platform that doesn't support signal.alarm() or os.kill(), though
5474        # the bug should have existed on all platforms.
5475        self.serv.settimeout(5.0)   # must be longer than alarm
5476        class Alarm(Exception):
5477            pass
5478        def alarm_handler(signal, frame):
5479            raise Alarm
5480        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
5481        try:
5482            try:
5483                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
5484                foo = self.serv.accept()
5485            except TimeoutError:
5486                self.fail("caught timeout instead of Alarm")
5487            except Alarm:
5488                pass
5489            except:
5490                self.fail("caught other exception instead of Alarm:"
5491                          " %s(%s):\n%s" %
5492                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
5493            else:
5494                self.fail("nothing caught")
5495            finally:
5496                signal.alarm(0)         # shut off alarm
5497        except Alarm:
5498            self.fail("got Alarm in wrong place")
5499        finally:
5500            # no alarm can be pending.  Safe to restore old handler.
5501            signal.signal(signal.SIGALRM, old_alarm)
5502
5503class UDPTimeoutTest(SocketUDPTest):
5504
5505    def testUDPTimeout(self):
5506        def raise_timeout(*args, **kwargs):
5507            self.serv.settimeout(1.0)
5508            self.serv.recv(1024)
5509        self.assertRaises(TimeoutError, raise_timeout,
5510                              "Error generating a timeout exception (UDP)")
5511
5512    def testTimeoutZero(self):
5513        ok = False
5514        try:
5515            self.serv.settimeout(0.0)
5516            foo = self.serv.recv(1024)
5517        except TimeoutError:
5518            self.fail("caught timeout instead of error (UDP)")
5519        except OSError:
5520            ok = True
5521        except:
5522            self.fail("caught unexpected exception (UDP)")
5523        if not ok:
5524            self.fail("recv() returned success when we did not expect it")
5525
5526@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
5527          'UDPLITE sockets required for this test.')
5528class UDPLITETimeoutTest(SocketUDPLITETest):
5529
5530    def testUDPLITETimeout(self):
5531        def raise_timeout(*args, **kwargs):
5532            self.serv.settimeout(1.0)
5533            self.serv.recv(1024)
5534        self.assertRaises(TimeoutError, raise_timeout,
5535                              "Error generating a timeout exception (UDPLITE)")
5536
5537    def testTimeoutZero(self):
5538        ok = False
5539        try:
5540            self.serv.settimeout(0.0)
5541            foo = self.serv.recv(1024)
5542        except TimeoutError:
5543            self.fail("caught timeout instead of error (UDPLITE)")
5544        except OSError:
5545            ok = True
5546        except:
5547            self.fail("caught unexpected exception (UDPLITE)")
5548        if not ok:
5549            self.fail("recv() returned success when we did not expect it")
5550
5551class TestExceptions(unittest.TestCase):
5552
5553    def testExceptionTree(self):
5554        self.assertTrue(issubclass(OSError, Exception))
5555        self.assertTrue(issubclass(socket.herror, OSError))
5556        self.assertTrue(issubclass(socket.gaierror, OSError))
5557        self.assertTrue(issubclass(socket.timeout, OSError))
5558        self.assertIs(socket.error, OSError)
5559        self.assertIs(socket.timeout, TimeoutError)
5560
5561    def test_setblocking_invalidfd(self):
5562        # Regression test for issue #28471
5563
5564        sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
5565        sock = socket.socket(
5566            socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno())
5567        sock0.close()
5568        self.addCleanup(sock.detach)
5569
5570        with self.assertRaises(OSError):
5571            sock.setblocking(False)
5572
5573
5574@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
5575class TestLinuxAbstractNamespace(unittest.TestCase):
5576
5577    UNIX_PATH_MAX = 108
5578
5579    def testLinuxAbstractNamespace(self):
5580        address = b"\x00python-test-hello\x00\xff"
5581        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1:
5582            s1.bind(address)
5583            s1.listen()
5584            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2:
5585                s2.connect(s1.getsockname())
5586                with s1.accept()[0] as s3:
5587                    self.assertEqual(s1.getsockname(), address)
5588                    self.assertEqual(s2.getpeername(), address)
5589
5590    def testMaxName(self):
5591        address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
5592        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5593            s.bind(address)
5594            self.assertEqual(s.getsockname(), address)
5595
5596    def testNameOverflow(self):
5597        address = "\x00" + "h" * self.UNIX_PATH_MAX
5598        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5599            self.assertRaises(OSError, s.bind, address)
5600
5601    def testStrName(self):
5602        # Check that an abstract name can be passed as a string.
5603        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
5604        try:
5605            s.bind("\x00python\x00test\x00")
5606            self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
5607        finally:
5608            s.close()
5609
5610    def testBytearrayName(self):
5611        # Check that an abstract name can be passed as a bytearray.
5612        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5613            s.bind(bytearray(b"\x00python\x00test\x00"))
5614            self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
5615
5616    def testAutobind(self):
5617        # Check that binding to an empty string binds to an available address
5618        # in the abstract namespace as specified in unix(7) "Autobind feature".
5619        abstract_address = b"^\0[0-9a-f]{5}"
5620        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1:
5621            s1.bind("")
5622            self.assertRegex(s1.getsockname(), abstract_address)
5623            # Each socket is bound to a different abstract address.
5624            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2:
5625                s2.bind("")
5626                self.assertRegex(s2.getsockname(), abstract_address)
5627                self.assertNotEqual(s1.getsockname(), s2.getsockname())
5628
5629
5630@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
5631class TestUnixDomain(unittest.TestCase):
5632
5633    def setUp(self):
5634        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
5635
5636    def tearDown(self):
5637        self.sock.close()
5638
5639    def encoded(self, path):
5640        # Return the given path encoded in the file system encoding,
5641        # or skip the test if this is not possible.
5642        try:
5643            return os.fsencode(path)
5644        except UnicodeEncodeError:
5645            self.skipTest(
5646                "Pathname {0!a} cannot be represented in file "
5647                "system encoding {1!r}".format(
5648                    path, sys.getfilesystemencoding()))
5649
5650    def bind(self, sock, path):
5651        # Bind the socket
5652        try:
5653            socket_helper.bind_unix_socket(sock, path)
5654        except OSError as e:
5655            if str(e) == "AF_UNIX path too long":
5656                self.skipTest(
5657                    "Pathname {0!a} is too long to serve as an AF_UNIX path"
5658                    .format(path))
5659            else:
5660                raise
5661
5662    def testUnbound(self):
5663        # Issue #30205 (note getsockname() can return None on OS X)
5664        self.assertIn(self.sock.getsockname(), ('', None))
5665
5666    def testStrAddr(self):
5667        # Test binding to and retrieving a normal string pathname.
5668        path = os.path.abspath(os_helper.TESTFN)
5669        self.bind(self.sock, path)
5670        self.addCleanup(os_helper.unlink, path)
5671        self.assertEqual(self.sock.getsockname(), path)
5672
5673    def testBytesAddr(self):
5674        # Test binding to a bytes pathname.
5675        path = os.path.abspath(os_helper.TESTFN)
5676        self.bind(self.sock, self.encoded(path))
5677        self.addCleanup(os_helper.unlink, path)
5678        self.assertEqual(self.sock.getsockname(), path)
5679
5680    def testSurrogateescapeBind(self):
5681        # Test binding to a valid non-ASCII pathname, with the
5682        # non-ASCII bytes supplied using surrogateescape encoding.
5683        path = os.path.abspath(os_helper.TESTFN_UNICODE)
5684        b = self.encoded(path)
5685        self.bind(self.sock, b.decode("ascii", "surrogateescape"))
5686        self.addCleanup(os_helper.unlink, path)
5687        self.assertEqual(self.sock.getsockname(), path)
5688
5689    def testUnencodableAddr(self):
5690        # Test binding to a pathname that cannot be encoded in the
5691        # file system encoding.
5692        if os_helper.TESTFN_UNENCODABLE is None:
5693            self.skipTest("No unencodable filename available")
5694        path = os.path.abspath(os_helper.TESTFN_UNENCODABLE)
5695        self.bind(self.sock, path)
5696        self.addCleanup(os_helper.unlink, path)
5697        self.assertEqual(self.sock.getsockname(), path)
5698
5699    @unittest.skipIf(sys.platform == 'linux', 'Linux specific test')
5700    def testEmptyAddress(self):
5701        # Test that binding empty address fails.
5702        self.assertRaises(OSError, self.sock.bind, "")
5703
5704
5705class BufferIOTest(SocketConnectedTest):
5706    """
5707    Test the buffer versions of socket.recv() and socket.send().
5708    """
5709    def __init__(self, methodName='runTest'):
5710        SocketConnectedTest.__init__(self, methodName=methodName)
5711
5712    def testRecvIntoArray(self):
5713        buf = array.array("B", [0] * len(MSG))
5714        nbytes = self.cli_conn.recv_into(buf)
5715        self.assertEqual(nbytes, len(MSG))
5716        buf = buf.tobytes()
5717        msg = buf[:len(MSG)]
5718        self.assertEqual(msg, MSG)
5719
5720    def _testRecvIntoArray(self):
5721        buf = bytes(MSG)
5722        self.serv_conn.send(buf)
5723
5724    def testRecvIntoBytearray(self):
5725        buf = bytearray(1024)
5726        nbytes = self.cli_conn.recv_into(buf)
5727        self.assertEqual(nbytes, len(MSG))
5728        msg = buf[:len(MSG)]
5729        self.assertEqual(msg, MSG)
5730
5731    _testRecvIntoBytearray = _testRecvIntoArray
5732
5733    def testRecvIntoMemoryview(self):
5734        buf = bytearray(1024)
5735        nbytes = self.cli_conn.recv_into(memoryview(buf))
5736        self.assertEqual(nbytes, len(MSG))
5737        msg = buf[:len(MSG)]
5738        self.assertEqual(msg, MSG)
5739
5740    _testRecvIntoMemoryview = _testRecvIntoArray
5741
5742    def testRecvFromIntoArray(self):
5743        buf = array.array("B", [0] * len(MSG))
5744        nbytes, addr = self.cli_conn.recvfrom_into(buf)
5745        self.assertEqual(nbytes, len(MSG))
5746        buf = buf.tobytes()
5747        msg = buf[:len(MSG)]
5748        self.assertEqual(msg, MSG)
5749
5750    def _testRecvFromIntoArray(self):
5751        buf = bytes(MSG)
5752        self.serv_conn.send(buf)
5753
5754    def testRecvFromIntoBytearray(self):
5755        buf = bytearray(1024)
5756        nbytes, addr = self.cli_conn.recvfrom_into(buf)
5757        self.assertEqual(nbytes, len(MSG))
5758        msg = buf[:len(MSG)]
5759        self.assertEqual(msg, MSG)
5760
5761    _testRecvFromIntoBytearray = _testRecvFromIntoArray
5762
5763    def testRecvFromIntoMemoryview(self):
5764        buf = bytearray(1024)
5765        nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
5766        self.assertEqual(nbytes, len(MSG))
5767        msg = buf[:len(MSG)]
5768        self.assertEqual(msg, MSG)
5769
5770    _testRecvFromIntoMemoryview = _testRecvFromIntoArray
5771
5772    def testRecvFromIntoSmallBuffer(self):
5773        # See issue #20246.
5774        buf = bytearray(8)
5775        self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)
5776
5777    def _testRecvFromIntoSmallBuffer(self):
5778        self.serv_conn.send(MSG)
5779
5780    def testRecvFromIntoEmptyBuffer(self):
5781        buf = bytearray()
5782        self.cli_conn.recvfrom_into(buf)
5783        self.cli_conn.recvfrom_into(buf, 0)
5784
5785    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
5786
5787
5788TIPC_STYPE = 2000
5789TIPC_LOWER = 200
5790TIPC_UPPER = 210
5791
5792def isTipcAvailable():
5793    """Check if the TIPC module is loaded
5794
5795    The TIPC module is not loaded automatically on Ubuntu and probably
5796    other Linux distros.
5797    """
5798    if not hasattr(socket, "AF_TIPC"):
5799        return False
5800    try:
5801        f = open("/proc/modules", encoding="utf-8")
5802    except (FileNotFoundError, IsADirectoryError, PermissionError):
5803        # It's ok if the file does not exist, is a directory or if we
5804        # have not the permission to read it.
5805        return False
5806    with f:
5807        for line in f:
5808            if line.startswith("tipc "):
5809                return True
5810    return False
5811
5812@unittest.skipUnless(isTipcAvailable(),
5813                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
5814class TIPCTest(unittest.TestCase):
5815    def testRDM(self):
5816        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
5817        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
5818        self.addCleanup(srv.close)
5819        self.addCleanup(cli.close)
5820
5821        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5822        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
5823                TIPC_LOWER, TIPC_UPPER)
5824        srv.bind(srvaddr)
5825
5826        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
5827                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
5828        cli.sendto(MSG, sendaddr)
5829
5830        msg, recvaddr = srv.recvfrom(1024)
5831
5832        self.assertEqual(cli.getsockname(), recvaddr)
5833        self.assertEqual(msg, MSG)
5834
5835
5836@unittest.skipUnless(isTipcAvailable(),
5837                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
5838class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
5839    def __init__(self, methodName = 'runTest'):
5840        unittest.TestCase.__init__(self, methodName = methodName)
5841        ThreadableTest.__init__(self)
5842
5843    def setUp(self):
5844        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
5845        self.addCleanup(self.srv.close)
5846        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5847        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
5848                TIPC_LOWER, TIPC_UPPER)
5849        self.srv.bind(srvaddr)
5850        self.srv.listen()
5851        self.serverExplicitReady()
5852        self.conn, self.connaddr = self.srv.accept()
5853        self.addCleanup(self.conn.close)
5854
5855    def clientSetUp(self):
5856        # There is a hittable race between serverExplicitReady() and the
5857        # accept() call; sleep a little while to avoid it, otherwise
5858        # we could get an exception
5859        time.sleep(0.1)
5860        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
5861        self.addCleanup(self.cli.close)
5862        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
5863                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
5864        self.cli.connect(addr)
5865        self.cliaddr = self.cli.getsockname()
5866
5867    def testStream(self):
5868        msg = self.conn.recv(1024)
5869        self.assertEqual(msg, MSG)
5870        self.assertEqual(self.cliaddr, self.connaddr)
5871
5872    def _testStream(self):
5873        self.cli.send(MSG)
5874        self.cli.close()
5875
5876
5877class ContextManagersTest(ThreadedTCPSocketTest):
5878
5879    def _testSocketClass(self):
5880        # base test
5881        with socket.socket() as sock:
5882            self.assertFalse(sock._closed)
5883        self.assertTrue(sock._closed)
5884        # close inside with block
5885        with socket.socket() as sock:
5886            sock.close()
5887        self.assertTrue(sock._closed)
5888        # exception inside with block
5889        with socket.socket() as sock:
5890            self.assertRaises(OSError, sock.sendall, b'foo')
5891        self.assertTrue(sock._closed)
5892
5893    def testCreateConnectionBase(self):
5894        conn, addr = self.serv.accept()
5895        self.addCleanup(conn.close)
5896        data = conn.recv(1024)
5897        conn.sendall(data)
5898
5899    def _testCreateConnectionBase(self):
5900        address = self.serv.getsockname()
5901        with socket.create_connection(address) as sock:
5902            self.assertFalse(sock._closed)
5903            sock.sendall(b'foo')
5904            self.assertEqual(sock.recv(1024), b'foo')
5905        self.assertTrue(sock._closed)
5906
5907    def testCreateConnectionClose(self):
5908        conn, addr = self.serv.accept()
5909        self.addCleanup(conn.close)
5910        data = conn.recv(1024)
5911        conn.sendall(data)
5912
5913    def _testCreateConnectionClose(self):
5914        address = self.serv.getsockname()
5915        with socket.create_connection(address) as sock:
5916            sock.close()
5917        self.assertTrue(sock._closed)
5918        self.assertRaises(OSError, sock.sendall, b'foo')
5919
5920
5921class InheritanceTest(unittest.TestCase):
5922    @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
5923                         "SOCK_CLOEXEC not defined")
5924    @support.requires_linux_version(2, 6, 28)
5925    def test_SOCK_CLOEXEC(self):
5926        with socket.socket(socket.AF_INET,
5927                           socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s:
5928            self.assertEqual(s.type, socket.SOCK_STREAM)
5929            self.assertFalse(s.get_inheritable())
5930
5931    def test_default_inheritable(self):
5932        sock = socket.socket()
5933        with sock:
5934            self.assertEqual(sock.get_inheritable(), False)
5935
5936    def test_dup(self):
5937        sock = socket.socket()
5938        with sock:
5939            newsock = sock.dup()
5940            sock.close()
5941            with newsock:
5942                self.assertEqual(newsock.get_inheritable(), False)
5943
5944    def test_set_inheritable(self):
5945        sock = socket.socket()
5946        with sock:
5947            sock.set_inheritable(True)
5948            self.assertEqual(sock.get_inheritable(), True)
5949
5950            sock.set_inheritable(False)
5951            self.assertEqual(sock.get_inheritable(), False)
5952
5953    @unittest.skipIf(fcntl is None, "need fcntl")
5954    def test_get_inheritable_cloexec(self):
5955        sock = socket.socket()
5956        with sock:
5957            fd = sock.fileno()
5958            self.assertEqual(sock.get_inheritable(), False)
5959
5960            # clear FD_CLOEXEC flag
5961            flags = fcntl.fcntl(fd, fcntl.F_GETFD)
5962            flags &= ~fcntl.FD_CLOEXEC
5963            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
5964
5965            self.assertEqual(sock.get_inheritable(), True)
5966
5967    @unittest.skipIf(fcntl is None, "need fcntl")
5968    def test_set_inheritable_cloexec(self):
5969        sock = socket.socket()
5970        with sock:
5971            fd = sock.fileno()
5972            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
5973                             fcntl.FD_CLOEXEC)
5974
5975            sock.set_inheritable(True)
5976            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
5977                             0)
5978
5979
5980    def test_socketpair(self):
5981        s1, s2 = socket.socketpair()
5982        self.addCleanup(s1.close)
5983        self.addCleanup(s2.close)
5984        self.assertEqual(s1.get_inheritable(), False)
5985        self.assertEqual(s2.get_inheritable(), False)
5986
5987
5988@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
5989                     "SOCK_NONBLOCK not defined")
5990class NonblockConstantTest(unittest.TestCase):
5991    def checkNonblock(self, s, nonblock=True, timeout=0.0):
5992        if nonblock:
5993            self.assertEqual(s.type, socket.SOCK_STREAM)
5994            self.assertEqual(s.gettimeout(), timeout)
5995            self.assertTrue(
5996                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
5997            if timeout == 0:
5998                # timeout == 0: means that getblocking() must be False.
5999                self.assertFalse(s.getblocking())
6000            else:
6001                # If timeout > 0, the socket will be in a "blocking" mode
6002                # from the standpoint of the Python API.  For Python socket
6003                # object, "blocking" means that operations like 'sock.recv()'
6004                # will block.  Internally, file descriptors for
6005                # "blocking" Python sockets *with timeouts* are in a
6006                # *non-blocking* mode, and 'sock.recv()' uses 'select()'
6007                # and handles EWOULDBLOCK/EAGAIN to enforce the timeout.
6008                self.assertTrue(s.getblocking())
6009        else:
6010            self.assertEqual(s.type, socket.SOCK_STREAM)
6011            self.assertEqual(s.gettimeout(), None)
6012            self.assertFalse(
6013                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
6014            self.assertTrue(s.getblocking())
6015
6016    @support.requires_linux_version(2, 6, 28)
6017    def test_SOCK_NONBLOCK(self):
6018        # a lot of it seems silly and redundant, but I wanted to test that
6019        # changing back and forth worked ok
6020        with socket.socket(socket.AF_INET,
6021                           socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
6022            self.checkNonblock(s)
6023            s.setblocking(True)
6024            self.checkNonblock(s, nonblock=False)
6025            s.setblocking(False)
6026            self.checkNonblock(s)
6027            s.settimeout(None)
6028            self.checkNonblock(s, nonblock=False)
6029            s.settimeout(2.0)
6030            self.checkNonblock(s, timeout=2.0)
6031            s.setblocking(True)
6032            self.checkNonblock(s, nonblock=False)
6033        # defaulttimeout
6034        t = socket.getdefaulttimeout()
6035        socket.setdefaulttimeout(0.0)
6036        with socket.socket() as s:
6037            self.checkNonblock(s)
6038        socket.setdefaulttimeout(None)
6039        with socket.socket() as s:
6040            self.checkNonblock(s, False)
6041        socket.setdefaulttimeout(2.0)
6042        with socket.socket() as s:
6043            self.checkNonblock(s, timeout=2.0)
6044        socket.setdefaulttimeout(None)
6045        with socket.socket() as s:
6046            self.checkNonblock(s, False)
6047        socket.setdefaulttimeout(t)
6048
6049
6050@unittest.skipUnless(os.name == "nt", "Windows specific")
6051@unittest.skipUnless(multiprocessing, "need multiprocessing")
6052class TestSocketSharing(SocketTCPTest):
6053    # This must be classmethod and not staticmethod or multiprocessing
6054    # won't be able to bootstrap it.
6055    @classmethod
6056    def remoteProcessServer(cls, q):
6057        # Recreate socket from shared data
6058        sdata = q.get()
6059        message = q.get()
6060
6061        s = socket.fromshare(sdata)
6062        s2, c = s.accept()
6063
6064        # Send the message
6065        s2.sendall(message)
6066        s2.close()
6067        s.close()
6068
6069    def testShare(self):
6070        # Transfer the listening server socket to another process
6071        # and service it from there.
6072
6073        # Create process:
6074        q = multiprocessing.Queue()
6075        p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
6076        p.start()
6077
6078        # Get the shared socket data
6079        data = self.serv.share(p.pid)
6080
6081        # Pass the shared socket to the other process
6082        addr = self.serv.getsockname()
6083        self.serv.close()
6084        q.put(data)
6085
6086        # The data that the server will send us
6087        message = b"slapmahfro"
6088        q.put(message)
6089
6090        # Connect
6091        s = socket.create_connection(addr)
6092        #  listen for the data
6093        m = []
6094        while True:
6095            data = s.recv(100)
6096            if not data:
6097                break
6098            m.append(data)
6099        s.close()
6100        received = b"".join(m)
6101        self.assertEqual(received, message)
6102        p.join()
6103
6104    def testShareLength(self):
6105        data = self.serv.share(os.getpid())
6106        self.assertRaises(ValueError, socket.fromshare, data[:-1])
6107        self.assertRaises(ValueError, socket.fromshare, data+b"foo")
6108
6109    def compareSockets(self, org, other):
6110        # socket sharing is expected to work only for blocking socket
6111        # since the internal python timeout value isn't transferred.
6112        self.assertEqual(org.gettimeout(), None)
6113        self.assertEqual(org.gettimeout(), other.gettimeout())
6114
6115        self.assertEqual(org.family, other.family)
6116        self.assertEqual(org.type, other.type)
6117        # If the user specified "0" for proto, then
6118        # internally windows will have picked the correct value.
6119        # Python introspection on the socket however will still return
6120        # 0.  For the shared socket, the python value is recreated
6121        # from the actual value, so it may not compare correctly.
6122        if org.proto != 0:
6123            self.assertEqual(org.proto, other.proto)
6124
6125    def testShareLocal(self):
6126        data = self.serv.share(os.getpid())
6127        s = socket.fromshare(data)
6128        try:
6129            self.compareSockets(self.serv, s)
6130        finally:
6131            s.close()
6132
6133    def testTypes(self):
6134        families = [socket.AF_INET, socket.AF_INET6]
6135        types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
6136        for f in families:
6137            for t in types:
6138                try:
6139                    source = socket.socket(f, t)
6140                except OSError:
6141                    continue # This combination is not supported
6142                try:
6143                    data = source.share(os.getpid())
6144                    shared = socket.fromshare(data)
6145                    try:
6146                        self.compareSockets(source, shared)
6147                    finally:
6148                        shared.close()
6149                finally:
6150                    source.close()
6151
6152
6153class SendfileUsingSendTest(ThreadedTCPSocketTest):
6154    """
6155    Test the send() implementation of socket.sendfile().
6156    """
6157
6158    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
6159    BUFSIZE = 8192
6160    FILEDATA = b""
6161    TIMEOUT = support.LOOPBACK_TIMEOUT
6162
6163    @classmethod
6164    def setUpClass(cls):
6165        def chunks(total, step):
6166            assert total >= step
6167            while total > step:
6168                yield step
6169                total -= step
6170            if total:
6171                yield total
6172
6173        chunk = b"".join([random.choice(string.ascii_letters).encode()
6174                          for i in range(cls.BUFSIZE)])
6175        with open(os_helper.TESTFN, 'wb') as f:
6176            for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
6177                f.write(chunk)
6178        with open(os_helper.TESTFN, 'rb') as f:
6179            cls.FILEDATA = f.read()
6180            assert len(cls.FILEDATA) == cls.FILESIZE
6181
6182    @classmethod
6183    def tearDownClass(cls):
6184        os_helper.unlink(os_helper.TESTFN)
6185
6186    def accept_conn(self):
6187        self.serv.settimeout(support.LONG_TIMEOUT)
6188        conn, addr = self.serv.accept()
6189        conn.settimeout(self.TIMEOUT)
6190        self.addCleanup(conn.close)
6191        return conn
6192
6193    def recv_data(self, conn):
6194        received = []
6195        while True:
6196            chunk = conn.recv(self.BUFSIZE)
6197            if not chunk:
6198                break
6199            received.append(chunk)
6200        return b''.join(received)
6201
6202    def meth_from_sock(self, sock):
6203        # Depending on the mixin class being run return either send()
6204        # or sendfile() method implementation.
6205        return getattr(sock, "_sendfile_use_send")
6206
6207    # regular file
6208
6209    def _testRegularFile(self):
6210        address = self.serv.getsockname()
6211        file = open(os_helper.TESTFN, 'rb')
6212        with socket.create_connection(address) as sock, file as file:
6213            meth = self.meth_from_sock(sock)
6214            sent = meth(file)
6215            self.assertEqual(sent, self.FILESIZE)
6216            self.assertEqual(file.tell(), self.FILESIZE)
6217
6218    def testRegularFile(self):
6219        conn = self.accept_conn()
6220        data = self.recv_data(conn)
6221        self.assertEqual(len(data), self.FILESIZE)
6222        self.assertEqual(data, self.FILEDATA)
6223
6224    # non regular file
6225
6226    def _testNonRegularFile(self):
6227        address = self.serv.getsockname()
6228        file = io.BytesIO(self.FILEDATA)
6229        with socket.create_connection(address) as sock, file as file:
6230            sent = sock.sendfile(file)
6231            self.assertEqual(sent, self.FILESIZE)
6232            self.assertEqual(file.tell(), self.FILESIZE)
6233            self.assertRaises(socket._GiveupOnSendfile,
6234                              sock._sendfile_use_sendfile, file)
6235
6236    def testNonRegularFile(self):
6237        conn = self.accept_conn()
6238        data = self.recv_data(conn)
6239        self.assertEqual(len(data), self.FILESIZE)
6240        self.assertEqual(data, self.FILEDATA)
6241
6242    # empty file
6243
6244    def _testEmptyFileSend(self):
6245        address = self.serv.getsockname()
6246        filename = os_helper.TESTFN + "2"
6247        with open(filename, 'wb'):
6248            self.addCleanup(os_helper.unlink, filename)
6249        file = open(filename, 'rb')
6250        with socket.create_connection(address) as sock, file as file:
6251            meth = self.meth_from_sock(sock)
6252            sent = meth(file)
6253            self.assertEqual(sent, 0)
6254            self.assertEqual(file.tell(), 0)
6255
6256    def testEmptyFileSend(self):
6257        conn = self.accept_conn()
6258        data = self.recv_data(conn)
6259        self.assertEqual(data, b"")
6260
6261    # offset
6262
6263    def _testOffset(self):
6264        address = self.serv.getsockname()
6265        file = open(os_helper.TESTFN, 'rb')
6266        with socket.create_connection(address) as sock, file as file:
6267            meth = self.meth_from_sock(sock)
6268            sent = meth(file, offset=5000)
6269            self.assertEqual(sent, self.FILESIZE - 5000)
6270            self.assertEqual(file.tell(), self.FILESIZE)
6271
6272    def testOffset(self):
6273        conn = self.accept_conn()
6274        data = self.recv_data(conn)
6275        self.assertEqual(len(data), self.FILESIZE - 5000)
6276        self.assertEqual(data, self.FILEDATA[5000:])
6277
6278    # count
6279
6280    def _testCount(self):
6281        address = self.serv.getsockname()
6282        file = open(os_helper.TESTFN, 'rb')
6283        sock = socket.create_connection(address,
6284                                        timeout=support.LOOPBACK_TIMEOUT)
6285        with sock, file:
6286            count = 5000007
6287            meth = self.meth_from_sock(sock)
6288            sent = meth(file, count=count)
6289            self.assertEqual(sent, count)
6290            self.assertEqual(file.tell(), count)
6291
6292    def testCount(self):
6293        count = 5000007
6294        conn = self.accept_conn()
6295        data = self.recv_data(conn)
6296        self.assertEqual(len(data), count)
6297        self.assertEqual(data, self.FILEDATA[:count])
6298
6299    # count small
6300
6301    def _testCountSmall(self):
6302        address = self.serv.getsockname()
6303        file = open(os_helper.TESTFN, 'rb')
6304        sock = socket.create_connection(address,
6305                                        timeout=support.LOOPBACK_TIMEOUT)
6306        with sock, file:
6307            count = 1
6308            meth = self.meth_from_sock(sock)
6309            sent = meth(file, count=count)
6310            self.assertEqual(sent, count)
6311            self.assertEqual(file.tell(), count)
6312
6313    def testCountSmall(self):
6314        count = 1
6315        conn = self.accept_conn()
6316        data = self.recv_data(conn)
6317        self.assertEqual(len(data), count)
6318        self.assertEqual(data, self.FILEDATA[:count])
6319
6320    # count + offset
6321
6322    def _testCountWithOffset(self):
6323        address = self.serv.getsockname()
6324        file = open(os_helper.TESTFN, 'rb')
6325        with socket.create_connection(address, timeout=2) as sock, file as file:
6326            count = 100007
6327            meth = self.meth_from_sock(sock)
6328            sent = meth(file, offset=2007, count=count)
6329            self.assertEqual(sent, count)
6330            self.assertEqual(file.tell(), count + 2007)
6331
6332    def testCountWithOffset(self):
6333        count = 100007
6334        conn = self.accept_conn()
6335        data = self.recv_data(conn)
6336        self.assertEqual(len(data), count)
6337        self.assertEqual(data, self.FILEDATA[2007:count+2007])
6338
6339    # non blocking sockets are not supposed to work
6340
6341    def _testNonBlocking(self):
6342        address = self.serv.getsockname()
6343        file = open(os_helper.TESTFN, 'rb')
6344        with socket.create_connection(address) as sock, file as file:
6345            sock.setblocking(False)
6346            meth = self.meth_from_sock(sock)
6347            self.assertRaises(ValueError, meth, file)
6348            self.assertRaises(ValueError, sock.sendfile, file)
6349
6350    def testNonBlocking(self):
6351        conn = self.accept_conn()
6352        if conn.recv(8192):
6353            self.fail('was not supposed to receive any data')
6354
6355    # timeout (non-triggered)
6356
6357    def _testWithTimeout(self):
6358        address = self.serv.getsockname()
6359        file = open(os_helper.TESTFN, 'rb')
6360        sock = socket.create_connection(address,
6361                                        timeout=support.LOOPBACK_TIMEOUT)
6362        with sock, file:
6363            meth = self.meth_from_sock(sock)
6364            sent = meth(file)
6365            self.assertEqual(sent, self.FILESIZE)
6366
6367    def testWithTimeout(self):
6368        conn = self.accept_conn()
6369        data = self.recv_data(conn)
6370        self.assertEqual(len(data), self.FILESIZE)
6371        self.assertEqual(data, self.FILEDATA)
6372
6373    # timeout (triggered)
6374
6375    def _testWithTimeoutTriggeredSend(self):
6376        address = self.serv.getsockname()
6377        with open(os_helper.TESTFN, 'rb') as file:
6378            with socket.create_connection(address) as sock:
6379                sock.settimeout(0.01)
6380                meth = self.meth_from_sock(sock)
6381                self.assertRaises(TimeoutError, meth, file)
6382
6383    def testWithTimeoutTriggeredSend(self):
6384        conn = self.accept_conn()
6385        conn.recv(88192)
6386        # bpo-45212: the wait here needs to be longer than the client-side timeout (0.01s)
6387        time.sleep(1)
6388
6389    # errors
6390
6391    def _test_errors(self):
6392        pass
6393
6394    def test_errors(self):
6395        with open(os_helper.TESTFN, 'rb') as file:
6396            with socket.socket(type=socket.SOCK_DGRAM) as s:
6397                meth = self.meth_from_sock(s)
6398                self.assertRaisesRegex(
6399                    ValueError, "SOCK_STREAM", meth, file)
6400        with open(os_helper.TESTFN, encoding="utf-8") as file:
6401            with socket.socket() as s:
6402                meth = self.meth_from_sock(s)
6403                self.assertRaisesRegex(
6404                    ValueError, "binary mode", meth, file)
6405        with open(os_helper.TESTFN, 'rb') as file:
6406            with socket.socket() as s:
6407                meth = self.meth_from_sock(s)
6408                self.assertRaisesRegex(TypeError, "positive integer",
6409                                       meth, file, count='2')
6410                self.assertRaisesRegex(TypeError, "positive integer",
6411                                       meth, file, count=0.1)
6412                self.assertRaisesRegex(ValueError, "positive integer",
6413                                       meth, file, count=0)
6414                self.assertRaisesRegex(ValueError, "positive integer",
6415                                       meth, file, count=-1)
6416
6417
6418@unittest.skipUnless(hasattr(os, "sendfile"),
6419                     'os.sendfile() required for this test.')
6420class SendfileUsingSendfileTest(SendfileUsingSendTest):
6421    """
6422    Test the sendfile() implementation of socket.sendfile().
6423    """
6424    def meth_from_sock(self, sock):
6425        return getattr(sock, "_sendfile_use_sendfile")
6426
6427
6428@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
6429class LinuxKernelCryptoAPI(unittest.TestCase):
6430    # tests for AF_ALG
6431    def create_alg(self, typ, name):
6432        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6433        try:
6434            sock.bind((typ, name))
6435        except FileNotFoundError as e:
6436            # type / algorithm is not available
6437            sock.close()
6438            raise unittest.SkipTest(str(e), typ, name)
6439        else:
6440            return sock
6441
6442    # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY,
6443    # at least on ppc64le architecture
6444    @support.requires_linux_version(4, 5)
6445    def test_sha256(self):
6446        expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
6447                                 "177a9cb410ff61f20015ad")
6448        with self.create_alg('hash', 'sha256') as algo:
6449            op, _ = algo.accept()
6450            with op:
6451                op.sendall(b"abc")
6452                self.assertEqual(op.recv(512), expected)
6453
6454            op, _ = algo.accept()
6455            with op:
6456                op.send(b'a', socket.MSG_MORE)
6457                op.send(b'b', socket.MSG_MORE)
6458                op.send(b'c', socket.MSG_MORE)
6459                op.send(b'')
6460                self.assertEqual(op.recv(512), expected)
6461
6462    def test_hmac_sha1(self):
6463        expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
6464        with self.create_alg('hash', 'hmac(sha1)') as algo:
6465            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
6466            op, _ = algo.accept()
6467            with op:
6468                op.sendall(b"what do ya want for nothing?")
6469                self.assertEqual(op.recv(512), expected)
6470
6471    # Although it should work with 3.19 and newer the test blocks on
6472    # Ubuntu 15.10 with Kernel 4.2.0-19.
6473    @support.requires_linux_version(4, 3)
6474    def test_aes_cbc(self):
6475        key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
6476        iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
6477        msg = b"Single block msg"
6478        ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
6479        msglen = len(msg)
6480        with self.create_alg('skcipher', 'cbc(aes)') as algo:
6481            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
6482            op, _ = algo.accept()
6483            with op:
6484                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
6485                                 flags=socket.MSG_MORE)
6486                op.sendall(msg)
6487                self.assertEqual(op.recv(msglen), ciphertext)
6488
6489            op, _ = algo.accept()
6490            with op:
6491                op.sendmsg_afalg([ciphertext],
6492                                 op=socket.ALG_OP_DECRYPT, iv=iv)
6493                self.assertEqual(op.recv(msglen), msg)
6494
6495            # long message
6496            multiplier = 1024
6497            longmsg = [msg] * multiplier
6498            op, _ = algo.accept()
6499            with op:
6500                op.sendmsg_afalg(longmsg,
6501                                 op=socket.ALG_OP_ENCRYPT, iv=iv)
6502                enc = op.recv(msglen * multiplier)
6503            self.assertEqual(len(enc), msglen * multiplier)
6504            self.assertEqual(enc[:msglen], ciphertext)
6505
6506            op, _ = algo.accept()
6507            with op:
6508                op.sendmsg_afalg([enc],
6509                                 op=socket.ALG_OP_DECRYPT, iv=iv)
6510                dec = op.recv(msglen * multiplier)
6511            self.assertEqual(len(dec), msglen * multiplier)
6512            self.assertEqual(dec, msg * multiplier)
6513
6514    @support.requires_linux_version(4, 9)  # see issue29324
6515    def test_aead_aes_gcm(self):
6516        key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
6517        iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
6518        plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
6519        assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
6520        expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
6521        expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')
6522
6523        taglen = len(expected_tag)
6524        assoclen = len(assoc)
6525
6526        with self.create_alg('aead', 'gcm(aes)') as algo:
6527            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
6528            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
6529                            None, taglen)
6530
6531            # send assoc, plain and tag buffer in separate steps
6532            op, _ = algo.accept()
6533            with op:
6534                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
6535                                 assoclen=assoclen, flags=socket.MSG_MORE)
6536                op.sendall(assoc, socket.MSG_MORE)
6537                op.sendall(plain)
6538                res = op.recv(assoclen + len(plain) + taglen)
6539                self.assertEqual(expected_ct, res[assoclen:-taglen])
6540                self.assertEqual(expected_tag, res[-taglen:])
6541
6542            # now with msg
6543            op, _ = algo.accept()
6544            with op:
6545                msg = assoc + plain
6546                op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
6547                                 assoclen=assoclen)
6548                res = op.recv(assoclen + len(plain) + taglen)
6549                self.assertEqual(expected_ct, res[assoclen:-taglen])
6550                self.assertEqual(expected_tag, res[-taglen:])
6551
6552            # create anc data manually
6553            pack_uint32 = struct.Struct('I').pack
6554            op, _ = algo.accept()
6555            with op:
6556                msg = assoc + plain
6557                op.sendmsg(
6558                    [msg],
6559                    ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
6560                     [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
6561                     [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
6562                    )
6563                )
6564                res = op.recv(len(msg) + taglen)
6565                self.assertEqual(expected_ct, res[assoclen:-taglen])
6566                self.assertEqual(expected_tag, res[-taglen:])
6567
6568            # decrypt and verify
6569            op, _ = algo.accept()
6570            with op:
6571                msg = assoc + expected_ct + expected_tag
6572                op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
6573                                 assoclen=assoclen)
6574                res = op.recv(len(msg) - taglen)
6575                self.assertEqual(plain, res[assoclen:])
6576
6577    @support.requires_linux_version(4, 3)  # see test_aes_cbc
6578    def test_drbg_pr_sha256(self):
6579        # deterministic random bit generator, prediction resistance, sha256
6580        with self.create_alg('rng', 'drbg_pr_sha256') as algo:
6581            extra_seed = os.urandom(32)
6582            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
6583            op, _ = algo.accept()
6584            with op:
6585                rn = op.recv(32)
6586                self.assertEqual(len(rn), 32)
6587
6588    def test_sendmsg_afalg_args(self):
6589        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6590        with sock:
6591            with self.assertRaises(TypeError):
6592                sock.sendmsg_afalg()
6593
6594            with self.assertRaises(TypeError):
6595                sock.sendmsg_afalg(op=None)
6596
6597            with self.assertRaises(TypeError):
6598                sock.sendmsg_afalg(1)
6599
6600            with self.assertRaises(TypeError):
6601                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)
6602
6603            with self.assertRaises(TypeError):
6604                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
6605
6606    def test_length_restriction(self):
6607        # bpo-35050, off-by-one error in length check
6608        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6609        self.addCleanup(sock.close)
6610
6611        # salg_type[14]
6612        with self.assertRaises(FileNotFoundError):
6613            sock.bind(("t" * 13, "name"))
6614        with self.assertRaisesRegex(ValueError, "type too long"):
6615            sock.bind(("t" * 14, "name"))
6616
6617        # salg_name[64]
6618        with self.assertRaises(FileNotFoundError):
6619            sock.bind(("type", "n" * 63))
6620        with self.assertRaisesRegex(ValueError, "name too long"):
6621            sock.bind(("type", "n" * 64))
6622
6623
6624@unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
6625class TestMacOSTCPFlags(unittest.TestCase):
6626    def test_tcp_keepalive(self):
6627        self.assertTrue(socket.TCP_KEEPALIVE)
6628
6629
6630@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
6631class TestMSWindowsTCPFlags(unittest.TestCase):
6632    knownTCPFlags = {
6633                       # available since long time ago
6634                       'TCP_MAXSEG',
6635                       'TCP_NODELAY',
6636                       # available starting with Windows 10 1607
6637                       'TCP_FASTOPEN',
6638                       # available starting with Windows 10 1703
6639                       'TCP_KEEPCNT',
6640                       # available starting with Windows 10 1709
6641                       'TCP_KEEPIDLE',
6642                       'TCP_KEEPINTVL'
6643                       }
6644
6645    def test_new_tcp_flags(self):
6646        provided = [s for s in dir(socket) if s.startswith('TCP')]
6647        unknown = [s for s in provided if s not in self.knownTCPFlags]
6648
6649        self.assertEqual([], unknown,
6650            "New TCP flags were discovered. See bpo-32394 for more information")
6651
6652
6653class CreateServerTest(unittest.TestCase):
6654
6655    def test_address(self):
6656        port = socket_helper.find_unused_port()
6657        with socket.create_server(("127.0.0.1", port)) as sock:
6658            self.assertEqual(sock.getsockname()[0], "127.0.0.1")
6659            self.assertEqual(sock.getsockname()[1], port)
6660        if socket_helper.IPV6_ENABLED:
6661            with socket.create_server(("::1", port),
6662                                      family=socket.AF_INET6) as sock:
6663                self.assertEqual(sock.getsockname()[0], "::1")
6664                self.assertEqual(sock.getsockname()[1], port)
6665
6666    def test_family_and_type(self):
6667        with socket.create_server(("127.0.0.1", 0)) as sock:
6668            self.assertEqual(sock.family, socket.AF_INET)
6669            self.assertEqual(sock.type, socket.SOCK_STREAM)
6670        if socket_helper.IPV6_ENABLED:
6671            with socket.create_server(("::1", 0), family=socket.AF_INET6) as s:
6672                self.assertEqual(s.family, socket.AF_INET6)
6673                self.assertEqual(sock.type, socket.SOCK_STREAM)
6674
6675    def test_reuse_port(self):
6676        if not hasattr(socket, "SO_REUSEPORT"):
6677            with self.assertRaises(ValueError):
6678                socket.create_server(("localhost", 0), reuse_port=True)
6679        else:
6680            with socket.create_server(("localhost", 0)) as sock:
6681                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
6682                self.assertEqual(opt, 0)
6683            with socket.create_server(("localhost", 0), reuse_port=True) as sock:
6684                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
6685                self.assertNotEqual(opt, 0)
6686
6687    @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
6688                     not hasattr(_socket, 'IPV6_V6ONLY'),
6689                     "IPV6_V6ONLY option not supported")
6690    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6691    def test_ipv6_only_default(self):
6692        with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock:
6693            assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
6694
6695    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6696                     "dualstack_ipv6 not supported")
6697    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6698    def test_dualstack_ipv6_family(self):
6699        with socket.create_server(("::1", 0), family=socket.AF_INET6,
6700                                  dualstack_ipv6=True) as sock:
6701            self.assertEqual(sock.family, socket.AF_INET6)
6702
6703
6704class CreateServerFunctionalTest(unittest.TestCase):
6705    timeout = support.LOOPBACK_TIMEOUT
6706
6707    def echo_server(self, sock):
6708        def run(sock):
6709            with sock:
6710                conn, _ = sock.accept()
6711                with conn:
6712                    event.wait(self.timeout)
6713                    msg = conn.recv(1024)
6714                    if not msg:
6715                        return
6716                    conn.sendall(msg)
6717
6718        event = threading.Event()
6719        sock.settimeout(self.timeout)
6720        thread = threading.Thread(target=run, args=(sock, ))
6721        thread.start()
6722        self.addCleanup(thread.join, self.timeout)
6723        event.set()
6724
6725    def echo_client(self, addr, family):
6726        with socket.socket(family=family) as sock:
6727            sock.settimeout(self.timeout)
6728            sock.connect(addr)
6729            sock.sendall(b'foo')
6730            self.assertEqual(sock.recv(1024), b'foo')
6731
6732    def test_tcp4(self):
6733        port = socket_helper.find_unused_port()
6734        with socket.create_server(("", port)) as sock:
6735            self.echo_server(sock)
6736            self.echo_client(("127.0.0.1", port), socket.AF_INET)
6737
6738    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6739    def test_tcp6(self):
6740        port = socket_helper.find_unused_port()
6741        with socket.create_server(("", port),
6742                                  family=socket.AF_INET6) as sock:
6743            self.echo_server(sock)
6744            self.echo_client(("::1", port), socket.AF_INET6)
6745
6746    # --- dual stack tests
6747
6748    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6749                     "dualstack_ipv6 not supported")
6750    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6751    def test_dual_stack_client_v4(self):
6752        port = socket_helper.find_unused_port()
6753        with socket.create_server(("", port), family=socket.AF_INET6,
6754                                  dualstack_ipv6=True) as sock:
6755            self.echo_server(sock)
6756            self.echo_client(("127.0.0.1", port), socket.AF_INET)
6757
6758    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6759                     "dualstack_ipv6 not supported")
6760    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6761    def test_dual_stack_client_v6(self):
6762        port = socket_helper.find_unused_port()
6763        with socket.create_server(("", port), family=socket.AF_INET6,
6764                                  dualstack_ipv6=True) as sock:
6765            self.echo_server(sock)
6766            self.echo_client(("::1", port), socket.AF_INET6)
6767
6768@requireAttrs(socket, "send_fds")
6769@requireAttrs(socket, "recv_fds")
6770@requireAttrs(socket, "AF_UNIX")
6771class SendRecvFdsTests(unittest.TestCase):
6772    def testSendAndRecvFds(self):
6773        def close_pipes(pipes):
6774            for fd1, fd2 in pipes:
6775                os.close(fd1)
6776                os.close(fd2)
6777
6778        def close_fds(fds):
6779            for fd in fds:
6780                os.close(fd)
6781
6782        # send 10 file descriptors
6783        pipes = [os.pipe() for _ in range(10)]
6784        self.addCleanup(close_pipes, pipes)
6785        fds = [rfd for rfd, wfd in pipes]
6786
6787        # use a UNIX socket pair to exchange file descriptors locally
6788        sock1, sock2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
6789        with sock1, sock2:
6790            socket.send_fds(sock1, [MSG], fds)
6791            # request more data and file descriptors than expected
6792            msg, fds2, flags, addr = socket.recv_fds(sock2, len(MSG) * 2, len(fds) * 2)
6793            self.addCleanup(close_fds, fds2)
6794
6795        self.assertEqual(msg, MSG)
6796        self.assertEqual(len(fds2), len(fds))
6797        self.assertEqual(flags, 0)
6798        # don't test addr
6799
6800        # test that file descriptors are connected
6801        for index, fds in enumerate(pipes):
6802            rfd, wfd = fds
6803            os.write(wfd, str(index).encode())
6804
6805        for index, rfd in enumerate(fds2):
6806            data = os.read(rfd, 100)
6807            self.assertEqual(data,  str(index).encode())
6808
6809
6810def setUpModule():
6811    thread_info = threading_helper.threading_setup()
6812    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
6813
6814
6815if __name__ == "__main__":
6816    unittest.main()
6817