• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3from test.support import os_helper
4from test.support import socket_helper
5from test.support import threading_helper
6
7import errno
8import io
9import itertools
10import socket
11import select
12import tempfile
13import time
14import traceback
15import queue
16import sys
17import os
18import platform
19import array
20import contextlib
21from weakref import proxy
22import signal
23import math
24import pickle
25import struct
26import random
27import shutil
28import string
29import _thread as thread
30import threading
31try:
32    import multiprocessing
33except ImportError:
34    multiprocessing = False
35try:
36    import fcntl
37except ImportError:
38    fcntl = None
39
40HOST = socket_helper.HOST
41# test unicode string and carriage return
42MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')
43
44VSOCKPORT = 1234
45AIX = platform.system() == "AIX"
46
47try:
48    import _socket
49except ImportError:
50    _socket = None
51
52def get_cid():
53    if fcntl is None:
54        return None
55    if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'):
56        return None
57    try:
58        with open("/dev/vsock", "rb") as f:
59            r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, "    ")
60    except OSError:
61        return None
62    else:
63        return struct.unpack("I", r)[0]
64
65def _have_socket_can():
66    """Check whether CAN sockets are supported on this host."""
67    try:
68        s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
69    except (AttributeError, OSError):
70        return False
71    else:
72        s.close()
73    return True
74
75def _have_socket_can_isotp():
76    """Check whether CAN ISOTP sockets are supported on this host."""
77    try:
78        s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
79    except (AttributeError, OSError):
80        return False
81    else:
82        s.close()
83    return True
84
85def _have_socket_can_j1939():
86    """Check whether CAN J1939 sockets are supported on this host."""
87    try:
88        s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939)
89    except (AttributeError, OSError):
90        return False
91    else:
92        s.close()
93    return True
94
95def _have_socket_rds():
96    """Check whether RDS sockets are supported on this host."""
97    try:
98        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
99    except (AttributeError, OSError):
100        return False
101    else:
102        s.close()
103    return True
104
105def _have_socket_alg():
106    """Check whether AF_ALG sockets are supported on this host."""
107    try:
108        s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
109    except (AttributeError, OSError):
110        return False
111    else:
112        s.close()
113    return True
114
115def _have_socket_qipcrtr():
116    """Check whether AF_QIPCRTR sockets are supported on this host."""
117    try:
118        s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0)
119    except (AttributeError, OSError):
120        return False
121    else:
122        s.close()
123    return True
124
125def _have_socket_vsock():
126    """Check whether AF_VSOCK sockets are supported on this host."""
127    ret = get_cid() is not None
128    return ret
129
130
131def _have_socket_bluetooth():
132    """Check whether AF_BLUETOOTH sockets are supported on this host."""
133    try:
134        # RFCOMM is supported by all platforms with bluetooth support. Windows
135        # does not support omitting the protocol.
136        s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
137    except (AttributeError, OSError):
138        return False
139    else:
140        s.close()
141    return True
142
143
144@contextlib.contextmanager
145def socket_setdefaulttimeout(timeout):
146    old_timeout = socket.getdefaulttimeout()
147    try:
148        socket.setdefaulttimeout(timeout)
149        yield
150    finally:
151        socket.setdefaulttimeout(old_timeout)
152
153
154HAVE_SOCKET_CAN = _have_socket_can()
155
156HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()
157
158HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939()
159
160HAVE_SOCKET_RDS = _have_socket_rds()
161
162HAVE_SOCKET_ALG = _have_socket_alg()
163
164HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr()
165
166HAVE_SOCKET_VSOCK = _have_socket_vsock()
167
168HAVE_SOCKET_UDPLITE = hasattr(socket, "IPPROTO_UDPLITE")
169
170HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()
171
172# Size in bytes of the int type
173SIZEOF_INT = array.array("i").itemsize
174
175class SocketTCPTest(unittest.TestCase):
176
177    def setUp(self):
178        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
179        self.port = socket_helper.bind_port(self.serv)
180        self.serv.listen()
181
182    def tearDown(self):
183        self.serv.close()
184        self.serv = None
185
186class SocketUDPTest(unittest.TestCase):
187
188    def setUp(self):
189        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
190        self.port = socket_helper.bind_port(self.serv)
191
192    def tearDown(self):
193        self.serv.close()
194        self.serv = None
195
196class SocketUDPLITETest(SocketUDPTest):
197
198    def setUp(self):
199        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
200        self.port = socket_helper.bind_port(self.serv)
201
202class ThreadSafeCleanupTestCase:
203    """Subclass of unittest.TestCase with thread-safe cleanup methods.
204
205    This subclass protects the addCleanup() and doCleanups() methods
206    with a recursive lock.
207    """
208
209    def __init__(self, *args, **kwargs):
210        super().__init__(*args, **kwargs)
211        self._cleanup_lock = threading.RLock()
212
213    def addCleanup(self, *args, **kwargs):
214        with self._cleanup_lock:
215            return super().addCleanup(*args, **kwargs)
216
217    def doCleanups(self, *args, **kwargs):
218        with self._cleanup_lock:
219            return super().doCleanups(*args, **kwargs)
220
221class SocketCANTest(unittest.TestCase):
222
223    """To be able to run this test, a `vcan0` CAN interface can be created with
224    the following commands:
225    # modprobe vcan
226    # ip link add dev vcan0 type vcan
227    # ip link set up vcan0
228    """
229    interface = 'vcan0'
230    bufsize = 128
231
232    """The CAN frame structure is defined in <linux/can.h>:
233
234    struct can_frame {
235        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
236        __u8    can_dlc; /* data length code: 0 .. 8 */
237        __u8    data[8] __attribute__((aligned(8)));
238    };
239    """
240    can_frame_fmt = "=IB3x8s"
241    can_frame_size = struct.calcsize(can_frame_fmt)
242
243    """The Broadcast Management Command frame structure is defined
244    in <linux/can/bcm.h>:
245
246    struct bcm_msg_head {
247        __u32 opcode;
248        __u32 flags;
249        __u32 count;
250        struct timeval ival1, ival2;
251        canid_t can_id;
252        __u32 nframes;
253        struct can_frame frames[0];
254    }
255
256    `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
257    `struct can_frame` definition). Must use native not standard types for packing.
258    """
259    bcm_cmd_msg_fmt = "@3I4l2I"
260    bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)
261
262    def setUp(self):
263        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
264        self.addCleanup(self.s.close)
265        try:
266            self.s.bind((self.interface,))
267        except OSError:
268            self.skipTest('network interface `%s` does not exist' %
269                           self.interface)
270
271
272class SocketRDSTest(unittest.TestCase):
273
274    """To be able to run this test, the `rds` kernel module must be loaded:
275    # modprobe rds
276    """
277    bufsize = 8192
278
279    def setUp(self):
280        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
281        self.addCleanup(self.serv.close)
282        try:
283            self.port = socket_helper.bind_port(self.serv)
284        except OSError:
285            self.skipTest('unable to bind RDS socket')
286
287
288class ThreadableTest:
289    """Threadable Test class
290
291    The ThreadableTest class makes it easy to create a threaded
292    client/server pair from an existing unit test. To create a
293    new threaded class from an existing unit test, use multiple
294    inheritance:
295
296        class NewClass (OldClass, ThreadableTest):
297            pass
298
299    This class defines two new fixture functions with obvious
300    purposes for overriding:
301
302        clientSetUp ()
303        clientTearDown ()
304
305    Any new test functions within the class must then define
306    tests in pairs, where the test name is preceded with a
307    '_' to indicate the client portion of the test. Ex:
308
309        def testFoo(self):
310            # Server portion
311
312        def _testFoo(self):
313            # Client portion
314
315    Any exceptions raised by the clients during their tests
316    are caught and transferred to the main thread to alert
317    the testing framework.
318
319    Note, the server setup function cannot call any blocking
320    functions that rely on the client thread during setup,
321    unless serverExplicitReady() is called just before
322    the blocking call (such as in setting up a client/server
323    connection and performing the accept() in setUp().
324    """
325
326    def __init__(self):
327        # Swap the true setup function
328        self.__setUp = self.setUp
329        self.setUp = self._setUp
330
331    def serverExplicitReady(self):
332        """This method allows the server to explicitly indicate that
333        it wants the client thread to proceed. This is useful if the
334        server is about to execute a blocking routine that is
335        dependent upon the client thread during its setup routine."""
336        self.server_ready.set()
337
338    def _setUp(self):
339        self.wait_threads = threading_helper.wait_threads_exit()
340        self.wait_threads.__enter__()
341        self.addCleanup(self.wait_threads.__exit__, None, None, None)
342
343        self.server_ready = threading.Event()
344        self.client_ready = threading.Event()
345        self.done = threading.Event()
346        self.queue = queue.Queue(1)
347        self.server_crashed = False
348
349        def raise_queued_exception():
350            if self.queue.qsize():
351                raise self.queue.get()
352        self.addCleanup(raise_queued_exception)
353
354        # Do some munging to start the client test.
355        methodname = self.id()
356        i = methodname.rfind('.')
357        methodname = methodname[i+1:]
358        test_method = getattr(self, '_' + methodname)
359        self.client_thread = thread.start_new_thread(
360            self.clientRun, (test_method,))
361
362        try:
363            self.__setUp()
364        except:
365            self.server_crashed = True
366            raise
367        finally:
368            self.server_ready.set()
369        self.client_ready.wait()
370        self.addCleanup(self.done.wait)
371
372    def clientRun(self, test_func):
373        self.server_ready.wait()
374        try:
375            self.clientSetUp()
376        except BaseException as e:
377            self.queue.put(e)
378            self.clientTearDown()
379            return
380        finally:
381            self.client_ready.set()
382        if self.server_crashed:
383            self.clientTearDown()
384            return
385        if not hasattr(test_func, '__call__'):
386            raise TypeError("test_func must be a callable function")
387        try:
388            test_func()
389        except BaseException as e:
390            self.queue.put(e)
391        finally:
392            self.clientTearDown()
393
394    def clientSetUp(self):
395        raise NotImplementedError("clientSetUp must be implemented.")
396
397    def clientTearDown(self):
398        self.done.set()
399        thread.exit()
400
401class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
402
403    def __init__(self, methodName='runTest'):
404        SocketTCPTest.__init__(self, methodName=methodName)
405        ThreadableTest.__init__(self)
406
407    def clientSetUp(self):
408        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
409
410    def clientTearDown(self):
411        self.cli.close()
412        self.cli = None
413        ThreadableTest.clientTearDown(self)
414
415class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
416
417    def __init__(self, methodName='runTest'):
418        SocketUDPTest.__init__(self, methodName=methodName)
419        ThreadableTest.__init__(self)
420
421    def clientSetUp(self):
422        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
423
424    def clientTearDown(self):
425        self.cli.close()
426        self.cli = None
427        ThreadableTest.clientTearDown(self)
428
429@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
430          'UDPLITE sockets required for this test.')
431class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest):
432
433    def __init__(self, methodName='runTest'):
434        SocketUDPLITETest.__init__(self, methodName=methodName)
435        ThreadableTest.__init__(self)
436
437    def clientSetUp(self):
438        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
439
440    def clientTearDown(self):
441        self.cli.close()
442        self.cli = None
443        ThreadableTest.clientTearDown(self)
444
445class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
446
447    def __init__(self, methodName='runTest'):
448        SocketCANTest.__init__(self, methodName=methodName)
449        ThreadableTest.__init__(self)
450
451    def clientSetUp(self):
452        self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
453        try:
454            self.cli.bind((self.interface,))
455        except OSError:
456            # skipTest should not be called here, and will be called in the
457            # server instead
458            pass
459
460    def clientTearDown(self):
461        self.cli.close()
462        self.cli = None
463        ThreadableTest.clientTearDown(self)
464
465class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
466
467    def __init__(self, methodName='runTest'):
468        SocketRDSTest.__init__(self, methodName=methodName)
469        ThreadableTest.__init__(self)
470
471    def clientSetUp(self):
472        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
473        try:
474            # RDS sockets must be bound explicitly to send or receive data
475            self.cli.bind((HOST, 0))
476            self.cli_addr = self.cli.getsockname()
477        except OSError:
478            # skipTest should not be called here, and will be called in the
479            # server instead
480            pass
481
482    def clientTearDown(self):
483        self.cli.close()
484        self.cli = None
485        ThreadableTest.clientTearDown(self)
486
487@unittest.skipIf(fcntl is None, "need fcntl")
488@unittest.skipUnless(HAVE_SOCKET_VSOCK,
489          'VSOCK sockets required for this test.')
490@unittest.skipUnless(get_cid() != 2,
491          "This test can only be run on a virtual guest.")
492class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest):
493
494    def __init__(self, methodName='runTest'):
495        unittest.TestCase.__init__(self, methodName=methodName)
496        ThreadableTest.__init__(self)
497
498    def setUp(self):
499        self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
500        self.addCleanup(self.serv.close)
501        self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT))
502        self.serv.listen()
503        self.serverExplicitReady()
504        self.conn, self.connaddr = self.serv.accept()
505        self.addCleanup(self.conn.close)
506
507    def clientSetUp(self):
508        time.sleep(0.1)
509        self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
510        self.addCleanup(self.cli.close)
511        cid = get_cid()
512        self.cli.connect((cid, VSOCKPORT))
513
514    def testStream(self):
515        msg = self.conn.recv(1024)
516        self.assertEqual(msg, MSG)
517
518    def _testStream(self):
519        self.cli.send(MSG)
520        self.cli.close()
521
522class SocketConnectedTest(ThreadedTCPSocketTest):
523    """Socket tests for client-server connection.
524
525    self.cli_conn is a client socket connected to the server.  The
526    setUp() method guarantees that it is connected to the server.
527    """
528
529    def __init__(self, methodName='runTest'):
530        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
531
532    def setUp(self):
533        ThreadedTCPSocketTest.setUp(self)
534        # Indicate explicitly we're ready for the client thread to
535        # proceed and then perform the blocking call to accept
536        self.serverExplicitReady()
537        conn, addr = self.serv.accept()
538        self.cli_conn = conn
539
540    def tearDown(self):
541        self.cli_conn.close()
542        self.cli_conn = None
543        ThreadedTCPSocketTest.tearDown(self)
544
545    def clientSetUp(self):
546        ThreadedTCPSocketTest.clientSetUp(self)
547        self.cli.connect((HOST, self.port))
548        self.serv_conn = self.cli
549
550    def clientTearDown(self):
551        self.serv_conn.close()
552        self.serv_conn = None
553        ThreadedTCPSocketTest.clientTearDown(self)
554
555class SocketPairTest(unittest.TestCase, ThreadableTest):
556
557    def __init__(self, methodName='runTest'):
558        unittest.TestCase.__init__(self, methodName=methodName)
559        ThreadableTest.__init__(self)
560
561    def setUp(self):
562        self.serv, self.cli = socket.socketpair()
563
564    def tearDown(self):
565        self.serv.close()
566        self.serv = None
567
568    def clientSetUp(self):
569        pass
570
571    def clientTearDown(self):
572        self.cli.close()
573        self.cli = None
574        ThreadableTest.clientTearDown(self)
575
576
577# The following classes are used by the sendmsg()/recvmsg() tests.
578# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
579# gives a drop-in replacement for SocketConnectedTest, but different
580# address families can be used, and the attributes serv_addr and
581# cli_addr will be set to the addresses of the endpoints.
582
583class SocketTestBase(unittest.TestCase):
584    """A base class for socket tests.
585
586    Subclasses must provide methods newSocket() to return a new socket
587    and bindSock(sock) to bind it to an unused address.
588
589    Creates a socket self.serv and sets self.serv_addr to its address.
590    """
591
592    def setUp(self):
593        self.serv = self.newSocket()
594        self.bindServer()
595
596    def bindServer(self):
597        """Bind server socket and set self.serv_addr to its address."""
598        self.bindSock(self.serv)
599        self.serv_addr = self.serv.getsockname()
600
601    def tearDown(self):
602        self.serv.close()
603        self.serv = None
604
605
606class SocketListeningTestMixin(SocketTestBase):
607    """Mixin to listen on the server socket."""
608
609    def setUp(self):
610        super().setUp()
611        self.serv.listen()
612
613
614class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
615                              ThreadableTest):
616    """Mixin to add client socket and allow client/server tests.
617
618    Client socket is self.cli and its address is self.cli_addr.  See
619    ThreadableTest for usage information.
620    """
621
622    def __init__(self, *args, **kwargs):
623        super().__init__(*args, **kwargs)
624        ThreadableTest.__init__(self)
625
626    def clientSetUp(self):
627        self.cli = self.newClientSocket()
628        self.bindClient()
629
630    def newClientSocket(self):
631        """Return a new socket for use as client."""
632        return self.newSocket()
633
634    def bindClient(self):
635        """Bind client socket and set self.cli_addr to its address."""
636        self.bindSock(self.cli)
637        self.cli_addr = self.cli.getsockname()
638
639    def clientTearDown(self):
640        self.cli.close()
641        self.cli = None
642        ThreadableTest.clientTearDown(self)
643
644
645class ConnectedStreamTestMixin(SocketListeningTestMixin,
646                               ThreadedSocketTestMixin):
647    """Mixin to allow client/server stream tests with connected client.
648
649    Server's socket representing connection to client is self.cli_conn
650    and client's connection to server is self.serv_conn.  (Based on
651    SocketConnectedTest.)
652    """
653
654    def setUp(self):
655        super().setUp()
656        # Indicate explicitly we're ready for the client thread to
657        # proceed and then perform the blocking call to accept
658        self.serverExplicitReady()
659        conn, addr = self.serv.accept()
660        self.cli_conn = conn
661
662    def tearDown(self):
663        self.cli_conn.close()
664        self.cli_conn = None
665        super().tearDown()
666
667    def clientSetUp(self):
668        super().clientSetUp()
669        self.cli.connect(self.serv_addr)
670        self.serv_conn = self.cli
671
672    def clientTearDown(self):
673        try:
674            self.serv_conn.close()
675            self.serv_conn = None
676        except AttributeError:
677            pass
678        super().clientTearDown()
679
680
681class UnixSocketTestBase(SocketTestBase):
682    """Base class for Unix-domain socket tests."""
683
684    # This class is used for file descriptor passing tests, so we
685    # create the sockets in a private directory so that other users
686    # can't send anything that might be problematic for a privileged
687    # user running the tests.
688
689    def setUp(self):
690        self.dir_path = tempfile.mkdtemp()
691        self.addCleanup(os.rmdir, self.dir_path)
692        super().setUp()
693
694    def bindSock(self, sock):
695        path = tempfile.mktemp(dir=self.dir_path)
696        socket_helper.bind_unix_socket(sock, path)
697        self.addCleanup(os_helper.unlink, path)
698
699class UnixStreamBase(UnixSocketTestBase):
700    """Base class for Unix-domain SOCK_STREAM tests."""
701
702    def newSocket(self):
703        return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
704
705
706class InetTestBase(SocketTestBase):
707    """Base class for IPv4 socket tests."""
708
709    host = HOST
710
711    def setUp(self):
712        super().setUp()
713        self.port = self.serv_addr[1]
714
715    def bindSock(self, sock):
716        socket_helper.bind_port(sock, host=self.host)
717
718class TCPTestBase(InetTestBase):
719    """Base class for TCP-over-IPv4 tests."""
720
721    def newSocket(self):
722        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)
723
724class UDPTestBase(InetTestBase):
725    """Base class for UDP-over-IPv4 tests."""
726
727    def newSocket(self):
728        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
729
730class UDPLITETestBase(InetTestBase):
731    """Base class for UDPLITE-over-IPv4 tests."""
732
733    def newSocket(self):
734        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
735
736class SCTPStreamBase(InetTestBase):
737    """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""
738
739    def newSocket(self):
740        return socket.socket(socket.AF_INET, socket.SOCK_STREAM,
741                             socket.IPPROTO_SCTP)
742
743
744class Inet6TestBase(InetTestBase):
745    """Base class for IPv6 socket tests."""
746
747    host = socket_helper.HOSTv6
748
749class UDP6TestBase(Inet6TestBase):
750    """Base class for UDP-over-IPv6 tests."""
751
752    def newSocket(self):
753        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
754
755class UDPLITE6TestBase(Inet6TestBase):
756    """Base class for UDPLITE-over-IPv6 tests."""
757
758    def newSocket(self):
759        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
760
761
762# Test-skipping decorators for use with ThreadableTest.
763
764def skipWithClientIf(condition, reason):
765    """Skip decorated test if condition is true, add client_skip decorator.
766
767    If the decorated object is not a class, sets its attribute
768    "client_skip" to a decorator which will return an empty function
769    if the test is to be skipped, or the original function if it is
770    not.  This can be used to avoid running the client part of a
771    skipped test when using ThreadableTest.
772    """
773    def client_pass(*args, **kwargs):
774        pass
775    def skipdec(obj):
776        retval = unittest.skip(reason)(obj)
777        if not isinstance(obj, type):
778            retval.client_skip = lambda f: client_pass
779        return retval
780    def noskipdec(obj):
781        if not (isinstance(obj, type) or hasattr(obj, "client_skip")):
782            obj.client_skip = lambda f: f
783        return obj
784    return skipdec if condition else noskipdec
785
786
787def requireAttrs(obj, *attributes):
788    """Skip decorated test if obj is missing any of the given attributes.
789
790    Sets client_skip attribute as skipWithClientIf() does.
791    """
792    missing = [name for name in attributes if not hasattr(obj, name)]
793    return skipWithClientIf(
794        missing, "don't have " + ", ".join(name for name in missing))
795
796
797def requireSocket(*args):
798    """Skip decorated test if a socket cannot be created with given arguments.
799
800    When an argument is given as a string, will use the value of that
801    attribute of the socket module, or skip the test if it doesn't
802    exist.  Sets client_skip attribute as skipWithClientIf() does.
803    """
804    err = None
805    missing = [obj for obj in args if
806               isinstance(obj, str) and not hasattr(socket, obj)]
807    if missing:
808        err = "don't have " + ", ".join(name for name in missing)
809    else:
810        callargs = [getattr(socket, obj) if isinstance(obj, str) else obj
811                    for obj in args]
812        try:
813            s = socket.socket(*callargs)
814        except OSError as e:
815            # XXX: check errno?
816            err = str(e)
817        else:
818            s.close()
819    return skipWithClientIf(
820        err is not None,
821        "can't create socket({0}): {1}".format(
822            ", ".join(str(o) for o in args), err))
823
824
825#######################################################################
826## Begin Tests
827
828class GeneralModuleTests(unittest.TestCase):
829
830    def test_SocketType_is_socketobject(self):
831        import _socket
832        self.assertTrue(socket.SocketType is _socket.socket)
833        s = socket.socket()
834        self.assertIsInstance(s, socket.SocketType)
835        s.close()
836
837    def test_repr(self):
838        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
839        with s:
840            self.assertIn('fd=%i' % s.fileno(), repr(s))
841            self.assertIn('family=%s' % socket.AF_INET, repr(s))
842            self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
843            self.assertIn('proto=0', repr(s))
844            self.assertNotIn('raddr', repr(s))
845            s.bind(('127.0.0.1', 0))
846            self.assertIn('laddr', repr(s))
847            self.assertIn(str(s.getsockname()), repr(s))
848        self.assertIn('[closed]', repr(s))
849        self.assertNotIn('laddr', repr(s))
850
851    @unittest.skipUnless(_socket is not None, 'need _socket module')
852    def test_csocket_repr(self):
853        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
854        try:
855            expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
856                        % (s.fileno(), s.family, s.type, s.proto))
857            self.assertEqual(repr(s), expected)
858        finally:
859            s.close()
860        expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
861                    % (s.family, s.type, s.proto))
862        self.assertEqual(repr(s), expected)
863
864    def test_weakref(self):
865        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
866            p = proxy(s)
867            self.assertEqual(p.fileno(), s.fileno())
868        s = None
869        support.gc_collect()  # For PyPy or other GCs.
870        try:
871            p.fileno()
872        except ReferenceError:
873            pass
874        else:
875            self.fail('Socket proxy still exists')
876
877    def testSocketError(self):
878        # Testing socket module exceptions
879        msg = "Error raising socket exception (%s)."
880        with self.assertRaises(OSError, msg=msg % 'OSError'):
881            raise OSError
882        with self.assertRaises(OSError, msg=msg % 'socket.herror'):
883            raise socket.herror
884        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
885            raise socket.gaierror
886
887    def testSendtoErrors(self):
888        # Testing that sendto doesn't mask failures. See #10169.
889        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
890        self.addCleanup(s.close)
891        s.bind(('', 0))
892        sockname = s.getsockname()
893        # 2 args
894        with self.assertRaises(TypeError) as cm:
895            s.sendto('\u2620', sockname)
896        self.assertEqual(str(cm.exception),
897                         "a bytes-like object is required, not 'str'")
898        with self.assertRaises(TypeError) as cm:
899            s.sendto(5j, sockname)
900        self.assertEqual(str(cm.exception),
901                         "a bytes-like object is required, not 'complex'")
902        with self.assertRaises(TypeError) as cm:
903            s.sendto(b'foo', None)
904        self.assertIn('not NoneType',str(cm.exception))
905        # 3 args
906        with self.assertRaises(TypeError) as cm:
907            s.sendto('\u2620', 0, sockname)
908        self.assertEqual(str(cm.exception),
909                         "a bytes-like object is required, not 'str'")
910        with self.assertRaises(TypeError) as cm:
911            s.sendto(5j, 0, sockname)
912        self.assertEqual(str(cm.exception),
913                         "a bytes-like object is required, not 'complex'")
914        with self.assertRaises(TypeError) as cm:
915            s.sendto(b'foo', 0, None)
916        self.assertIn('not NoneType', str(cm.exception))
917        with self.assertRaises(TypeError) as cm:
918            s.sendto(b'foo', 'bar', sockname)
919        with self.assertRaises(TypeError) as cm:
920            s.sendto(b'foo', None, None)
921        # wrong number of args
922        with self.assertRaises(TypeError) as cm:
923            s.sendto(b'foo')
924        self.assertIn('(1 given)', str(cm.exception))
925        with self.assertRaises(TypeError) as cm:
926            s.sendto(b'foo', 0, sockname, 4)
927        self.assertIn('(4 given)', str(cm.exception))
928
929    def testCrucialConstants(self):
930        # Testing for mission critical constants
931        socket.AF_INET
932        if socket.has_ipv6:
933            socket.AF_INET6
934        socket.SOCK_STREAM
935        socket.SOCK_DGRAM
936        socket.SOCK_RAW
937        socket.SOCK_RDM
938        socket.SOCK_SEQPACKET
939        socket.SOL_SOCKET
940        socket.SO_REUSEADDR
941
942    def testCrucialIpProtoConstants(self):
943        socket.IPPROTO_TCP
944        socket.IPPROTO_UDP
945        if socket.has_ipv6:
946            socket.IPPROTO_IPV6
947
948    @unittest.skipUnless(os.name == "nt", "Windows specific")
949    def testWindowsSpecificConstants(self):
950        socket.IPPROTO_ICLFXBM
951        socket.IPPROTO_ST
952        socket.IPPROTO_CBT
953        socket.IPPROTO_IGP
954        socket.IPPROTO_RDP
955        socket.IPPROTO_PGM
956        socket.IPPROTO_L2TP
957        socket.IPPROTO_SCTP
958
959    @unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
960    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
961    def test3542SocketOptions(self):
962        # Ref. issue #35569 and https://tools.ietf.org/html/rfc3542
963        opts = {
964            'IPV6_CHECKSUM',
965            'IPV6_DONTFRAG',
966            'IPV6_DSTOPTS',
967            'IPV6_HOPLIMIT',
968            'IPV6_HOPOPTS',
969            'IPV6_NEXTHOP',
970            'IPV6_PATHMTU',
971            'IPV6_PKTINFO',
972            'IPV6_RECVDSTOPTS',
973            'IPV6_RECVHOPLIMIT',
974            'IPV6_RECVHOPOPTS',
975            'IPV6_RECVPATHMTU',
976            'IPV6_RECVPKTINFO',
977            'IPV6_RECVRTHDR',
978            'IPV6_RECVTCLASS',
979            'IPV6_RTHDR',
980            'IPV6_RTHDRDSTOPTS',
981            'IPV6_RTHDR_TYPE_0',
982            'IPV6_TCLASS',
983            'IPV6_USE_MIN_MTU',
984        }
985        for opt in opts:
986            self.assertTrue(
987                hasattr(socket, opt), f"Missing RFC3542 socket option '{opt}'"
988            )
989
990    def testHostnameRes(self):
991        # Testing hostname resolution mechanisms
992        hostname = socket.gethostname()
993        try:
994            ip = socket.gethostbyname(hostname)
995        except OSError:
996            # Probably name lookup wasn't set up right; skip this test
997            self.skipTest('name lookup failure')
998        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
999        try:
1000            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
1001        except OSError:
1002            # Probably a similar problem as above; skip this test
1003            self.skipTest('name lookup failure')
1004        all_host_names = [hostname, hname] + aliases
1005        fqhn = socket.getfqdn(ip)
1006        if not fqhn in all_host_names:
1007            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
1008
1009    def test_host_resolution(self):
1010        for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']:
1011            self.assertEqual(socket.gethostbyname(addr), addr)
1012
1013        # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have
1014        # a matching name entry (e.g. 'ip6-localhost')
1015        for host in [socket_helper.HOSTv4]:
1016            self.assertIn(host, socket.gethostbyaddr(host)[2])
1017
1018    def test_host_resolution_bad_address(self):
1019        # These are all malformed IP addresses and expected not to resolve to
1020        # any result.  But some ISPs, e.g. AWS, may successfully resolve these
1021        # IPs.
1022        explanation = (
1023            "resolving an invalid IP address did not raise OSError; "
1024            "can be caused by a broken DNS server"
1025        )
1026        for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
1027                     '1:1:1:1:1:1:1:1:1']:
1028            with self.assertRaises(OSError, msg=addr):
1029                socket.gethostbyname(addr)
1030            with self.assertRaises(OSError, msg=explanation):
1031                socket.gethostbyaddr(addr)
1032
1033    @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
1034    @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
1035    def test_sethostname(self):
1036        oldhn = socket.gethostname()
1037        try:
1038            socket.sethostname('new')
1039        except OSError as e:
1040            if e.errno == errno.EPERM:
1041                self.skipTest("test should be run as root")
1042            else:
1043                raise
1044        try:
1045            # running test as root!
1046            self.assertEqual(socket.gethostname(), 'new')
1047            # Should work with bytes objects too
1048            socket.sethostname(b'bar')
1049            self.assertEqual(socket.gethostname(), 'bar')
1050        finally:
1051            socket.sethostname(oldhn)
1052
1053    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
1054                         'socket.if_nameindex() not available.')
1055    def testInterfaceNameIndex(self):
1056        interfaces = socket.if_nameindex()
1057        for index, name in interfaces:
1058            self.assertIsInstance(index, int)
1059            self.assertIsInstance(name, str)
1060            # interface indices are non-zero integers
1061            self.assertGreater(index, 0)
1062            _index = socket.if_nametoindex(name)
1063            self.assertIsInstance(_index, int)
1064            self.assertEqual(index, _index)
1065            _name = socket.if_indextoname(index)
1066            self.assertIsInstance(_name, str)
1067            self.assertEqual(name, _name)
1068
1069    @unittest.skipUnless(hasattr(socket, 'if_indextoname'),
1070                         'socket.if_indextoname() not available.')
1071    def testInvalidInterfaceIndexToName(self):
1072        self.assertRaises(OSError, socket.if_indextoname, 0)
1073        self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
1074
1075    @unittest.skipUnless(hasattr(socket, 'if_nametoindex'),
1076                         'socket.if_nametoindex() not available.')
1077    def testInvalidInterfaceNameToIndex(self):
1078        self.assertRaises(TypeError, socket.if_nametoindex, 0)
1079        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
1080
1081    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
1082                         'test needs sys.getrefcount()')
1083    def testRefCountGetNameInfo(self):
1084        # Testing reference count for getnameinfo
1085        try:
1086            # On some versions, this loses a reference
1087            orig = sys.getrefcount(__name__)
1088            socket.getnameinfo(__name__,0)
1089        except TypeError:
1090            if sys.getrefcount(__name__) != orig:
1091                self.fail("socket.getnameinfo loses a reference")
1092
1093    def testInterpreterCrash(self):
1094        # Making sure getnameinfo doesn't crash the interpreter
1095        try:
1096            # On some versions, this crashes the interpreter.
1097            socket.getnameinfo(('x', 0, 0, 0), 0)
1098        except OSError:
1099            pass
1100
1101    def testNtoH(self):
1102        # This just checks that htons etc. are their own inverse,
1103        # when looking at the lower 16 or 32 bits.
1104        sizes = {socket.htonl: 32, socket.ntohl: 32,
1105                 socket.htons: 16, socket.ntohs: 16}
1106        for func, size in sizes.items():
1107            mask = (1<<size) - 1
1108            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
1109                self.assertEqual(i & mask, func(func(i&mask)) & mask)
1110
1111            swapped = func(mask)
1112            self.assertEqual(swapped & mask, mask)
1113            self.assertRaises(OverflowError, func, 1<<34)
1114
1115    @support.cpython_only
1116    def testNtoHErrors(self):
1117        import _testcapi
1118        s_good_values = [0, 1, 2, 0xffff]
1119        l_good_values = s_good_values + [0xffffffff]
1120        l_bad_values = [-1, -2, 1<<32, 1<<1000]
1121        s_bad_values = (
1122            l_bad_values +
1123            [_testcapi.INT_MIN-1, _testcapi.INT_MAX+1] +
1124            [1 << 16, _testcapi.INT_MAX]
1125        )
1126        for k in s_good_values:
1127            socket.ntohs(k)
1128            socket.htons(k)
1129        for k in l_good_values:
1130            socket.ntohl(k)
1131            socket.htonl(k)
1132        for k in s_bad_values:
1133            self.assertRaises(OverflowError, socket.ntohs, k)
1134            self.assertRaises(OverflowError, socket.htons, k)
1135        for k in l_bad_values:
1136            self.assertRaises(OverflowError, socket.ntohl, k)
1137            self.assertRaises(OverflowError, socket.htonl, k)
1138
1139    def testGetServBy(self):
1140        eq = self.assertEqual
1141        # Find one service that exists, then check all the related interfaces.
1142        # I've ordered this by protocols that have both a tcp and udp
1143        # protocol, at least for modern Linuxes.
1144        if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
1145            or sys.platform in ('linux', 'darwin')):
1146            # avoid the 'echo' service on this platform, as there is an
1147            # assumption breaking non-standard port/protocol entry
1148            services = ('daytime', 'qotd', 'domain')
1149        else:
1150            services = ('echo', 'daytime', 'domain')
1151        for service in services:
1152            try:
1153                port = socket.getservbyname(service, 'tcp')
1154                break
1155            except OSError:
1156                pass
1157        else:
1158            raise OSError
1159        # Try same call with optional protocol omitted
1160        # Issue #26936: Android getservbyname() was broken before API 23.
1161        if (not hasattr(sys, 'getandroidapilevel') or
1162                sys.getandroidapilevel() >= 23):
1163            port2 = socket.getservbyname(service)
1164            eq(port, port2)
1165        # Try udp, but don't barf if it doesn't exist
1166        try:
1167            udpport = socket.getservbyname(service, 'udp')
1168        except OSError:
1169            udpport = None
1170        else:
1171            eq(udpport, port)
1172        # Now make sure the lookup by port returns the same service name
1173        # Issue #26936: Android getservbyport() is broken.
1174        if not support.is_android:
1175            eq(socket.getservbyport(port2), service)
1176        eq(socket.getservbyport(port, 'tcp'), service)
1177        if udpport is not None:
1178            eq(socket.getservbyport(udpport, 'udp'), service)
1179        # Make sure getservbyport does not accept out of range ports.
1180        self.assertRaises(OverflowError, socket.getservbyport, -1)
1181        self.assertRaises(OverflowError, socket.getservbyport, 65536)
1182
1183    def testDefaultTimeout(self):
1184        # Testing default timeout
1185        # The default timeout should initially be None
1186        self.assertEqual(socket.getdefaulttimeout(), None)
1187        with socket.socket() as s:
1188            self.assertEqual(s.gettimeout(), None)
1189
1190        # Set the default timeout to 10, and see if it propagates
1191        with socket_setdefaulttimeout(10):
1192            self.assertEqual(socket.getdefaulttimeout(), 10)
1193            with socket.socket() as sock:
1194                self.assertEqual(sock.gettimeout(), 10)
1195
1196            # Reset the default timeout to None, and see if it propagates
1197            socket.setdefaulttimeout(None)
1198            self.assertEqual(socket.getdefaulttimeout(), None)
1199            with socket.socket() as sock:
1200                self.assertEqual(sock.gettimeout(), None)
1201
1202        # Check that setting it to an invalid value raises ValueError
1203        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
1204
1205        # Check that setting it to an invalid type raises TypeError
1206        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
1207
1208    @unittest.skipUnless(hasattr(socket, 'inet_aton'),
1209                         'test needs socket.inet_aton()')
1210    def testIPv4_inet_aton_fourbytes(self):
1211        # Test that issue1008086 and issue767150 are fixed.
1212        # It must return 4 bytes.
1213        self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
1214        self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
1215
1216    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1217                         'test needs socket.inet_pton()')
1218    def testIPv4toString(self):
1219        from socket import inet_aton as f, inet_pton, AF_INET
1220        g = lambda a: inet_pton(AF_INET, a)
1221
1222        assertInvalid = lambda func,a: self.assertRaises(
1223            (OSError, ValueError), func, a
1224        )
1225
1226        self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
1227        self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
1228        self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
1229        self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
1230        self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
1231        # bpo-29972: inet_pton() doesn't fail on AIX
1232        if not AIX:
1233            assertInvalid(f, '0.0.0.')
1234        assertInvalid(f, '300.0.0.0')
1235        assertInvalid(f, 'a.0.0.0')
1236        assertInvalid(f, '1.2.3.4.5')
1237        assertInvalid(f, '::1')
1238
1239        self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
1240        self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
1241        self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
1242        self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
1243        assertInvalid(g, '0.0.0.')
1244        assertInvalid(g, '300.0.0.0')
1245        assertInvalid(g, 'a.0.0.0')
1246        assertInvalid(g, '1.2.3.4.5')
1247        assertInvalid(g, '::1')
1248
1249    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1250                         'test needs socket.inet_pton()')
1251    def testIPv6toString(self):
1252        try:
1253            from socket import inet_pton, AF_INET6, has_ipv6
1254            if not has_ipv6:
1255                self.skipTest('IPv6 not available')
1256        except ImportError:
1257            self.skipTest('could not import needed symbols from socket')
1258
1259        if sys.platform == "win32":
1260            try:
1261                inet_pton(AF_INET6, '::')
1262            except OSError as e:
1263                if e.winerror == 10022:
1264                    self.skipTest('IPv6 might not be supported')
1265
1266        f = lambda a: inet_pton(AF_INET6, a)
1267        assertInvalid = lambda a: self.assertRaises(
1268            (OSError, ValueError), f, a
1269        )
1270
1271        self.assertEqual(b'\x00' * 16, f('::'))
1272        self.assertEqual(b'\x00' * 16, f('0::0'))
1273        self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
1274        self.assertEqual(
1275            b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
1276            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
1277        )
1278        self.assertEqual(
1279            b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
1280            f('ad42:abc::127:0:254:2')
1281        )
1282        self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
1283        assertInvalid('0x20::')
1284        assertInvalid(':::')
1285        assertInvalid('::0::')
1286        assertInvalid('1::abc::')
1287        assertInvalid('1::abc::def')
1288        assertInvalid('1:2:3:4:5:6')
1289        assertInvalid('1:2:3:4:5:6:')
1290        assertInvalid('1:2:3:4:5:6:7:8:0')
1291        # bpo-29972: inet_pton() doesn't fail on AIX
1292        if not AIX:
1293            assertInvalid('1:2:3:4:5:6:7:8:')
1294
1295        self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
1296            f('::254.42.23.64')
1297        )
1298        self.assertEqual(
1299            b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
1300            f('42::a29b:254.42.23.64')
1301        )
1302        self.assertEqual(
1303            b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
1304            f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
1305        )
1306        assertInvalid('255.254.253.252')
1307        assertInvalid('1::260.2.3.0')
1308        assertInvalid('1::0.be.e.0')
1309        assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
1310        assertInvalid('::1.2.3.4:0')
1311        assertInvalid('0.100.200.0:3:4:5:6:7:8')
1312
1313    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1314                         'test needs socket.inet_ntop()')
1315    def testStringToIPv4(self):
1316        from socket import inet_ntoa as f, inet_ntop, AF_INET
1317        g = lambda a: inet_ntop(AF_INET, a)
1318        assertInvalid = lambda func,a: self.assertRaises(
1319            (OSError, ValueError), func, a
1320        )
1321
1322        self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
1323        self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
1324        self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
1325        self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
1326        assertInvalid(f, b'\x00' * 3)
1327        assertInvalid(f, b'\x00' * 5)
1328        assertInvalid(f, b'\x00' * 16)
1329        self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))
1330
1331        self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
1332        self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
1333        self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
1334        assertInvalid(g, b'\x00' * 3)
1335        assertInvalid(g, b'\x00' * 5)
1336        assertInvalid(g, b'\x00' * 16)
1337        self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))
1338
1339    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1340                         'test needs socket.inet_ntop()')
1341    def testStringToIPv6(self):
1342        try:
1343            from socket import inet_ntop, AF_INET6, has_ipv6
1344            if not has_ipv6:
1345                self.skipTest('IPv6 not available')
1346        except ImportError:
1347            self.skipTest('could not import needed symbols from socket')
1348
1349        if sys.platform == "win32":
1350            try:
1351                inet_ntop(AF_INET6, b'\x00' * 16)
1352            except OSError as e:
1353                if e.winerror == 10022:
1354                    self.skipTest('IPv6 might not be supported')
1355
1356        f = lambda a: inet_ntop(AF_INET6, a)
1357        assertInvalid = lambda a: self.assertRaises(
1358            (OSError, ValueError), f, a
1359        )
1360
1361        self.assertEqual('::', f(b'\x00' * 16))
1362        self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
1363        self.assertEqual(
1364            'aef:b01:506:1001:ffff:9997:55:170',
1365            f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
1366        )
1367        self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))
1368
1369        assertInvalid(b'\x12' * 15)
1370        assertInvalid(b'\x12' * 17)
1371        assertInvalid(b'\x12' * 4)
1372
1373    # XXX The following don't test module-level functionality...
1374
1375    def testSockName(self):
1376        # Testing getsockname()
1377        port = socket_helper.find_unused_port()
1378        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1379        self.addCleanup(sock.close)
1380        sock.bind(("0.0.0.0", port))
1381        name = sock.getsockname()
1382        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
1383        # it reasonable to get the host's addr in addition to 0.0.0.0.
1384        # At least for eCos.  This is required for the S/390 to pass.
1385        try:
1386            my_ip_addr = socket.gethostbyname(socket.gethostname())
1387        except OSError:
1388            # Probably name lookup wasn't set up right; skip this test
1389            self.skipTest('name lookup failure')
1390        self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
1391        self.assertEqual(name[1], port)
1392
1393    def testGetSockOpt(self):
1394        # Testing getsockopt()
1395        # We know a socket should start without reuse==0
1396        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1397        self.addCleanup(sock.close)
1398        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1399        self.assertFalse(reuse != 0, "initial mode is reuse")
1400
1401    def testSetSockOpt(self):
1402        # Testing setsockopt()
1403        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1404        self.addCleanup(sock.close)
1405        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1406        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1407        self.assertFalse(reuse == 0, "failed to set reuse mode")
1408
1409    def testSendAfterClose(self):
1410        # testing send() after close() with timeout
1411        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1412            sock.settimeout(1)
1413        self.assertRaises(OSError, sock.send, b"spam")
1414
1415    def testCloseException(self):
1416        sock = socket.socket()
1417        sock.bind((socket._LOCALHOST, 0))
1418        socket.socket(fileno=sock.fileno()).close()
1419        try:
1420            sock.close()
1421        except OSError as err:
1422            # Winsock apparently raises ENOTSOCK
1423            self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
1424        else:
1425            self.fail("close() should raise EBADF/ENOTSOCK")
1426
1427    def testNewAttributes(self):
1428        # testing .family, .type and .protocol
1429
1430        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1431            self.assertEqual(sock.family, socket.AF_INET)
1432            if hasattr(socket, 'SOCK_CLOEXEC'):
1433                self.assertIn(sock.type,
1434                              (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
1435                               socket.SOCK_STREAM))
1436            else:
1437                self.assertEqual(sock.type, socket.SOCK_STREAM)
1438            self.assertEqual(sock.proto, 0)
1439
1440    def test_getsockaddrarg(self):
1441        sock = socket.socket()
1442        self.addCleanup(sock.close)
1443        port = socket_helper.find_unused_port()
1444        big_port = port + 65536
1445        neg_port = port - 65536
1446        self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
1447        self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
1448        # Since find_unused_port() is inherently subject to race conditions, we
1449        # call it a couple times if necessary.
1450        for i in itertools.count():
1451            port = socket_helper.find_unused_port()
1452            try:
1453                sock.bind((HOST, port))
1454            except OSError as e:
1455                if e.errno != errno.EADDRINUSE or i == 5:
1456                    raise
1457            else:
1458                break
1459
1460    @unittest.skipUnless(os.name == "nt", "Windows specific")
1461    def test_sock_ioctl(self):
1462        self.assertTrue(hasattr(socket.socket, 'ioctl'))
1463        self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
1464        self.assertTrue(hasattr(socket, 'RCVALL_ON'))
1465        self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
1466        self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
1467        s = socket.socket()
1468        self.addCleanup(s.close)
1469        self.assertRaises(ValueError, s.ioctl, -1, None)
1470        s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
1471
1472    @unittest.skipUnless(os.name == "nt", "Windows specific")
1473    @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
1474                         'Loopback fast path support required for this test')
1475    def test_sio_loopback_fast_path(self):
1476        s = socket.socket()
1477        self.addCleanup(s.close)
1478        try:
1479            s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
1480        except OSError as exc:
1481            WSAEOPNOTSUPP = 10045
1482            if exc.winerror == WSAEOPNOTSUPP:
1483                self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
1484                              "doesn't implemented in this Windows version")
1485            raise
1486        self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
1487
1488    def testGetaddrinfo(self):
1489        try:
1490            socket.getaddrinfo('localhost', 80)
1491        except socket.gaierror as err:
1492            if err.errno == socket.EAI_SERVICE:
1493                # see http://bugs.python.org/issue1282647
1494                self.skipTest("buggy libc version")
1495            raise
1496        # len of every sequence is supposed to be == 5
1497        for info in socket.getaddrinfo(HOST, None):
1498            self.assertEqual(len(info), 5)
1499        # host can be a domain name, a string representation of an
1500        # IPv4/v6 address or None
1501        socket.getaddrinfo('localhost', 80)
1502        socket.getaddrinfo('127.0.0.1', 80)
1503        socket.getaddrinfo(None, 80)
1504        if socket_helper.IPV6_ENABLED:
1505            socket.getaddrinfo('::1', 80)
1506        # port can be a string service name such as "http", a numeric
1507        # port number or None
1508        # Issue #26936: Android getaddrinfo() was broken before API level 23.
1509        if (not hasattr(sys, 'getandroidapilevel') or
1510                sys.getandroidapilevel() >= 23):
1511            socket.getaddrinfo(HOST, "http")
1512        socket.getaddrinfo(HOST, 80)
1513        socket.getaddrinfo(HOST, None)
1514        # test family and socktype filters
1515        infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
1516        for family, type, _, _, _ in infos:
1517            self.assertEqual(family, socket.AF_INET)
1518            self.assertEqual(str(family), 'AddressFamily.AF_INET')
1519            self.assertEqual(type, socket.SOCK_STREAM)
1520            self.assertEqual(str(type), 'SocketKind.SOCK_STREAM')
1521        infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1522        for _, socktype, _, _, _ in infos:
1523            self.assertEqual(socktype, socket.SOCK_STREAM)
1524        # test proto and flags arguments
1525        socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1526        socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1527        # a server willing to support both IPv4 and IPv6 will
1528        # usually do this
1529        socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1530                           socket.AI_PASSIVE)
1531        # test keyword arguments
1532        a = socket.getaddrinfo(HOST, None)
1533        b = socket.getaddrinfo(host=HOST, port=None)
1534        self.assertEqual(a, b)
1535        a = socket.getaddrinfo(HOST, None, socket.AF_INET)
1536        b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
1537        self.assertEqual(a, b)
1538        a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1539        b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
1540        self.assertEqual(a, b)
1541        a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1542        b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
1543        self.assertEqual(a, b)
1544        a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1545        b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
1546        self.assertEqual(a, b)
1547        a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1548                               socket.AI_PASSIVE)
1549        b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
1550                               type=socket.SOCK_STREAM, proto=0,
1551                               flags=socket.AI_PASSIVE)
1552        self.assertEqual(a, b)
1553        # Issue #6697.
1554        self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
1555
1556        # Issue 17269: test workaround for OS X platform bug segfault
1557        if hasattr(socket, 'AI_NUMERICSERV'):
1558            try:
1559                # The arguments here are undefined and the call may succeed
1560                # or fail.  All we care here is that it doesn't segfault.
1561                socket.getaddrinfo("localhost", None, 0, 0, 0,
1562                                   socket.AI_NUMERICSERV)
1563            except socket.gaierror:
1564                pass
1565
1566    def test_getnameinfo(self):
1567        # only IP addresses are allowed
1568        self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
1569
1570    @unittest.skipUnless(support.is_resource_enabled('network'),
1571                         'network is not enabled')
1572    def test_idna(self):
1573        # Check for internet access before running test
1574        # (issue #12804, issue #25138).
1575        with socket_helper.transient_internet('python.org'):
1576            socket.gethostbyname('python.org')
1577
1578        # these should all be successful
1579        domain = 'испытание.pythontest.net'
1580        socket.gethostbyname(domain)
1581        socket.gethostbyname_ex(domain)
1582        socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
1583        # this may not work if the forward lookup chooses the IPv6 address, as that doesn't
1584        # have a reverse entry yet
1585        # socket.gethostbyaddr('испытание.python.org')
1586
1587    def check_sendall_interrupted(self, with_timeout):
1588        # socketpair() is not strictly required, but it makes things easier.
1589        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
1590            self.skipTest("signal.alarm and socket.socketpair required for this test")
1591        # Our signal handlers clobber the C errno by calling a math function
1592        # with an invalid domain value.
1593        def ok_handler(*args):
1594            self.assertRaises(ValueError, math.acosh, 0)
1595        def raising_handler(*args):
1596            self.assertRaises(ValueError, math.acosh, 0)
1597            1 // 0
1598        c, s = socket.socketpair()
1599        old_alarm = signal.signal(signal.SIGALRM, raising_handler)
1600        try:
1601            if with_timeout:
1602                # Just above the one second minimum for signal.alarm
1603                c.settimeout(1.5)
1604            with self.assertRaises(ZeroDivisionError):
1605                signal.alarm(1)
1606                c.sendall(b"x" * support.SOCK_MAX_SIZE)
1607            if with_timeout:
1608                signal.signal(signal.SIGALRM, ok_handler)
1609                signal.alarm(1)
1610                self.assertRaises(TimeoutError, c.sendall,
1611                                  b"x" * support.SOCK_MAX_SIZE)
1612        finally:
1613            signal.alarm(0)
1614            signal.signal(signal.SIGALRM, old_alarm)
1615            c.close()
1616            s.close()
1617
1618    def test_sendall_interrupted(self):
1619        self.check_sendall_interrupted(False)
1620
1621    def test_sendall_interrupted_with_timeout(self):
1622        self.check_sendall_interrupted(True)
1623
1624    def test_dealloc_warn(self):
1625        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1626        r = repr(sock)
1627        with self.assertWarns(ResourceWarning) as cm:
1628            sock = None
1629            support.gc_collect()
1630        self.assertIn(r, str(cm.warning.args[0]))
1631        # An open socket file object gets dereferenced after the socket
1632        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1633        f = sock.makefile('rb')
1634        r = repr(sock)
1635        sock = None
1636        support.gc_collect()
1637        with self.assertWarns(ResourceWarning):
1638            f = None
1639            support.gc_collect()
1640
1641    def test_name_closed_socketio(self):
1642        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1643            fp = sock.makefile("rb")
1644            fp.close()
1645            self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
1646
1647    def test_unusable_closed_socketio(self):
1648        with socket.socket() as sock:
1649            fp = sock.makefile("rb", buffering=0)
1650            self.assertTrue(fp.readable())
1651            self.assertFalse(fp.writable())
1652            self.assertFalse(fp.seekable())
1653            fp.close()
1654            self.assertRaises(ValueError, fp.readable)
1655            self.assertRaises(ValueError, fp.writable)
1656            self.assertRaises(ValueError, fp.seekable)
1657
1658    def test_socket_close(self):
1659        sock = socket.socket()
1660        try:
1661            sock.bind((HOST, 0))
1662            socket.close(sock.fileno())
1663            with self.assertRaises(OSError):
1664                sock.listen(1)
1665        finally:
1666            with self.assertRaises(OSError):
1667                # sock.close() fails with EBADF
1668                sock.close()
1669        with self.assertRaises(TypeError):
1670            socket.close(None)
1671        with self.assertRaises(OSError):
1672            socket.close(-1)
1673
1674    def test_makefile_mode(self):
1675        for mode in 'r', 'rb', 'rw', 'w', 'wb':
1676            with self.subTest(mode=mode):
1677                with socket.socket() as sock:
1678                    encoding = None if "b" in mode else "utf-8"
1679                    with sock.makefile(mode, encoding=encoding) as fp:
1680                        self.assertEqual(fp.mode, mode)
1681
1682    def test_makefile_invalid_mode(self):
1683        for mode in 'rt', 'x', '+', 'a':
1684            with self.subTest(mode=mode):
1685                with socket.socket() as sock:
1686                    with self.assertRaisesRegex(ValueError, 'invalid mode'):
1687                        sock.makefile(mode)
1688
1689    def test_pickle(self):
1690        sock = socket.socket()
1691        with sock:
1692            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1693                self.assertRaises(TypeError, pickle.dumps, sock, protocol)
1694        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1695            family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
1696            self.assertEqual(family, socket.AF_INET)
1697            type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
1698            self.assertEqual(type, socket.SOCK_STREAM)
1699
1700    def test_listen_backlog(self):
1701        for backlog in 0, -1:
1702            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1703                srv.bind((HOST, 0))
1704                srv.listen(backlog)
1705
1706        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1707            srv.bind((HOST, 0))
1708            srv.listen()
1709
1710    @support.cpython_only
1711    def test_listen_backlog_overflow(self):
1712        # Issue 15989
1713        import _testcapi
1714        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1715            srv.bind((HOST, 0))
1716            self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
1717
1718    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1719    def test_flowinfo(self):
1720        self.assertRaises(OverflowError, socket.getnameinfo,
1721                          (socket_helper.HOSTv6, 0, 0xffffffff), 0)
1722        with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1723            self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10))
1724
1725    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1726    def test_getaddrinfo_ipv6_basic(self):
1727        ((*_, sockaddr),) = socket.getaddrinfo(
1728            'ff02::1de:c0:face:8D',  # Note capital letter `D`.
1729            1234, socket.AF_INET6,
1730            socket.SOCK_DGRAM,
1731            socket.IPPROTO_UDP
1732        )
1733        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))
1734
1735    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1736    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1737    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1738    @unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
1739    def test_getaddrinfo_ipv6_scopeid_symbolic(self):
1740        # Just pick up any network interface (Linux, Mac OS X)
1741        (ifindex, test_interface) = socket.if_nameindex()[0]
1742        ((*_, sockaddr),) = socket.getaddrinfo(
1743            'ff02::1de:c0:face:8D%' + test_interface,
1744            1234, socket.AF_INET6,
1745            socket.SOCK_DGRAM,
1746            socket.IPPROTO_UDP
1747        )
1748        # Note missing interface name part in IPv6 address
1749        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1750
1751    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1752    @unittest.skipUnless(
1753        sys.platform == 'win32',
1754        'Numeric scope id does not work or undocumented')
1755    def test_getaddrinfo_ipv6_scopeid_numeric(self):
1756        # Also works on Linux and Mac OS X, but is not documented (?)
1757        # Windows, Linux and Max OS X allow nonexistent interface numbers here.
1758        ifindex = 42
1759        ((*_, sockaddr),) = socket.getaddrinfo(
1760            'ff02::1de:c0:face:8D%' + str(ifindex),
1761            1234, socket.AF_INET6,
1762            socket.SOCK_DGRAM,
1763            socket.IPPROTO_UDP
1764        )
1765        # Note missing interface name part in IPv6 address
1766        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1767
1768    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1769    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1770    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1771    @unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
1772    def test_getnameinfo_ipv6_scopeid_symbolic(self):
1773        # Just pick up any network interface.
1774        (ifindex, test_interface) = socket.if_nameindex()[0]
1775        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1776        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1777        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))
1778
1779    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1780    @unittest.skipUnless( sys.platform == 'win32',
1781        'Numeric scope id does not work or undocumented')
1782    def test_getnameinfo_ipv6_scopeid_numeric(self):
1783        # Also works on Linux (undocumented), but does not work on Mac OS X
1784        # Windows and Linux allow nonexistent interface numbers here.
1785        ifindex = 42
1786        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1787        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1788        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234'))
1789
1790    def test_str_for_enums(self):
1791        # Make sure that the AF_* and SOCK_* constants have enum-like string
1792        # reprs.
1793        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1794            self.assertEqual(str(s.family), 'AddressFamily.AF_INET')
1795            self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM')
1796
1797    def test_socket_consistent_sock_type(self):
1798        SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)
1799        SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0)
1800        sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC
1801
1802        with socket.socket(socket.AF_INET, sock_type) as s:
1803            self.assertEqual(s.type, socket.SOCK_STREAM)
1804            s.settimeout(1)
1805            self.assertEqual(s.type, socket.SOCK_STREAM)
1806            s.settimeout(0)
1807            self.assertEqual(s.type, socket.SOCK_STREAM)
1808            s.setblocking(True)
1809            self.assertEqual(s.type, socket.SOCK_STREAM)
1810            s.setblocking(False)
1811            self.assertEqual(s.type, socket.SOCK_STREAM)
1812
1813    def test_unknown_socket_family_repr(self):
1814        # Test that when created with a family that's not one of the known
1815        # AF_*/SOCK_* constants, socket.family just returns the number.
1816        #
1817        # To do this we fool socket.socket into believing it already has an
1818        # open fd because on this path it doesn't actually verify the family and
1819        # type and populates the socket object.
1820        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1821        fd = sock.detach()
1822        unknown_family = max(socket.AddressFamily.__members__.values()) + 1
1823
1824        unknown_type = max(
1825            kind
1826            for name, kind in socket.SocketKind.__members__.items()
1827            if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'}
1828        ) + 1
1829
1830        with socket.socket(
1831                family=unknown_family, type=unknown_type, proto=23,
1832                fileno=fd) as s:
1833            self.assertEqual(s.family, unknown_family)
1834            self.assertEqual(s.type, unknown_type)
1835            # some OS like macOS ignore proto
1836            self.assertIn(s.proto, {0, 23})
1837
1838    @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
1839    def test__sendfile_use_sendfile(self):
1840        class File:
1841            def __init__(self, fd):
1842                self.fd = fd
1843
1844            def fileno(self):
1845                return self.fd
1846        with socket.socket() as sock:
1847            fd = os.open(os.curdir, os.O_RDONLY)
1848            os.close(fd)
1849            with self.assertRaises(socket._GiveupOnSendfile):
1850                sock._sendfile_use_sendfile(File(fd))
1851            with self.assertRaises(OverflowError):
1852                sock._sendfile_use_sendfile(File(2**1000))
1853            with self.assertRaises(TypeError):
1854                sock._sendfile_use_sendfile(File(None))
1855
1856    def _test_socket_fileno(self, s, family, stype):
1857        self.assertEqual(s.family, family)
1858        self.assertEqual(s.type, stype)
1859
1860        fd = s.fileno()
1861        s2 = socket.socket(fileno=fd)
1862        self.addCleanup(s2.close)
1863        # detach old fd to avoid double close
1864        s.detach()
1865        self.assertEqual(s2.family, family)
1866        self.assertEqual(s2.type, stype)
1867        self.assertEqual(s2.fileno(), fd)
1868
1869    def test_socket_fileno(self):
1870        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1871        self.addCleanup(s.close)
1872        s.bind((socket_helper.HOST, 0))
1873        self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)
1874
1875        if hasattr(socket, "SOCK_DGRAM"):
1876            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1877            self.addCleanup(s.close)
1878            s.bind((socket_helper.HOST, 0))
1879            self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)
1880
1881        if socket_helper.IPV6_ENABLED:
1882            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
1883            self.addCleanup(s.close)
1884            s.bind((socket_helper.HOSTv6, 0, 0, 0))
1885            self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)
1886
1887        if hasattr(socket, "AF_UNIX"):
1888            tmpdir = tempfile.mkdtemp()
1889            self.addCleanup(shutil.rmtree, tmpdir)
1890            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1891            self.addCleanup(s.close)
1892            try:
1893                s.bind(os.path.join(tmpdir, 'socket'))
1894            except PermissionError:
1895                pass
1896            else:
1897                self._test_socket_fileno(s, socket.AF_UNIX,
1898                                         socket.SOCK_STREAM)
1899
1900    def test_socket_fileno_rejects_float(self):
1901        with self.assertRaises(TypeError):
1902            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5)
1903
1904    def test_socket_fileno_rejects_other_types(self):
1905        with self.assertRaises(TypeError):
1906            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo")
1907
1908    def test_socket_fileno_rejects_invalid_socket(self):
1909        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1910            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1)
1911
1912    @unittest.skipIf(os.name == "nt", "Windows disallows -1 only")
1913    def test_socket_fileno_rejects_negative(self):
1914        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1915            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42)
1916
1917    def test_socket_fileno_requires_valid_fd(self):
1918        WSAENOTSOCK = 10038
1919        with self.assertRaises(OSError) as cm:
1920            socket.socket(fileno=os_helper.make_bad_fd())
1921        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1922
1923        with self.assertRaises(OSError) as cm:
1924            socket.socket(
1925                socket.AF_INET,
1926                socket.SOCK_STREAM,
1927                fileno=os_helper.make_bad_fd())
1928        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1929
1930    def test_socket_fileno_requires_socket_fd(self):
1931        with tempfile.NamedTemporaryFile() as afile:
1932            with self.assertRaises(OSError):
1933                socket.socket(fileno=afile.fileno())
1934
1935            with self.assertRaises(OSError) as cm:
1936                socket.socket(
1937                    socket.AF_INET,
1938                    socket.SOCK_STREAM,
1939                    fileno=afile.fileno())
1940            self.assertEqual(cm.exception.errno, errno.ENOTSOCK)
1941
1942
1943@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
1944class BasicCANTest(unittest.TestCase):
1945
1946    def testCrucialConstants(self):
1947        socket.AF_CAN
1948        socket.PF_CAN
1949        socket.CAN_RAW
1950
1951    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1952                         'socket.CAN_BCM required for this test.')
1953    def testBCMConstants(self):
1954        socket.CAN_BCM
1955
1956        # opcodes
1957        socket.CAN_BCM_TX_SETUP     # create (cyclic) transmission task
1958        socket.CAN_BCM_TX_DELETE    # remove (cyclic) transmission task
1959        socket.CAN_BCM_TX_READ      # read properties of (cyclic) transmission task
1960        socket.CAN_BCM_TX_SEND      # send one CAN frame
1961        socket.CAN_BCM_RX_SETUP     # create RX content filter subscription
1962        socket.CAN_BCM_RX_DELETE    # remove RX content filter subscription
1963        socket.CAN_BCM_RX_READ      # read properties of RX content filter subscription
1964        socket.CAN_BCM_TX_STATUS    # reply to TX_READ request
1965        socket.CAN_BCM_TX_EXPIRED   # notification on performed transmissions (count=0)
1966        socket.CAN_BCM_RX_STATUS    # reply to RX_READ request
1967        socket.CAN_BCM_RX_TIMEOUT   # cyclic message is absent
1968        socket.CAN_BCM_RX_CHANGED   # updated CAN frame (detected content change)
1969
1970        # flags
1971        socket.CAN_BCM_SETTIMER
1972        socket.CAN_BCM_STARTTIMER
1973        socket.CAN_BCM_TX_COUNTEVT
1974        socket.CAN_BCM_TX_ANNOUNCE
1975        socket.CAN_BCM_TX_CP_CAN_ID
1976        socket.CAN_BCM_RX_FILTER_ID
1977        socket.CAN_BCM_RX_CHECK_DLC
1978        socket.CAN_BCM_RX_NO_AUTOTIMER
1979        socket.CAN_BCM_RX_ANNOUNCE_RESUME
1980        socket.CAN_BCM_TX_RESET_MULTI_IDX
1981        socket.CAN_BCM_RX_RTR_FRAME
1982
1983    def testCreateSocket(self):
1984        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1985            pass
1986
1987    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1988                         'socket.CAN_BCM required for this test.')
1989    def testCreateBCMSocket(self):
1990        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s:
1991            pass
1992
1993    def testBindAny(self):
1994        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1995            address = ('', )
1996            s.bind(address)
1997            self.assertEqual(s.getsockname(), address)
1998
1999    def testTooLongInterfaceName(self):
2000        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
2001        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2002            self.assertRaisesRegex(OSError, 'interface name too long',
2003                                   s.bind, ('x' * 1024,))
2004
2005    @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
2006                         'socket.CAN_RAW_LOOPBACK required for this test.')
2007    def testLoopback(self):
2008        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2009            for loopback in (0, 1):
2010                s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
2011                             loopback)
2012                self.assertEqual(loopback,
2013                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
2014
2015    @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
2016                         'socket.CAN_RAW_FILTER required for this test.')
2017    def testFilter(self):
2018        can_id, can_mask = 0x200, 0x700
2019        can_filter = struct.pack("=II", can_id, can_mask)
2020        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2021            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
2022            self.assertEqual(can_filter,
2023                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
2024            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter))
2025
2026
2027@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
2028class CANTest(ThreadedCANSocketTest):
2029
2030    def __init__(self, methodName='runTest'):
2031        ThreadedCANSocketTest.__init__(self, methodName=methodName)
2032
2033    @classmethod
2034    def build_can_frame(cls, can_id, data):
2035        """Build a CAN frame."""
2036        can_dlc = len(data)
2037        data = data.ljust(8, b'\x00')
2038        return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
2039
2040    @classmethod
2041    def dissect_can_frame(cls, frame):
2042        """Dissect a CAN frame."""
2043        can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
2044        return (can_id, can_dlc, data[:can_dlc])
2045
2046    def testSendFrame(self):
2047        cf, addr = self.s.recvfrom(self.bufsize)
2048        self.assertEqual(self.cf, cf)
2049        self.assertEqual(addr[0], self.interface)
2050
2051    def _testSendFrame(self):
2052        self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
2053        self.cli.send(self.cf)
2054
2055    def testSendMaxFrame(self):
2056        cf, addr = self.s.recvfrom(self.bufsize)
2057        self.assertEqual(self.cf, cf)
2058
2059    def _testSendMaxFrame(self):
2060        self.cf = self.build_can_frame(0x00, b'\x07' * 8)
2061        self.cli.send(self.cf)
2062
2063    def testSendMultiFrames(self):
2064        cf, addr = self.s.recvfrom(self.bufsize)
2065        self.assertEqual(self.cf1, cf)
2066
2067        cf, addr = self.s.recvfrom(self.bufsize)
2068        self.assertEqual(self.cf2, cf)
2069
2070    def _testSendMultiFrames(self):
2071        self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
2072        self.cli.send(self.cf1)
2073
2074        self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
2075        self.cli.send(self.cf2)
2076
2077    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
2078                         'socket.CAN_BCM required for this test.')
2079    def _testBCM(self):
2080        cf, addr = self.cli.recvfrom(self.bufsize)
2081        self.assertEqual(self.cf, cf)
2082        can_id, can_dlc, data = self.dissect_can_frame(cf)
2083        self.assertEqual(self.can_id, can_id)
2084        self.assertEqual(self.data, data)
2085
2086    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
2087                         'socket.CAN_BCM required for this test.')
2088    def testBCM(self):
2089        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
2090        self.addCleanup(bcm.close)
2091        bcm.connect((self.interface,))
2092        self.can_id = 0x123
2093        self.data = bytes([0xc0, 0xff, 0xee])
2094        self.cf = self.build_can_frame(self.can_id, self.data)
2095        opcode = socket.CAN_BCM_TX_SEND
2096        flags = 0
2097        count = 0
2098        ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
2099        bcm_can_id = 0x0222
2100        nframes = 1
2101        assert len(self.cf) == 16
2102        header = struct.pack(self.bcm_cmd_msg_fmt,
2103                    opcode,
2104                    flags,
2105                    count,
2106                    ival1_seconds,
2107                    ival1_usec,
2108                    ival2_seconds,
2109                    ival2_usec,
2110                    bcm_can_id,
2111                    nframes,
2112                    )
2113        header_plus_frame = header + self.cf
2114        bytes_sent = bcm.send(header_plus_frame)
2115        self.assertEqual(bytes_sent, len(header_plus_frame))
2116
2117
2118@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
2119class ISOTPTest(unittest.TestCase):
2120
2121    def __init__(self, *args, **kwargs):
2122        super().__init__(*args, **kwargs)
2123        self.interface = "vcan0"
2124
2125    def testCrucialConstants(self):
2126        socket.AF_CAN
2127        socket.PF_CAN
2128        socket.CAN_ISOTP
2129        socket.SOCK_DGRAM
2130
2131    def testCreateSocket(self):
2132        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2133            pass
2134
2135    @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
2136                         'socket.CAN_ISOTP required for this test.')
2137    def testCreateISOTPSocket(self):
2138        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2139            pass
2140
2141    def testTooLongInterfaceName(self):
2142        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
2143        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2144            with self.assertRaisesRegex(OSError, 'interface name too long'):
2145                s.bind(('x' * 1024, 1, 2))
2146
2147    def testBind(self):
2148        try:
2149            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2150                addr = self.interface, 0x123, 0x456
2151                s.bind(addr)
2152                self.assertEqual(s.getsockname(), addr)
2153        except OSError as e:
2154            if e.errno == errno.ENODEV:
2155                self.skipTest('network interface `%s` does not exist' %
2156                           self.interface)
2157            else:
2158                raise
2159
2160
2161@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.')
2162class J1939Test(unittest.TestCase):
2163
2164    def __init__(self, *args, **kwargs):
2165        super().__init__(*args, **kwargs)
2166        self.interface = "vcan0"
2167
2168    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
2169                         'socket.CAN_J1939 required for this test.')
2170    def testJ1939Constants(self):
2171        socket.CAN_J1939
2172
2173        socket.J1939_MAX_UNICAST_ADDR
2174        socket.J1939_IDLE_ADDR
2175        socket.J1939_NO_ADDR
2176        socket.J1939_NO_NAME
2177        socket.J1939_PGN_REQUEST
2178        socket.J1939_PGN_ADDRESS_CLAIMED
2179        socket.J1939_PGN_ADDRESS_COMMANDED
2180        socket.J1939_PGN_PDU1_MAX
2181        socket.J1939_PGN_MAX
2182        socket.J1939_NO_PGN
2183
2184        # J1939 socket options
2185        socket.SO_J1939_FILTER
2186        socket.SO_J1939_PROMISC
2187        socket.SO_J1939_SEND_PRIO
2188        socket.SO_J1939_ERRQUEUE
2189
2190        socket.SCM_J1939_DEST_ADDR
2191        socket.SCM_J1939_DEST_NAME
2192        socket.SCM_J1939_PRIO
2193        socket.SCM_J1939_ERRQUEUE
2194
2195        socket.J1939_NLA_PAD
2196        socket.J1939_NLA_BYTES_ACKED
2197
2198        socket.J1939_EE_INFO_NONE
2199        socket.J1939_EE_INFO_TX_ABORT
2200
2201        socket.J1939_FILTER_MAX
2202
2203    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
2204                         'socket.CAN_J1939 required for this test.')
2205    def testCreateJ1939Socket(self):
2206        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
2207            pass
2208
2209    def testBind(self):
2210        try:
2211            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
2212                addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR
2213                s.bind(addr)
2214                self.assertEqual(s.getsockname(), addr)
2215        except OSError as e:
2216            if e.errno == errno.ENODEV:
2217                self.skipTest('network interface `%s` does not exist' %
2218                           self.interface)
2219            else:
2220                raise
2221
2222
2223@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
2224class BasicRDSTest(unittest.TestCase):
2225
2226    def testCrucialConstants(self):
2227        socket.AF_RDS
2228        socket.PF_RDS
2229
2230    def testCreateSocket(self):
2231        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
2232            pass
2233
2234    def testSocketBufferSize(self):
2235        bufsize = 16384
2236        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
2237            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
2238            s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
2239
2240
2241@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
2242class RDSTest(ThreadedRDSSocketTest):
2243
2244    def __init__(self, methodName='runTest'):
2245        ThreadedRDSSocketTest.__init__(self, methodName=methodName)
2246
2247    def setUp(self):
2248        super().setUp()
2249        self.evt = threading.Event()
2250
2251    def testSendAndRecv(self):
2252        data, addr = self.serv.recvfrom(self.bufsize)
2253        self.assertEqual(self.data, data)
2254        self.assertEqual(self.cli_addr, addr)
2255
2256    def _testSendAndRecv(self):
2257        self.data = b'spam'
2258        self.cli.sendto(self.data, 0, (HOST, self.port))
2259
2260    def testPeek(self):
2261        data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
2262        self.assertEqual(self.data, data)
2263        data, addr = self.serv.recvfrom(self.bufsize)
2264        self.assertEqual(self.data, data)
2265
2266    def _testPeek(self):
2267        self.data = b'spam'
2268        self.cli.sendto(self.data, 0, (HOST, self.port))
2269
2270    @requireAttrs(socket.socket, 'recvmsg')
2271    def testSendAndRecvMsg(self):
2272        data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
2273        self.assertEqual(self.data, data)
2274
2275    @requireAttrs(socket.socket, 'sendmsg')
2276    def _testSendAndRecvMsg(self):
2277        self.data = b'hello ' * 10
2278        self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
2279
2280    def testSendAndRecvMulti(self):
2281        data, addr = self.serv.recvfrom(self.bufsize)
2282        self.assertEqual(self.data1, data)
2283
2284        data, addr = self.serv.recvfrom(self.bufsize)
2285        self.assertEqual(self.data2, data)
2286
2287    def _testSendAndRecvMulti(self):
2288        self.data1 = b'bacon'
2289        self.cli.sendto(self.data1, 0, (HOST, self.port))
2290
2291        self.data2 = b'egg'
2292        self.cli.sendto(self.data2, 0, (HOST, self.port))
2293
2294    def testSelect(self):
2295        r, w, x = select.select([self.serv], [], [], 3.0)
2296        self.assertIn(self.serv, r)
2297        data, addr = self.serv.recvfrom(self.bufsize)
2298        self.assertEqual(self.data, data)
2299
2300    def _testSelect(self):
2301        self.data = b'select'
2302        self.cli.sendto(self.data, 0, (HOST, self.port))
2303
2304@unittest.skipUnless(HAVE_SOCKET_QIPCRTR,
2305          'QIPCRTR sockets required for this test.')
2306class BasicQIPCRTRTest(unittest.TestCase):
2307
2308    def testCrucialConstants(self):
2309        socket.AF_QIPCRTR
2310
2311    def testCreateSocket(self):
2312        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2313            pass
2314
2315    def testUnbound(self):
2316        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2317            self.assertEqual(s.getsockname()[1], 0)
2318
2319    def testBindSock(self):
2320        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2321            socket_helper.bind_port(s, host=s.getsockname()[0])
2322            self.assertNotEqual(s.getsockname()[1], 0)
2323
2324    def testInvalidBindSock(self):
2325        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2326            self.assertRaises(OSError, socket_helper.bind_port, s, host=-2)
2327
2328    def testAutoBindSock(self):
2329        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2330            s.connect((123, 123))
2331            self.assertNotEqual(s.getsockname()[1], 0)
2332
2333@unittest.skipIf(fcntl is None, "need fcntl")
2334@unittest.skipUnless(HAVE_SOCKET_VSOCK,
2335          'VSOCK sockets required for this test.')
2336class BasicVSOCKTest(unittest.TestCase):
2337
2338    def testCrucialConstants(self):
2339        socket.AF_VSOCK
2340
2341    def testVSOCKConstants(self):
2342        socket.SO_VM_SOCKETS_BUFFER_SIZE
2343        socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE
2344        socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE
2345        socket.VMADDR_CID_ANY
2346        socket.VMADDR_PORT_ANY
2347        socket.VMADDR_CID_HOST
2348        socket.VM_SOCKETS_INVALID_VERSION
2349        socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID
2350
2351    def testCreateSocket(self):
2352        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
2353            pass
2354
2355    def testSocketBufferSize(self):
2356        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
2357            orig_max = s.getsockopt(socket.AF_VSOCK,
2358                                    socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)
2359            orig = s.getsockopt(socket.AF_VSOCK,
2360                                socket.SO_VM_SOCKETS_BUFFER_SIZE)
2361            orig_min = s.getsockopt(socket.AF_VSOCK,
2362                                    socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)
2363
2364            s.setsockopt(socket.AF_VSOCK,
2365                         socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2)
2366            s.setsockopt(socket.AF_VSOCK,
2367                         socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2)
2368            s.setsockopt(socket.AF_VSOCK,
2369                         socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2)
2370
2371            self.assertEqual(orig_max * 2,
2372                             s.getsockopt(socket.AF_VSOCK,
2373                             socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE))
2374            self.assertEqual(orig * 2,
2375                             s.getsockopt(socket.AF_VSOCK,
2376                             socket.SO_VM_SOCKETS_BUFFER_SIZE))
2377            self.assertEqual(orig_min * 2,
2378                             s.getsockopt(socket.AF_VSOCK,
2379                             socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE))
2380
2381
2382@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH,
2383                     'Bluetooth sockets required for this test.')
2384class BasicBluetoothTest(unittest.TestCase):
2385
2386    def testBluetoothConstants(self):
2387        socket.BDADDR_ANY
2388        socket.BDADDR_LOCAL
2389        socket.AF_BLUETOOTH
2390        socket.BTPROTO_RFCOMM
2391
2392        if sys.platform != "win32":
2393            socket.BTPROTO_HCI
2394            socket.SOL_HCI
2395            socket.BTPROTO_L2CAP
2396
2397            if not sys.platform.startswith("freebsd"):
2398                socket.BTPROTO_SCO
2399
2400    def testCreateRfcommSocket(self):
2401        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
2402            pass
2403
2404    @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets")
2405    def testCreateL2capSocket(self):
2406        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as s:
2407            pass
2408
2409    @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets")
2410    def testCreateHciSocket(self):
2411        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
2412            pass
2413
2414    @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"),
2415                     "windows and freebsd do not support SCO sockets")
2416    def testCreateScoSocket(self):
2417        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
2418            pass
2419
2420
2421class BasicTCPTest(SocketConnectedTest):
2422
2423    def __init__(self, methodName='runTest'):
2424        SocketConnectedTest.__init__(self, methodName=methodName)
2425
2426    def testRecv(self):
2427        # Testing large receive over TCP
2428        msg = self.cli_conn.recv(1024)
2429        self.assertEqual(msg, MSG)
2430
2431    def _testRecv(self):
2432        self.serv_conn.send(MSG)
2433
2434    def testOverFlowRecv(self):
2435        # Testing receive in chunks over TCP
2436        seg1 = self.cli_conn.recv(len(MSG) - 3)
2437        seg2 = self.cli_conn.recv(1024)
2438        msg = seg1 + seg2
2439        self.assertEqual(msg, MSG)
2440
2441    def _testOverFlowRecv(self):
2442        self.serv_conn.send(MSG)
2443
2444    def testRecvFrom(self):
2445        # Testing large recvfrom() over TCP
2446        msg, addr = self.cli_conn.recvfrom(1024)
2447        self.assertEqual(msg, MSG)
2448
2449    def _testRecvFrom(self):
2450        self.serv_conn.send(MSG)
2451
2452    def testOverFlowRecvFrom(self):
2453        # Testing recvfrom() in chunks over TCP
2454        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
2455        seg2, addr = self.cli_conn.recvfrom(1024)
2456        msg = seg1 + seg2
2457        self.assertEqual(msg, MSG)
2458
2459    def _testOverFlowRecvFrom(self):
2460        self.serv_conn.send(MSG)
2461
2462    def testSendAll(self):
2463        # Testing sendall() with a 2048 byte string over TCP
2464        msg = b''
2465        while 1:
2466            read = self.cli_conn.recv(1024)
2467            if not read:
2468                break
2469            msg += read
2470        self.assertEqual(msg, b'f' * 2048)
2471
2472    def _testSendAll(self):
2473        big_chunk = b'f' * 2048
2474        self.serv_conn.sendall(big_chunk)
2475
2476    def testFromFd(self):
2477        # Testing fromfd()
2478        fd = self.cli_conn.fileno()
2479        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
2480        self.addCleanup(sock.close)
2481        self.assertIsInstance(sock, socket.socket)
2482        msg = sock.recv(1024)
2483        self.assertEqual(msg, MSG)
2484
2485    def _testFromFd(self):
2486        self.serv_conn.send(MSG)
2487
2488    def testDup(self):
2489        # Testing dup()
2490        sock = self.cli_conn.dup()
2491        self.addCleanup(sock.close)
2492        msg = sock.recv(1024)
2493        self.assertEqual(msg, MSG)
2494
2495    def _testDup(self):
2496        self.serv_conn.send(MSG)
2497
2498    def testShutdown(self):
2499        # Testing shutdown()
2500        msg = self.cli_conn.recv(1024)
2501        self.assertEqual(msg, MSG)
2502        # wait for _testShutdown to finish: on OS X, when the server
2503        # closes the connection the client also becomes disconnected,
2504        # and the client's shutdown call will fail. (Issue #4397.)
2505        self.done.wait()
2506
2507    def _testShutdown(self):
2508        self.serv_conn.send(MSG)
2509        self.serv_conn.shutdown(2)
2510
2511    testShutdown_overflow = support.cpython_only(testShutdown)
2512
2513    @support.cpython_only
2514    def _testShutdown_overflow(self):
2515        import _testcapi
2516        self.serv_conn.send(MSG)
2517        # Issue 15989
2518        self.assertRaises(OverflowError, self.serv_conn.shutdown,
2519                          _testcapi.INT_MAX + 1)
2520        self.assertRaises(OverflowError, self.serv_conn.shutdown,
2521                          2 + (_testcapi.UINT_MAX + 1))
2522        self.serv_conn.shutdown(2)
2523
2524    def testDetach(self):
2525        # Testing detach()
2526        fileno = self.cli_conn.fileno()
2527        f = self.cli_conn.detach()
2528        self.assertEqual(f, fileno)
2529        # cli_conn cannot be used anymore...
2530        self.assertTrue(self.cli_conn._closed)
2531        self.assertRaises(OSError, self.cli_conn.recv, 1024)
2532        self.cli_conn.close()
2533        # ...but we can create another socket using the (still open)
2534        # file descriptor
2535        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f)
2536        self.addCleanup(sock.close)
2537        msg = sock.recv(1024)
2538        self.assertEqual(msg, MSG)
2539
2540    def _testDetach(self):
2541        self.serv_conn.send(MSG)
2542
2543
2544class BasicUDPTest(ThreadedUDPSocketTest):
2545
2546    def __init__(self, methodName='runTest'):
2547        ThreadedUDPSocketTest.__init__(self, methodName=methodName)
2548
2549    def testSendtoAndRecv(self):
2550        # Testing sendto() and Recv() over UDP
2551        msg = self.serv.recv(len(MSG))
2552        self.assertEqual(msg, MSG)
2553
2554    def _testSendtoAndRecv(self):
2555        self.cli.sendto(MSG, 0, (HOST, self.port))
2556
2557    def testRecvFrom(self):
2558        # Testing recvfrom() over UDP
2559        msg, addr = self.serv.recvfrom(len(MSG))
2560        self.assertEqual(msg, MSG)
2561
2562    def _testRecvFrom(self):
2563        self.cli.sendto(MSG, 0, (HOST, self.port))
2564
2565    def testRecvFromNegative(self):
2566        # Negative lengths passed to recvfrom should give ValueError.
2567        self.assertRaises(ValueError, self.serv.recvfrom, -1)
2568
2569    def _testRecvFromNegative(self):
2570        self.cli.sendto(MSG, 0, (HOST, self.port))
2571
2572
2573@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
2574          'UDPLITE sockets required for this test.')
2575class BasicUDPLITETest(ThreadedUDPLITESocketTest):
2576
2577    def __init__(self, methodName='runTest'):
2578        ThreadedUDPLITESocketTest.__init__(self, methodName=methodName)
2579
2580    def testSendtoAndRecv(self):
2581        # Testing sendto() and Recv() over UDPLITE
2582        msg = self.serv.recv(len(MSG))
2583        self.assertEqual(msg, MSG)
2584
2585    def _testSendtoAndRecv(self):
2586        self.cli.sendto(MSG, 0, (HOST, self.port))
2587
2588    def testRecvFrom(self):
2589        # Testing recvfrom() over UDPLITE
2590        msg, addr = self.serv.recvfrom(len(MSG))
2591        self.assertEqual(msg, MSG)
2592
2593    def _testRecvFrom(self):
2594        self.cli.sendto(MSG, 0, (HOST, self.port))
2595
2596    def testRecvFromNegative(self):
2597        # Negative lengths passed to recvfrom should give ValueError.
2598        self.assertRaises(ValueError, self.serv.recvfrom, -1)
2599
2600    def _testRecvFromNegative(self):
2601        self.cli.sendto(MSG, 0, (HOST, self.port))
2602
2603# Tests for the sendmsg()/recvmsg() interface.  Where possible, the
2604# same test code is used with different families and types of socket
2605# (e.g. stream, datagram), and tests using recvmsg() are repeated
2606# using recvmsg_into().
2607#
2608# The generic test classes such as SendmsgTests and
2609# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
2610# supplied with sockets cli_sock and serv_sock representing the
2611# client's and the server's end of the connection respectively, and
2612# attributes cli_addr and serv_addr holding their (numeric where
2613# appropriate) addresses.
2614#
2615# The final concrete test classes combine these with subclasses of
2616# SocketTestBase which set up client and server sockets of a specific
2617# type, and with subclasses of SendrecvmsgBase such as
2618# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these
2619# sockets to cli_sock and serv_sock and override the methods and
2620# attributes of SendrecvmsgBase to fill in destination addresses if
2621# needed when sending, check for specific flags in msg_flags, etc.
2622#
2623# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using
2624# recvmsg_into().
2625
2626# XXX: like the other datagram (UDP) tests in this module, the code
2627# here assumes that datagram delivery on the local machine will be
2628# reliable.
2629
2630class SendrecvmsgBase(ThreadSafeCleanupTestCase):
2631    # Base class for sendmsg()/recvmsg() tests.
2632
2633    # Time in seconds to wait before considering a test failed, or
2634    # None for no timeout.  Not all tests actually set a timeout.
2635    fail_timeout = support.LOOPBACK_TIMEOUT
2636
2637    def setUp(self):
2638        self.misc_event = threading.Event()
2639        super().setUp()
2640
2641    def sendToServer(self, msg):
2642        # Send msg to the server.
2643        return self.cli_sock.send(msg)
2644
2645    # Tuple of alternative default arguments for sendmsg() when called
2646    # via sendmsgToServer() (e.g. to include a destination address).
2647    sendmsg_to_server_defaults = ()
2648
2649    def sendmsgToServer(self, *args):
2650        # Call sendmsg() on self.cli_sock with the given arguments,
2651        # filling in any arguments which are not supplied with the
2652        # corresponding items of self.sendmsg_to_server_defaults, if
2653        # any.
2654        return self.cli_sock.sendmsg(
2655            *(args + self.sendmsg_to_server_defaults[len(args):]))
2656
2657    def doRecvmsg(self, sock, bufsize, *args):
2658        # Call recvmsg() on sock with given arguments and return its
2659        # result.  Should be used for tests which can use either
2660        # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides
2661        # this method with one which emulates it using recvmsg_into(),
2662        # thus allowing the same test to be used for both methods.
2663        result = sock.recvmsg(bufsize, *args)
2664        self.registerRecvmsgResult(result)
2665        return result
2666
2667    def registerRecvmsgResult(self, result):
2668        # Called by doRecvmsg() with the return value of recvmsg() or
2669        # recvmsg_into().  Can be overridden to arrange cleanup based
2670        # on the returned ancillary data, for instance.
2671        pass
2672
2673    def checkRecvmsgAddress(self, addr1, addr2):
2674        # Called to compare the received address with the address of
2675        # the peer.
2676        self.assertEqual(addr1, addr2)
2677
2678    # Flags that are normally unset in msg_flags
2679    msg_flags_common_unset = 0
2680    for name in ("MSG_CTRUNC", "MSG_OOB"):
2681        msg_flags_common_unset |= getattr(socket, name, 0)
2682
2683    # Flags that are normally set
2684    msg_flags_common_set = 0
2685
2686    # Flags set when a complete record has been received (e.g. MSG_EOR
2687    # for SCTP)
2688    msg_flags_eor_indicator = 0
2689
2690    # Flags set when a complete record has not been received
2691    # (e.g. MSG_TRUNC for datagram sockets)
2692    msg_flags_non_eor_indicator = 0
2693
2694    def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0):
2695        # Method to check the value of msg_flags returned by recvmsg[_into]().
2696        #
2697        # Checks that all bits in msg_flags_common_set attribute are
2698        # set in "flags" and all bits in msg_flags_common_unset are
2699        # unset.
2700        #
2701        # The "eor" argument specifies whether the flags should
2702        # indicate that a full record (or datagram) has been received.
2703        # If "eor" is None, no checks are done; otherwise, checks
2704        # that:
2705        #
2706        #  * if "eor" is true, all bits in msg_flags_eor_indicator are
2707        #    set and all bits in msg_flags_non_eor_indicator are unset
2708        #
2709        #  * if "eor" is false, all bits in msg_flags_non_eor_indicator
2710        #    are set and all bits in msg_flags_eor_indicator are unset
2711        #
2712        # If "checkset" and/or "checkunset" are supplied, they require
2713        # the given bits to be set or unset respectively, overriding
2714        # what the attributes require for those bits.
2715        #
2716        # If any bits are set in "ignore", they will not be checked,
2717        # regardless of the other inputs.
2718        #
2719        # Will raise Exception if the inputs require a bit to be both
2720        # set and unset, and it is not ignored.
2721
2722        defaultset = self.msg_flags_common_set
2723        defaultunset = self.msg_flags_common_unset
2724
2725        if eor:
2726            defaultset |= self.msg_flags_eor_indicator
2727            defaultunset |= self.msg_flags_non_eor_indicator
2728        elif eor is not None:
2729            defaultset |= self.msg_flags_non_eor_indicator
2730            defaultunset |= self.msg_flags_eor_indicator
2731
2732        # Function arguments override defaults
2733        defaultset &= ~checkunset
2734        defaultunset &= ~checkset
2735
2736        # Merge arguments with remaining defaults, and check for conflicts
2737        checkset |= defaultset
2738        checkunset |= defaultunset
2739        inboth = checkset & checkunset & ~ignore
2740        if inboth:
2741            raise Exception("contradictory set, unset requirements for flags "
2742                            "{0:#x}".format(inboth))
2743
2744        # Compare with given msg_flags value
2745        mask = (checkset | checkunset) & ~ignore
2746        self.assertEqual(flags & mask, checkset & mask)
2747
2748
2749class RecvmsgIntoMixin(SendrecvmsgBase):
2750    # Mixin to implement doRecvmsg() using recvmsg_into().
2751
2752    def doRecvmsg(self, sock, bufsize, *args):
2753        buf = bytearray(bufsize)
2754        result = sock.recvmsg_into([buf], *args)
2755        self.registerRecvmsgResult(result)
2756        self.assertGreaterEqual(result[0], 0)
2757        self.assertLessEqual(result[0], bufsize)
2758        return (bytes(buf[:result[0]]),) + result[1:]
2759
2760
2761class SendrecvmsgDgramFlagsBase(SendrecvmsgBase):
2762    # Defines flags to be checked in msg_flags for datagram sockets.
2763
2764    @property
2765    def msg_flags_non_eor_indicator(self):
2766        return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC
2767
2768
2769class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase):
2770    # Defines flags to be checked in msg_flags for SCTP sockets.
2771
2772    @property
2773    def msg_flags_eor_indicator(self):
2774        return super().msg_flags_eor_indicator | socket.MSG_EOR
2775
2776
2777class SendrecvmsgConnectionlessBase(SendrecvmsgBase):
2778    # Base class for tests on connectionless-mode sockets.  Users must
2779    # supply sockets on attributes cli and serv to be mapped to
2780    # cli_sock and serv_sock respectively.
2781
2782    @property
2783    def serv_sock(self):
2784        return self.serv
2785
2786    @property
2787    def cli_sock(self):
2788        return self.cli
2789
2790    @property
2791    def sendmsg_to_server_defaults(self):
2792        return ([], [], 0, self.serv_addr)
2793
2794    def sendToServer(self, msg):
2795        return self.cli_sock.sendto(msg, self.serv_addr)
2796
2797
2798class SendrecvmsgConnectedBase(SendrecvmsgBase):
2799    # Base class for tests on connected sockets.  Users must supply
2800    # sockets on attributes serv_conn and cli_conn (representing the
2801    # connections *to* the server and the client), to be mapped to
2802    # cli_sock and serv_sock respectively.
2803
2804    @property
2805    def serv_sock(self):
2806        return self.cli_conn
2807
2808    @property
2809    def cli_sock(self):
2810        return self.serv_conn
2811
2812    def checkRecvmsgAddress(self, addr1, addr2):
2813        # Address is currently "unspecified" for a connected socket,
2814        # so we don't examine it
2815        pass
2816
2817
2818class SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
2819    # Base class to set a timeout on server's socket.
2820
2821    def setUp(self):
2822        super().setUp()
2823        self.serv_sock.settimeout(self.fail_timeout)
2824
2825
2826class SendmsgTests(SendrecvmsgServerTimeoutBase):
2827    # Tests for sendmsg() which can use any socket type and do not
2828    # involve recvmsg() or recvmsg_into().
2829
2830    def testSendmsg(self):
2831        # Send a simple message with sendmsg().
2832        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2833
2834    def _testSendmsg(self):
2835        self.assertEqual(self.sendmsgToServer([MSG]), len(MSG))
2836
2837    def testSendmsgDataGenerator(self):
2838        # Send from buffer obtained from a generator (not a sequence).
2839        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2840
2841    def _testSendmsgDataGenerator(self):
2842        self.assertEqual(self.sendmsgToServer((o for o in [MSG])),
2843                         len(MSG))
2844
2845    def testSendmsgAncillaryGenerator(self):
2846        # Gather (empty) ancillary data from a generator.
2847        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2848
2849    def _testSendmsgAncillaryGenerator(self):
2850        self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])),
2851                         len(MSG))
2852
2853    def testSendmsgArray(self):
2854        # Send data from an array instead of the usual bytes object.
2855        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2856
2857    def _testSendmsgArray(self):
2858        self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]),
2859                         len(MSG))
2860
2861    def testSendmsgGather(self):
2862        # Send message data from more than one buffer (gather write).
2863        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2864
2865    def _testSendmsgGather(self):
2866        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
2867
2868    def testSendmsgBadArgs(self):
2869        # Check that sendmsg() rejects invalid arguments.
2870        self.assertEqual(self.serv_sock.recv(1000), b"done")
2871
2872    def _testSendmsgBadArgs(self):
2873        self.assertRaises(TypeError, self.cli_sock.sendmsg)
2874        self.assertRaises(TypeError, self.sendmsgToServer,
2875                          b"not in an iterable")
2876        self.assertRaises(TypeError, self.sendmsgToServer,
2877                          object())
2878        self.assertRaises(TypeError, self.sendmsgToServer,
2879                          [object()])
2880        self.assertRaises(TypeError, self.sendmsgToServer,
2881                          [MSG, object()])
2882        self.assertRaises(TypeError, self.sendmsgToServer,
2883                          [MSG], object())
2884        self.assertRaises(TypeError, self.sendmsgToServer,
2885                          [MSG], [], object())
2886        self.assertRaises(TypeError, self.sendmsgToServer,
2887                          [MSG], [], 0, object())
2888        self.sendToServer(b"done")
2889
2890    def testSendmsgBadCmsg(self):
2891        # Check that invalid ancillary data items are rejected.
2892        self.assertEqual(self.serv_sock.recv(1000), b"done")
2893
2894    def _testSendmsgBadCmsg(self):
2895        self.assertRaises(TypeError, self.sendmsgToServer,
2896                          [MSG], [object()])
2897        self.assertRaises(TypeError, self.sendmsgToServer,
2898                          [MSG], [(object(), 0, b"data")])
2899        self.assertRaises(TypeError, self.sendmsgToServer,
2900                          [MSG], [(0, object(), b"data")])
2901        self.assertRaises(TypeError, self.sendmsgToServer,
2902                          [MSG], [(0, 0, object())])
2903        self.assertRaises(TypeError, self.sendmsgToServer,
2904                          [MSG], [(0, 0)])
2905        self.assertRaises(TypeError, self.sendmsgToServer,
2906                          [MSG], [(0, 0, b"data", 42)])
2907        self.sendToServer(b"done")
2908
2909    @requireAttrs(socket, "CMSG_SPACE")
2910    def testSendmsgBadMultiCmsg(self):
2911        # Check that invalid ancillary data items are rejected when
2912        # more than one item is present.
2913        self.assertEqual(self.serv_sock.recv(1000), b"done")
2914
2915    @testSendmsgBadMultiCmsg.client_skip
2916    def _testSendmsgBadMultiCmsg(self):
2917        self.assertRaises(TypeError, self.sendmsgToServer,
2918                          [MSG], [0, 0, b""])
2919        self.assertRaises(TypeError, self.sendmsgToServer,
2920                          [MSG], [(0, 0, b""), object()])
2921        self.sendToServer(b"done")
2922
2923    def testSendmsgExcessCmsgReject(self):
2924        # Check that sendmsg() rejects excess ancillary data items
2925        # when the number that can be sent is limited.
2926        self.assertEqual(self.serv_sock.recv(1000), b"done")
2927
2928    def _testSendmsgExcessCmsgReject(self):
2929        if not hasattr(socket, "CMSG_SPACE"):
2930            # Can only send one item
2931            with self.assertRaises(OSError) as cm:
2932                self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
2933            self.assertIsNone(cm.exception.errno)
2934        self.sendToServer(b"done")
2935
2936    def testSendmsgAfterClose(self):
2937        # Check that sendmsg() fails on a closed socket.
2938        pass
2939
2940    def _testSendmsgAfterClose(self):
2941        self.cli_sock.close()
2942        self.assertRaises(OSError, self.sendmsgToServer, [MSG])
2943
2944
2945class SendmsgStreamTests(SendmsgTests):
2946    # Tests for sendmsg() which require a stream socket and do not
2947    # involve recvmsg() or recvmsg_into().
2948
2949    def testSendmsgExplicitNoneAddr(self):
2950        # Check that peer address can be specified as None.
2951        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2952
2953    def _testSendmsgExplicitNoneAddr(self):
2954        self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG))
2955
2956    def testSendmsgTimeout(self):
2957        # Check that timeout works with sendmsg().
2958        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
2959        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2960
2961    def _testSendmsgTimeout(self):
2962        try:
2963            self.cli_sock.settimeout(0.03)
2964            try:
2965                while True:
2966                    self.sendmsgToServer([b"a"*512])
2967            except TimeoutError:
2968                pass
2969            except OSError as exc:
2970                if exc.errno != errno.ENOMEM:
2971                    raise
2972                # bpo-33937 the test randomly fails on Travis CI with
2973                # "OSError: [Errno 12] Cannot allocate memory"
2974            else:
2975                self.fail("TimeoutError not raised")
2976        finally:
2977            self.misc_event.set()
2978
2979    # XXX: would be nice to have more tests for sendmsg flags argument.
2980
2981    # Linux supports MSG_DONTWAIT when sending, but in general, it
2982    # only works when receiving.  Could add other platforms if they
2983    # support it too.
2984    @skipWithClientIf(sys.platform not in {"linux"},
2985                      "MSG_DONTWAIT not known to work on this platform when "
2986                      "sending")
2987    def testSendmsgDontWait(self):
2988        # Check that MSG_DONTWAIT in flags causes non-blocking behaviour.
2989        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
2990        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2991
2992    @testSendmsgDontWait.client_skip
2993    def _testSendmsgDontWait(self):
2994        try:
2995            with self.assertRaises(OSError) as cm:
2996                while True:
2997                    self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
2998            # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI
2999            # with "OSError: [Errno 12] Cannot allocate memory"
3000            self.assertIn(cm.exception.errno,
3001                          (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM))
3002        finally:
3003            self.misc_event.set()
3004
3005
3006class SendmsgConnectionlessTests(SendmsgTests):
3007    # Tests for sendmsg() which require a connectionless-mode
3008    # (e.g. datagram) socket, and do not involve recvmsg() or
3009    # recvmsg_into().
3010
3011    def testSendmsgNoDestAddr(self):
3012        # Check that sendmsg() fails when no destination address is
3013        # given for unconnected socket.
3014        pass
3015
3016    def _testSendmsgNoDestAddr(self):
3017        self.assertRaises(OSError, self.cli_sock.sendmsg,
3018                          [MSG])
3019        self.assertRaises(OSError, self.cli_sock.sendmsg,
3020                          [MSG], [], 0, None)
3021
3022
3023class RecvmsgGenericTests(SendrecvmsgBase):
3024    # Tests for recvmsg() which can also be emulated using
3025    # recvmsg_into(), and can use any socket type.
3026
3027    def testRecvmsg(self):
3028        # Receive a simple message with recvmsg[_into]().
3029        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3030        self.assertEqual(msg, MSG)
3031        self.checkRecvmsgAddress(addr, self.cli_addr)
3032        self.assertEqual(ancdata, [])
3033        self.checkFlags(flags, eor=True)
3034
3035    def _testRecvmsg(self):
3036        self.sendToServer(MSG)
3037
3038    def testRecvmsgExplicitDefaults(self):
3039        # Test recvmsg[_into]() with default arguments provided explicitly.
3040        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3041                                                   len(MSG), 0, 0)
3042        self.assertEqual(msg, MSG)
3043        self.checkRecvmsgAddress(addr, self.cli_addr)
3044        self.assertEqual(ancdata, [])
3045        self.checkFlags(flags, eor=True)
3046
3047    def _testRecvmsgExplicitDefaults(self):
3048        self.sendToServer(MSG)
3049
3050    def testRecvmsgShorter(self):
3051        # Receive a message smaller than buffer.
3052        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3053                                                   len(MSG) + 42)
3054        self.assertEqual(msg, MSG)
3055        self.checkRecvmsgAddress(addr, self.cli_addr)
3056        self.assertEqual(ancdata, [])
3057        self.checkFlags(flags, eor=True)
3058
3059    def _testRecvmsgShorter(self):
3060        self.sendToServer(MSG)
3061
3062    def testRecvmsgTrunc(self):
3063        # Receive part of message, check for truncation indicators.
3064        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3065                                                   len(MSG) - 3)
3066        self.assertEqual(msg, MSG[:-3])
3067        self.checkRecvmsgAddress(addr, self.cli_addr)
3068        self.assertEqual(ancdata, [])
3069        self.checkFlags(flags, eor=False)
3070
3071    def _testRecvmsgTrunc(self):
3072        self.sendToServer(MSG)
3073
3074    def testRecvmsgShortAncillaryBuf(self):
3075        # Test ancillary data buffer too small to hold any ancillary data.
3076        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3077                                                   len(MSG), 1)
3078        self.assertEqual(msg, MSG)
3079        self.checkRecvmsgAddress(addr, self.cli_addr)
3080        self.assertEqual(ancdata, [])
3081        self.checkFlags(flags, eor=True)
3082
3083    def _testRecvmsgShortAncillaryBuf(self):
3084        self.sendToServer(MSG)
3085
3086    def testRecvmsgLongAncillaryBuf(self):
3087        # Test large ancillary data buffer.
3088        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3089                                                   len(MSG), 10240)
3090        self.assertEqual(msg, MSG)
3091        self.checkRecvmsgAddress(addr, self.cli_addr)
3092        self.assertEqual(ancdata, [])
3093        self.checkFlags(flags, eor=True)
3094
3095    def _testRecvmsgLongAncillaryBuf(self):
3096        self.sendToServer(MSG)
3097
3098    def testRecvmsgAfterClose(self):
3099        # Check that recvmsg[_into]() fails on a closed socket.
3100        self.serv_sock.close()
3101        self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)
3102
3103    def _testRecvmsgAfterClose(self):
3104        pass
3105
3106    def testRecvmsgTimeout(self):
3107        # Check that timeout works.
3108        try:
3109            self.serv_sock.settimeout(0.03)
3110            self.assertRaises(TimeoutError,
3111                              self.doRecvmsg, self.serv_sock, len(MSG))
3112        finally:
3113            self.misc_event.set()
3114
3115    def _testRecvmsgTimeout(self):
3116        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3117
3118    @requireAttrs(socket, "MSG_PEEK")
3119    def testRecvmsgPeek(self):
3120        # Check that MSG_PEEK in flags enables examination of pending
3121        # data without consuming it.
3122
3123        # Receive part of data with MSG_PEEK.
3124        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3125                                                   len(MSG) - 3, 0,
3126                                                   socket.MSG_PEEK)
3127        self.assertEqual(msg, MSG[:-3])
3128        self.checkRecvmsgAddress(addr, self.cli_addr)
3129        self.assertEqual(ancdata, [])
3130        # Ignoring MSG_TRUNC here (so this test is the same for stream
3131        # and datagram sockets).  Some wording in POSIX seems to
3132        # suggest that it needn't be set when peeking, but that may
3133        # just be a slip.
3134        self.checkFlags(flags, eor=False,
3135                        ignore=getattr(socket, "MSG_TRUNC", 0))
3136
3137        # Receive all data with MSG_PEEK.
3138        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3139                                                   len(MSG), 0,
3140                                                   socket.MSG_PEEK)
3141        self.assertEqual(msg, MSG)
3142        self.checkRecvmsgAddress(addr, self.cli_addr)
3143        self.assertEqual(ancdata, [])
3144        self.checkFlags(flags, eor=True)
3145
3146        # Check that the same data can still be received normally.
3147        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3148        self.assertEqual(msg, MSG)
3149        self.checkRecvmsgAddress(addr, self.cli_addr)
3150        self.assertEqual(ancdata, [])
3151        self.checkFlags(flags, eor=True)
3152
3153    @testRecvmsgPeek.client_skip
3154    def _testRecvmsgPeek(self):
3155        self.sendToServer(MSG)
3156
3157    @requireAttrs(socket.socket, "sendmsg")
3158    def testRecvmsgFromSendmsg(self):
3159        # Test receiving with recvmsg[_into]() when message is sent
3160        # using sendmsg().
3161        self.serv_sock.settimeout(self.fail_timeout)
3162        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3163        self.assertEqual(msg, MSG)
3164        self.checkRecvmsgAddress(addr, self.cli_addr)
3165        self.assertEqual(ancdata, [])
3166        self.checkFlags(flags, eor=True)
3167
3168    @testRecvmsgFromSendmsg.client_skip
3169    def _testRecvmsgFromSendmsg(self):
3170        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
3171
3172
3173class RecvmsgGenericStreamTests(RecvmsgGenericTests):
3174    # Tests which require a stream socket and can use either recvmsg()
3175    # or recvmsg_into().
3176
3177    def testRecvmsgEOF(self):
3178        # Receive end-of-stream indicator (b"", peer socket closed).
3179        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
3180        self.assertEqual(msg, b"")
3181        self.checkRecvmsgAddress(addr, self.cli_addr)
3182        self.assertEqual(ancdata, [])
3183        self.checkFlags(flags, eor=None) # Might not have end-of-record marker
3184
3185    def _testRecvmsgEOF(self):
3186        self.cli_sock.close()
3187
3188    def testRecvmsgOverflow(self):
3189        # Receive a message in more than one chunk.
3190        seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3191                                                    len(MSG) - 3)
3192        self.checkRecvmsgAddress(addr, self.cli_addr)
3193        self.assertEqual(ancdata, [])
3194        self.checkFlags(flags, eor=False)
3195
3196        seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
3197        self.checkRecvmsgAddress(addr, self.cli_addr)
3198        self.assertEqual(ancdata, [])
3199        self.checkFlags(flags, eor=True)
3200
3201        msg = seg1 + seg2
3202        self.assertEqual(msg, MSG)
3203
3204    def _testRecvmsgOverflow(self):
3205        self.sendToServer(MSG)
3206
3207
3208class RecvmsgTests(RecvmsgGenericTests):
3209    # Tests for recvmsg() which can use any socket type.
3210
3211    def testRecvmsgBadArgs(self):
3212        # Check that recvmsg() rejects invalid arguments.
3213        self.assertRaises(TypeError, self.serv_sock.recvmsg)
3214        self.assertRaises(ValueError, self.serv_sock.recvmsg,
3215                          -1, 0, 0)
3216        self.assertRaises(ValueError, self.serv_sock.recvmsg,
3217                          len(MSG), -1, 0)
3218        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3219                          [bytearray(10)], 0, 0)
3220        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3221                          object(), 0, 0)
3222        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3223                          len(MSG), object(), 0)
3224        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3225                          len(MSG), 0, object())
3226
3227        msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0)
3228        self.assertEqual(msg, MSG)
3229        self.checkRecvmsgAddress(addr, self.cli_addr)
3230        self.assertEqual(ancdata, [])
3231        self.checkFlags(flags, eor=True)
3232
3233    def _testRecvmsgBadArgs(self):
3234        self.sendToServer(MSG)
3235
3236
3237class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
3238    # Tests for recvmsg_into() which can use any socket type.
3239
3240    def testRecvmsgIntoBadArgs(self):
3241        # Check that recvmsg_into() rejects invalid arguments.
3242        buf = bytearray(len(MSG))
3243        self.assertRaises(TypeError, self.serv_sock.recvmsg_into)
3244        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3245                          len(MSG), 0, 0)
3246        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3247                          buf, 0, 0)
3248        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3249                          [object()], 0, 0)
3250        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3251                          [b"I'm not writable"], 0, 0)
3252        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3253                          [buf, object()], 0, 0)
3254        self.assertRaises(ValueError, self.serv_sock.recvmsg_into,
3255                          [buf], -1, 0)
3256        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3257                          [buf], object(), 0)
3258        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3259                          [buf], 0, object())
3260
3261        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0)
3262        self.assertEqual(nbytes, len(MSG))
3263        self.assertEqual(buf, bytearray(MSG))
3264        self.checkRecvmsgAddress(addr, self.cli_addr)
3265        self.assertEqual(ancdata, [])
3266        self.checkFlags(flags, eor=True)
3267
3268    def _testRecvmsgIntoBadArgs(self):
3269        self.sendToServer(MSG)
3270
3271    def testRecvmsgIntoGenerator(self):
3272        # Receive into buffer obtained from a generator (not a sequence).
3273        buf = bytearray(len(MSG))
3274        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
3275            (o for o in [buf]))
3276        self.assertEqual(nbytes, len(MSG))
3277        self.assertEqual(buf, bytearray(MSG))
3278        self.checkRecvmsgAddress(addr, self.cli_addr)
3279        self.assertEqual(ancdata, [])
3280        self.checkFlags(flags, eor=True)
3281
3282    def _testRecvmsgIntoGenerator(self):
3283        self.sendToServer(MSG)
3284
3285    def testRecvmsgIntoArray(self):
3286        # Receive into an array rather than the usual bytearray.
3287        buf = array.array("B", [0] * len(MSG))
3288        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf])
3289        self.assertEqual(nbytes, len(MSG))
3290        self.assertEqual(buf.tobytes(), MSG)
3291        self.checkRecvmsgAddress(addr, self.cli_addr)
3292        self.assertEqual(ancdata, [])
3293        self.checkFlags(flags, eor=True)
3294
3295    def _testRecvmsgIntoArray(self):
3296        self.sendToServer(MSG)
3297
3298    def testRecvmsgIntoScatter(self):
3299        # Receive into multiple buffers (scatter write).
3300        b1 = bytearray(b"----")
3301        b2 = bytearray(b"0123456789")
3302        b3 = bytearray(b"--------------")
3303        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
3304            [b1, memoryview(b2)[2:9], b3])
3305        self.assertEqual(nbytes, len(b"Mary had a little lamb"))
3306        self.assertEqual(b1, bytearray(b"Mary"))
3307        self.assertEqual(b2, bytearray(b"01 had a 9"))
3308        self.assertEqual(b3, bytearray(b"little lamb---"))
3309        self.checkRecvmsgAddress(addr, self.cli_addr)
3310        self.assertEqual(ancdata, [])
3311        self.checkFlags(flags, eor=True)
3312
3313    def _testRecvmsgIntoScatter(self):
3314        self.sendToServer(b"Mary had a little lamb")
3315
3316
3317class CmsgMacroTests(unittest.TestCase):
3318    # Test the functions CMSG_LEN() and CMSG_SPACE().  Tests
3319    # assumptions used by sendmsg() and recvmsg[_into](), which share
3320    # code with these functions.
3321
3322    # Match the definition in socketmodule.c
3323    try:
3324        import _testcapi
3325    except ImportError:
3326        socklen_t_limit = 0x7fffffff
3327    else:
3328        socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)
3329
3330    @requireAttrs(socket, "CMSG_LEN")
3331    def testCMSG_LEN(self):
3332        # Test CMSG_LEN() with various valid and invalid values,
3333        # checking the assumptions used by recvmsg() and sendmsg().
3334        toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1
3335        values = list(range(257)) + list(range(toobig - 257, toobig))
3336
3337        # struct cmsghdr has at least three members, two of which are ints
3338        self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2)
3339        for n in values:
3340            ret = socket.CMSG_LEN(n)
3341            # This is how recvmsg() calculates the data size
3342            self.assertEqual(ret - socket.CMSG_LEN(0), n)
3343            self.assertLessEqual(ret, self.socklen_t_limit)
3344
3345        self.assertRaises(OverflowError, socket.CMSG_LEN, -1)
3346        # sendmsg() shares code with these functions, and requires
3347        # that it reject values over the limit.
3348        self.assertRaises(OverflowError, socket.CMSG_LEN, toobig)
3349        self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize)
3350
3351    @requireAttrs(socket, "CMSG_SPACE")
3352    def testCMSG_SPACE(self):
3353        # Test CMSG_SPACE() with various valid and invalid values,
3354        # checking the assumptions used by sendmsg().
3355        toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1
3356        values = list(range(257)) + list(range(toobig - 257, toobig))
3357
3358        last = socket.CMSG_SPACE(0)
3359        # struct cmsghdr has at least three members, two of which are ints
3360        self.assertGreater(last, array.array("i").itemsize * 2)
3361        for n in values:
3362            ret = socket.CMSG_SPACE(n)
3363            self.assertGreaterEqual(ret, last)
3364            self.assertGreaterEqual(ret, socket.CMSG_LEN(n))
3365            self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0))
3366            self.assertLessEqual(ret, self.socklen_t_limit)
3367            last = ret
3368
3369        self.assertRaises(OverflowError, socket.CMSG_SPACE, -1)
3370        # sendmsg() shares code with these functions, and requires
3371        # that it reject values over the limit.
3372        self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig)
3373        self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize)
3374
3375
3376class SCMRightsTest(SendrecvmsgServerTimeoutBase):
3377    # Tests for file descriptor passing on Unix-domain sockets.
3378
3379    # Invalid file descriptor value that's unlikely to evaluate to a
3380    # real FD even if one of its bytes is replaced with a different
3381    # value (which shouldn't actually happen).
3382    badfd = -0x5555
3383
3384    def newFDs(self, n):
3385        # Return a list of n file descriptors for newly-created files
3386        # containing their list indices as ASCII numbers.
3387        fds = []
3388        for i in range(n):
3389            fd, path = tempfile.mkstemp()
3390            self.addCleanup(os.unlink, path)
3391            self.addCleanup(os.close, fd)
3392            os.write(fd, str(i).encode())
3393            fds.append(fd)
3394        return fds
3395
3396    def checkFDs(self, fds):
3397        # Check that the file descriptors in the given list contain
3398        # their correct list indices as ASCII numbers.
3399        for n, fd in enumerate(fds):
3400            os.lseek(fd, 0, os.SEEK_SET)
3401            self.assertEqual(os.read(fd, 1024), str(n).encode())
3402
3403    def registerRecvmsgResult(self, result):
3404        self.addCleanup(self.closeRecvmsgFDs, result)
3405
3406    def closeRecvmsgFDs(self, recvmsg_result):
3407        # Close all file descriptors specified in the ancillary data
3408        # of the given return value from recvmsg() or recvmsg_into().
3409        for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]:
3410            if (cmsg_level == socket.SOL_SOCKET and
3411                    cmsg_type == socket.SCM_RIGHTS):
3412                fds = array.array("i")
3413                fds.frombytes(cmsg_data[:
3414                        len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3415                for fd in fds:
3416                    os.close(fd)
3417
3418    def createAndSendFDs(self, n):
3419        # Send n new file descriptors created by newFDs() to the
3420        # server, with the constant MSG as the non-ancillary data.
3421        self.assertEqual(
3422            self.sendmsgToServer([MSG],
3423                                 [(socket.SOL_SOCKET,
3424                                   socket.SCM_RIGHTS,
3425                                   array.array("i", self.newFDs(n)))]),
3426            len(MSG))
3427
3428    def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
3429        # Check that constant MSG was received with numfds file
3430        # descriptors in a maximum of maxcmsgs control messages (which
3431        # must contain only complete integers).  By default, check
3432        # that MSG_CTRUNC is unset, but ignore any flags in
3433        # ignoreflags.
3434        msg, ancdata, flags, addr = result
3435        self.assertEqual(msg, MSG)
3436        self.checkRecvmsgAddress(addr, self.cli_addr)
3437        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3438                        ignore=ignoreflags)
3439
3440        self.assertIsInstance(ancdata, list)
3441        self.assertLessEqual(len(ancdata), maxcmsgs)
3442        fds = array.array("i")
3443        for item in ancdata:
3444            self.assertIsInstance(item, tuple)
3445            cmsg_level, cmsg_type, cmsg_data = item
3446            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3447            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3448            self.assertIsInstance(cmsg_data, bytes)
3449            self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0)
3450            fds.frombytes(cmsg_data)
3451
3452        self.assertEqual(len(fds), numfds)
3453        self.checkFDs(fds)
3454
3455    def testFDPassSimple(self):
3456        # Pass a single FD (array read from bytes object).
3457        self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock,
3458                                               len(MSG), 10240))
3459
3460    def _testFDPassSimple(self):
3461        self.assertEqual(
3462            self.sendmsgToServer(
3463                [MSG],
3464                [(socket.SOL_SOCKET,
3465                  socket.SCM_RIGHTS,
3466                  array.array("i", self.newFDs(1)).tobytes())]),
3467            len(MSG))
3468
3469    def testMultipleFDPass(self):
3470        # Pass multiple FDs in a single array.
3471        self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock,
3472                                               len(MSG), 10240))
3473
3474    def _testMultipleFDPass(self):
3475        self.createAndSendFDs(4)
3476
3477    @requireAttrs(socket, "CMSG_SPACE")
3478    def testFDPassCMSG_SPACE(self):
3479        # Test using CMSG_SPACE() to calculate ancillary buffer size.
3480        self.checkRecvmsgFDs(
3481            4, self.doRecvmsg(self.serv_sock, len(MSG),
3482                              socket.CMSG_SPACE(4 * SIZEOF_INT)))
3483
3484    @testFDPassCMSG_SPACE.client_skip
3485    def _testFDPassCMSG_SPACE(self):
3486        self.createAndSendFDs(4)
3487
3488    def testFDPassCMSG_LEN(self):
3489        # Test using CMSG_LEN() to calculate ancillary buffer size.
3490        self.checkRecvmsgFDs(1,
3491                             self.doRecvmsg(self.serv_sock, len(MSG),
3492                                            socket.CMSG_LEN(4 * SIZEOF_INT)),
3493                             # RFC 3542 says implementations may set
3494                             # MSG_CTRUNC if there isn't enough space
3495                             # for trailing padding.
3496                             ignoreflags=socket.MSG_CTRUNC)
3497
3498    def _testFDPassCMSG_LEN(self):
3499        self.createAndSendFDs(1)
3500
3501    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3502    @unittest.skipIf(AIX, "skipping, see issue #22397")
3503    @requireAttrs(socket, "CMSG_SPACE")
3504    def testFDPassSeparate(self):
3505        # Pass two FDs in two separate arrays.  Arrays may be combined
3506        # into a single control message by the OS.
3507        self.checkRecvmsgFDs(2,
3508                             self.doRecvmsg(self.serv_sock, len(MSG), 10240),
3509                             maxcmsgs=2)
3510
3511    @testFDPassSeparate.client_skip
3512    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3513    @unittest.skipIf(AIX, "skipping, see issue #22397")
3514    def _testFDPassSeparate(self):
3515        fd0, fd1 = self.newFDs(2)
3516        self.assertEqual(
3517            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
3518                                          socket.SCM_RIGHTS,
3519                                          array.array("i", [fd0])),
3520                                         (socket.SOL_SOCKET,
3521                                          socket.SCM_RIGHTS,
3522                                          array.array("i", [fd1]))]),
3523            len(MSG))
3524
3525    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3526    @unittest.skipIf(AIX, "skipping, see issue #22397")
3527    @requireAttrs(socket, "CMSG_SPACE")
3528    def testFDPassSeparateMinSpace(self):
3529        # Pass two FDs in two separate arrays, receiving them into the
3530        # minimum space for two arrays.
3531        num_fds = 2
3532        self.checkRecvmsgFDs(num_fds,
3533                             self.doRecvmsg(self.serv_sock, len(MSG),
3534                                            socket.CMSG_SPACE(SIZEOF_INT) +
3535                                            socket.CMSG_LEN(SIZEOF_INT * num_fds)),
3536                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
3537
3538    @testFDPassSeparateMinSpace.client_skip
3539    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3540    @unittest.skipIf(AIX, "skipping, see issue #22397")
3541    def _testFDPassSeparateMinSpace(self):
3542        fd0, fd1 = self.newFDs(2)
3543        self.assertEqual(
3544            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
3545                                          socket.SCM_RIGHTS,
3546                                          array.array("i", [fd0])),
3547                                         (socket.SOL_SOCKET,
3548                                          socket.SCM_RIGHTS,
3549                                          array.array("i", [fd1]))]),
3550            len(MSG))
3551
3552    def sendAncillaryIfPossible(self, msg, ancdata):
3553        # Try to send msg and ancdata to server, but if the system
3554        # call fails, just send msg with no ancillary data.
3555        try:
3556            nbytes = self.sendmsgToServer([msg], ancdata)
3557        except OSError as e:
3558            # Check that it was the system call that failed
3559            self.assertIsInstance(e.errno, int)
3560            nbytes = self.sendmsgToServer([msg])
3561        self.assertEqual(nbytes, len(msg))
3562
3563    @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
3564    def testFDPassEmpty(self):
3565        # Try to pass an empty FD array.  Can receive either no array
3566        # or an empty array.
3567        self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock,
3568                                               len(MSG), 10240),
3569                             ignoreflags=socket.MSG_CTRUNC)
3570
3571    def _testFDPassEmpty(self):
3572        self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET,
3573                                            socket.SCM_RIGHTS,
3574                                            b"")])
3575
3576    def testFDPassPartialInt(self):
3577        # Try to pass a truncated FD array.
3578        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3579                                                   len(MSG), 10240)
3580        self.assertEqual(msg, MSG)
3581        self.checkRecvmsgAddress(addr, self.cli_addr)
3582        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
3583        self.assertLessEqual(len(ancdata), 1)
3584        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3585            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3586            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3587            self.assertLess(len(cmsg_data), SIZEOF_INT)
3588
3589    def _testFDPassPartialInt(self):
3590        self.sendAncillaryIfPossible(
3591            MSG,
3592            [(socket.SOL_SOCKET,
3593              socket.SCM_RIGHTS,
3594              array.array("i", [self.badfd]).tobytes()[:-1])])
3595
3596    @requireAttrs(socket, "CMSG_SPACE")
3597    def testFDPassPartialIntInMiddle(self):
3598        # Try to pass two FD arrays, the first of which is truncated.
3599        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3600                                                   len(MSG), 10240)
3601        self.assertEqual(msg, MSG)
3602        self.checkRecvmsgAddress(addr, self.cli_addr)
3603        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
3604        self.assertLessEqual(len(ancdata), 2)
3605        fds = array.array("i")
3606        # Arrays may have been combined in a single control message
3607        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3608            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3609            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3610            fds.frombytes(cmsg_data[:
3611                    len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3612        self.assertLessEqual(len(fds), 2)
3613        self.checkFDs(fds)
3614
3615    @testFDPassPartialIntInMiddle.client_skip
3616    def _testFDPassPartialIntInMiddle(self):
3617        fd0, fd1 = self.newFDs(2)
3618        self.sendAncillaryIfPossible(
3619            MSG,
3620            [(socket.SOL_SOCKET,
3621              socket.SCM_RIGHTS,
3622              array.array("i", [fd0, self.badfd]).tobytes()[:-1]),
3623             (socket.SOL_SOCKET,
3624              socket.SCM_RIGHTS,
3625              array.array("i", [fd1]))])
3626
3627    def checkTruncatedHeader(self, result, ignoreflags=0):
3628        # Check that no ancillary data items are returned when data is
3629        # truncated inside the cmsghdr structure.
3630        msg, ancdata, flags, addr = result
3631        self.assertEqual(msg, MSG)
3632        self.checkRecvmsgAddress(addr, self.cli_addr)
3633        self.assertEqual(ancdata, [])
3634        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3635                        ignore=ignoreflags)
3636
3637    def testCmsgTruncNoBufSize(self):
3638        # Check that no ancillary data is received when no buffer size
3639        # is specified.
3640        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)),
3641                                  # BSD seems to set MSG_CTRUNC only
3642                                  # if an item has been partially
3643                                  # received.
3644                                  ignoreflags=socket.MSG_CTRUNC)
3645
3646    def _testCmsgTruncNoBufSize(self):
3647        self.createAndSendFDs(1)
3648
3649    def testCmsgTrunc0(self):
3650        # Check that no ancillary data is received when buffer size is 0.
3651        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0),
3652                                  ignoreflags=socket.MSG_CTRUNC)
3653
3654    def _testCmsgTrunc0(self):
3655        self.createAndSendFDs(1)
3656
3657    # Check that no ancillary data is returned for various non-zero
3658    # (but still too small) buffer sizes.
3659
3660    def testCmsgTrunc1(self):
3661        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1))
3662
3663    def _testCmsgTrunc1(self):
3664        self.createAndSendFDs(1)
3665
3666    def testCmsgTrunc2Int(self):
3667        # The cmsghdr structure has at least three members, two of
3668        # which are ints, so we still shouldn't see any ancillary
3669        # data.
3670        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3671                                                 SIZEOF_INT * 2))
3672
3673    def _testCmsgTrunc2Int(self):
3674        self.createAndSendFDs(1)
3675
3676    def testCmsgTruncLen0Minus1(self):
3677        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3678                                                 socket.CMSG_LEN(0) - 1))
3679
3680    def _testCmsgTruncLen0Minus1(self):
3681        self.createAndSendFDs(1)
3682
3683    # The following tests try to truncate the control message in the
3684    # middle of the FD array.
3685
3686    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
3687        # Check that file descriptor data is truncated to between
3688        # mindata and maxdata bytes when received with buffer size
3689        # ancbuf, and that any complete file descriptor numbers are
3690        # valid.
3691        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3692                                                   len(MSG), ancbuf)
3693        self.assertEqual(msg, MSG)
3694        self.checkRecvmsgAddress(addr, self.cli_addr)
3695        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3696
3697        if mindata == 0 and ancdata == []:
3698            return
3699        self.assertEqual(len(ancdata), 1)
3700        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3701        self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3702        self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3703        self.assertGreaterEqual(len(cmsg_data), mindata)
3704        self.assertLessEqual(len(cmsg_data), maxdata)
3705        fds = array.array("i")
3706        fds.frombytes(cmsg_data[:
3707                len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3708        self.checkFDs(fds)
3709
3710    def testCmsgTruncLen0(self):
3711        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0)
3712
3713    def _testCmsgTruncLen0(self):
3714        self.createAndSendFDs(1)
3715
3716    def testCmsgTruncLen0Plus1(self):
3717        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1)
3718
3719    def _testCmsgTruncLen0Plus1(self):
3720        self.createAndSendFDs(2)
3721
3722    def testCmsgTruncLen1(self):
3723        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT),
3724                                 maxdata=SIZEOF_INT)
3725
3726    def _testCmsgTruncLen1(self):
3727        self.createAndSendFDs(2)
3728
3729    def testCmsgTruncLen2Minus1(self):
3730        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1,
3731                                 maxdata=(2 * SIZEOF_INT) - 1)
3732
3733    def _testCmsgTruncLen2Minus1(self):
3734        self.createAndSendFDs(2)
3735
3736
3737class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
3738    # Test sendmsg() and recvmsg[_into]() using the ancillary data
3739    # features of the RFC 3542 Advanced Sockets API for IPv6.
3740    # Currently we can only handle certain data items (e.g. traffic
3741    # class, hop limit, MTU discovery and fragmentation settings)
3742    # without resorting to unportable means such as the struct module,
3743    # but the tests here are aimed at testing the ancillary data
3744    # handling in sendmsg() and recvmsg() rather than the IPv6 API
3745    # itself.
3746
3747    # Test value to use when setting hop limit of packet
3748    hop_limit = 2
3749
3750    # Test value to use when setting traffic class of packet.
3751    # -1 means "use kernel default".
3752    traffic_class = -1
3753
3754    def ancillaryMapping(self, ancdata):
3755        # Given ancillary data list ancdata, return a mapping from
3756        # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
3757        # Check that no (level, type) pair appears more than once.
3758        d = {}
3759        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3760            self.assertNotIn((cmsg_level, cmsg_type), d)
3761            d[(cmsg_level, cmsg_type)] = cmsg_data
3762        return d
3763
3764    def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
3765        # Receive hop limit into ancbufsize bytes of ancillary data
3766        # space.  Check that data is MSG, ancillary data is not
3767        # truncated (but ignore any flags in ignoreflags), and hop
3768        # limit is between 0 and maxhop inclusive.
3769        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3770                                  socket.IPV6_RECVHOPLIMIT, 1)
3771        self.misc_event.set()
3772        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3773                                                   len(MSG), ancbufsize)
3774
3775        self.assertEqual(msg, MSG)
3776        self.checkRecvmsgAddress(addr, self.cli_addr)
3777        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3778                        ignore=ignoreflags)
3779
3780        self.assertEqual(len(ancdata), 1)
3781        self.assertIsInstance(ancdata[0], tuple)
3782        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3783        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3784        self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
3785        self.assertIsInstance(cmsg_data, bytes)
3786        self.assertEqual(len(cmsg_data), SIZEOF_INT)
3787        a = array.array("i")
3788        a.frombytes(cmsg_data)
3789        self.assertGreaterEqual(a[0], 0)
3790        self.assertLessEqual(a[0], maxhop)
3791
3792    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3793    def testRecvHopLimit(self):
3794        # Test receiving the packet hop limit as ancillary data.
3795        self.checkHopLimit(ancbufsize=10240)
3796
3797    @testRecvHopLimit.client_skip
3798    def _testRecvHopLimit(self):
3799        # Need to wait until server has asked to receive ancillary
3800        # data, as implementations are not required to buffer it
3801        # otherwise.
3802        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3803        self.sendToServer(MSG)
3804
3805    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3806    def testRecvHopLimitCMSG_SPACE(self):
3807        # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
3808        self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))
3809
3810    @testRecvHopLimitCMSG_SPACE.client_skip
3811    def _testRecvHopLimitCMSG_SPACE(self):
3812        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3813        self.sendToServer(MSG)
3814
3815    # Could test receiving into buffer sized using CMSG_LEN, but RFC
3816    # 3542 says portable applications must provide space for trailing
3817    # padding.  Implementations may set MSG_CTRUNC if there isn't
3818    # enough space for the padding.
3819
3820    @requireAttrs(socket.socket, "sendmsg")
3821    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3822    def testSetHopLimit(self):
3823        # Test setting hop limit on outgoing packet and receiving it
3824        # at the other end.
3825        self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)
3826
3827    @testSetHopLimit.client_skip
3828    def _testSetHopLimit(self):
3829        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3830        self.assertEqual(
3831            self.sendmsgToServer([MSG],
3832                                 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3833                                   array.array("i", [self.hop_limit]))]),
3834            len(MSG))
3835
3836    def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
3837                                     ignoreflags=0):
3838        # Receive traffic class and hop limit into ancbufsize bytes of
3839        # ancillary data space.  Check that data is MSG, ancillary
3840        # data is not truncated (but ignore any flags in ignoreflags),
3841        # and traffic class and hop limit are in range (hop limit no
3842        # more than maxhop).
3843        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3844                                  socket.IPV6_RECVHOPLIMIT, 1)
3845        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3846                                  socket.IPV6_RECVTCLASS, 1)
3847        self.misc_event.set()
3848        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3849                                                   len(MSG), ancbufsize)
3850
3851        self.assertEqual(msg, MSG)
3852        self.checkRecvmsgAddress(addr, self.cli_addr)
3853        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3854                        ignore=ignoreflags)
3855        self.assertEqual(len(ancdata), 2)
3856        ancmap = self.ancillaryMapping(ancdata)
3857
3858        tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
3859        self.assertEqual(len(tcdata), SIZEOF_INT)
3860        a = array.array("i")
3861        a.frombytes(tcdata)
3862        self.assertGreaterEqual(a[0], 0)
3863        self.assertLessEqual(a[0], 255)
3864
3865        hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
3866        self.assertEqual(len(hldata), SIZEOF_INT)
3867        a = array.array("i")
3868        a.frombytes(hldata)
3869        self.assertGreaterEqual(a[0], 0)
3870        self.assertLessEqual(a[0], maxhop)
3871
3872    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3873                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3874    def testRecvTrafficClassAndHopLimit(self):
3875        # Test receiving traffic class and hop limit as ancillary data.
3876        self.checkTrafficClassAndHopLimit(ancbufsize=10240)
3877
3878    @testRecvTrafficClassAndHopLimit.client_skip
3879    def _testRecvTrafficClassAndHopLimit(self):
3880        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3881        self.sendToServer(MSG)
3882
3883    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3884                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3885    def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3886        # Test receiving traffic class and hop limit, using
3887        # CMSG_SPACE() to calculate buffer size.
3888        self.checkTrafficClassAndHopLimit(
3889            ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)
3890
3891    @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
3892    def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3893        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3894        self.sendToServer(MSG)
3895
3896    @requireAttrs(socket.socket, "sendmsg")
3897    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3898                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3899    def testSetTrafficClassAndHopLimit(self):
3900        # Test setting traffic class and hop limit on outgoing packet,
3901        # and receiving them at the other end.
3902        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3903                                          maxhop=self.hop_limit)
3904
3905    @testSetTrafficClassAndHopLimit.client_skip
3906    def _testSetTrafficClassAndHopLimit(self):
3907        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3908        self.assertEqual(
3909            self.sendmsgToServer([MSG],
3910                                 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3911                                   array.array("i", [self.traffic_class])),
3912                                  (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3913                                   array.array("i", [self.hop_limit]))]),
3914            len(MSG))
3915
3916    @requireAttrs(socket.socket, "sendmsg")
3917    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3918                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3919    def testOddCmsgSize(self):
3920        # Try to send ancillary data with first item one byte too
3921        # long.  Fall back to sending with correct size if this fails,
3922        # and check that second item was handled correctly.
3923        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3924                                          maxhop=self.hop_limit)
3925
3926    @testOddCmsgSize.client_skip
3927    def _testOddCmsgSize(self):
3928        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3929        try:
3930            nbytes = self.sendmsgToServer(
3931                [MSG],
3932                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3933                  array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
3934                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3935                  array.array("i", [self.hop_limit]))])
3936        except OSError as e:
3937            self.assertIsInstance(e.errno, int)
3938            nbytes = self.sendmsgToServer(
3939                [MSG],
3940                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3941                  array.array("i", [self.traffic_class])),
3942                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3943                  array.array("i", [self.hop_limit]))])
3944            self.assertEqual(nbytes, len(MSG))
3945
3946    # Tests for proper handling of truncated ancillary data
3947
3948    def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
3949        # Receive hop limit into ancbufsize bytes of ancillary data
3950        # space, which should be too small to contain the ancillary
3951        # data header (if ancbufsize is None, pass no second argument
3952        # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
3953        # (unless included in ignoreflags), and no ancillary data is
3954        # returned.
3955        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3956                                  socket.IPV6_RECVHOPLIMIT, 1)
3957        self.misc_event.set()
3958        args = () if ancbufsize is None else (ancbufsize,)
3959        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3960                                                   len(MSG), *args)
3961
3962        self.assertEqual(msg, MSG)
3963        self.checkRecvmsgAddress(addr, self.cli_addr)
3964        self.assertEqual(ancdata, [])
3965        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3966                        ignore=ignoreflags)
3967
3968    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3969    def testCmsgTruncNoBufSize(self):
3970        # Check that no ancillary data is received when no ancillary
3971        # buffer size is provided.
3972        self.checkHopLimitTruncatedHeader(ancbufsize=None,
3973                                          # BSD seems to set
3974                                          # MSG_CTRUNC only if an item
3975                                          # has been partially
3976                                          # received.
3977                                          ignoreflags=socket.MSG_CTRUNC)
3978
3979    @testCmsgTruncNoBufSize.client_skip
3980    def _testCmsgTruncNoBufSize(self):
3981        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3982        self.sendToServer(MSG)
3983
3984    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3985    def testSingleCmsgTrunc0(self):
3986        # Check that no ancillary data is received when ancillary
3987        # buffer size is zero.
3988        self.checkHopLimitTruncatedHeader(ancbufsize=0,
3989                                          ignoreflags=socket.MSG_CTRUNC)
3990
3991    @testSingleCmsgTrunc0.client_skip
3992    def _testSingleCmsgTrunc0(self):
3993        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3994        self.sendToServer(MSG)
3995
3996    # Check that no ancillary data is returned for various non-zero
3997    # (but still too small) buffer sizes.
3998
3999    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4000    def testSingleCmsgTrunc1(self):
4001        self.checkHopLimitTruncatedHeader(ancbufsize=1)
4002
4003    @testSingleCmsgTrunc1.client_skip
4004    def _testSingleCmsgTrunc1(self):
4005        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4006        self.sendToServer(MSG)
4007
4008    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4009    def testSingleCmsgTrunc2Int(self):
4010        self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)
4011
4012    @testSingleCmsgTrunc2Int.client_skip
4013    def _testSingleCmsgTrunc2Int(self):
4014        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4015        self.sendToServer(MSG)
4016
4017    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4018    def testSingleCmsgTruncLen0Minus1(self):
4019        self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)
4020
4021    @testSingleCmsgTruncLen0Minus1.client_skip
4022    def _testSingleCmsgTruncLen0Minus1(self):
4023        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4024        self.sendToServer(MSG)
4025
4026    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4027    def testSingleCmsgTruncInData(self):
4028        # Test truncation of a control message inside its associated
4029        # data.  The message may be returned with its data truncated,
4030        # or not returned at all.
4031        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4032                                  socket.IPV6_RECVHOPLIMIT, 1)
4033        self.misc_event.set()
4034        msg, ancdata, flags, addr = self.doRecvmsg(
4035            self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)
4036
4037        self.assertEqual(msg, MSG)
4038        self.checkRecvmsgAddress(addr, self.cli_addr)
4039        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
4040
4041        self.assertLessEqual(len(ancdata), 1)
4042        if ancdata:
4043            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
4044            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4045            self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
4046            self.assertLess(len(cmsg_data), SIZEOF_INT)
4047
4048    @testSingleCmsgTruncInData.client_skip
4049    def _testSingleCmsgTruncInData(self):
4050        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4051        self.sendToServer(MSG)
4052
4053    def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
4054        # Receive traffic class and hop limit into ancbufsize bytes of
4055        # ancillary data space, which should be large enough to
4056        # contain the first item, but too small to contain the header
4057        # of the second.  Check that data is MSG, MSG_CTRUNC is set
4058        # (unless included in ignoreflags), and only one ancillary
4059        # data item is returned.
4060        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4061                                  socket.IPV6_RECVHOPLIMIT, 1)
4062        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4063                                  socket.IPV6_RECVTCLASS, 1)
4064        self.misc_event.set()
4065        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
4066                                                   len(MSG), ancbufsize)
4067
4068        self.assertEqual(msg, MSG)
4069        self.checkRecvmsgAddress(addr, self.cli_addr)
4070        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
4071                        ignore=ignoreflags)
4072
4073        self.assertEqual(len(ancdata), 1)
4074        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
4075        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4076        self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
4077        self.assertEqual(len(cmsg_data), SIZEOF_INT)
4078        a = array.array("i")
4079        a.frombytes(cmsg_data)
4080        self.assertGreaterEqual(a[0], 0)
4081        self.assertLessEqual(a[0], 255)
4082
4083    # Try the above test with various buffer sizes.
4084
4085    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4086                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4087    def testSecondCmsgTrunc0(self):
4088        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
4089                                        ignoreflags=socket.MSG_CTRUNC)
4090
4091    @testSecondCmsgTrunc0.client_skip
4092    def _testSecondCmsgTrunc0(self):
4093        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4094        self.sendToServer(MSG)
4095
4096    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4097                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4098    def testSecondCmsgTrunc1(self):
4099        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)
4100
4101    @testSecondCmsgTrunc1.client_skip
4102    def _testSecondCmsgTrunc1(self):
4103        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4104        self.sendToServer(MSG)
4105
4106    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4107                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4108    def testSecondCmsgTrunc2Int(self):
4109        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
4110                                        2 * SIZEOF_INT)
4111
4112    @testSecondCmsgTrunc2Int.client_skip
4113    def _testSecondCmsgTrunc2Int(self):
4114        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4115        self.sendToServer(MSG)
4116
4117    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4118                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4119    def testSecondCmsgTruncLen0Minus1(self):
4120        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
4121                                        socket.CMSG_LEN(0) - 1)
4122
4123    @testSecondCmsgTruncLen0Minus1.client_skip
4124    def _testSecondCmsgTruncLen0Minus1(self):
4125        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4126        self.sendToServer(MSG)
4127
4128    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4129                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4130    def testSecondCmsgTruncInData(self):
4131        # Test truncation of the second of two control messages inside
4132        # its associated data.
4133        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4134                                  socket.IPV6_RECVHOPLIMIT, 1)
4135        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4136                                  socket.IPV6_RECVTCLASS, 1)
4137        self.misc_event.set()
4138        msg, ancdata, flags, addr = self.doRecvmsg(
4139            self.serv_sock, len(MSG),
4140            socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)
4141
4142        self.assertEqual(msg, MSG)
4143        self.checkRecvmsgAddress(addr, self.cli_addr)
4144        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
4145
4146        cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}
4147
4148        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
4149        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4150        cmsg_types.remove(cmsg_type)
4151        self.assertEqual(len(cmsg_data), SIZEOF_INT)
4152        a = array.array("i")
4153        a.frombytes(cmsg_data)
4154        self.assertGreaterEqual(a[0], 0)
4155        self.assertLessEqual(a[0], 255)
4156
4157        if ancdata:
4158            cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
4159            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4160            cmsg_types.remove(cmsg_type)
4161            self.assertLess(len(cmsg_data), SIZEOF_INT)
4162
4163        self.assertEqual(ancdata, [])
4164
4165    @testSecondCmsgTruncInData.client_skip
4166    def _testSecondCmsgTruncInData(self):
4167        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4168        self.sendToServer(MSG)
4169
4170
4171# Derive concrete test classes for different socket types.
4172
4173class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase,
4174                             SendrecvmsgConnectionlessBase,
4175                             ThreadedSocketTestMixin, UDPTestBase):
4176    pass
4177
4178@requireAttrs(socket.socket, "sendmsg")
4179class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase):
4180    pass
4181
4182@requireAttrs(socket.socket, "recvmsg")
4183class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase):
4184    pass
4185
4186@requireAttrs(socket.socket, "recvmsg_into")
4187class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase):
4188    pass
4189
4190
4191class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
4192                              SendrecvmsgConnectionlessBase,
4193                              ThreadedSocketTestMixin, UDP6TestBase):
4194
4195    def checkRecvmsgAddress(self, addr1, addr2):
4196        # Called to compare the received address with the address of
4197        # the peer, ignoring scope ID
4198        self.assertEqual(addr1[:-1], addr2[:-1])
4199
4200@requireAttrs(socket.socket, "sendmsg")
4201@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4202@requireSocket("AF_INET6", "SOCK_DGRAM")
4203class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
4204    pass
4205
4206@requireAttrs(socket.socket, "recvmsg")
4207@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4208@requireSocket("AF_INET6", "SOCK_DGRAM")
4209class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
4210    pass
4211
4212@requireAttrs(socket.socket, "recvmsg_into")
4213@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4214@requireSocket("AF_INET6", "SOCK_DGRAM")
4215class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
4216    pass
4217
4218@requireAttrs(socket.socket, "recvmsg")
4219@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4220@requireAttrs(socket, "IPPROTO_IPV6")
4221@requireSocket("AF_INET6", "SOCK_DGRAM")
4222class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
4223                                      SendrecvmsgUDP6TestBase):
4224    pass
4225
4226@requireAttrs(socket.socket, "recvmsg_into")
4227@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4228@requireAttrs(socket, "IPPROTO_IPV6")
4229@requireSocket("AF_INET6", "SOCK_DGRAM")
4230class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
4231                                          RFC3542AncillaryTest,
4232                                          SendrecvmsgUDP6TestBase):
4233    pass
4234
4235
4236@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4237          'UDPLITE sockets required for this test.')
4238class SendrecvmsgUDPLITETestBase(SendrecvmsgDgramFlagsBase,
4239                             SendrecvmsgConnectionlessBase,
4240                             ThreadedSocketTestMixin, UDPLITETestBase):
4241    pass
4242
4243@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4244          'UDPLITE sockets required for this test.')
4245@requireAttrs(socket.socket, "sendmsg")
4246class SendmsgUDPLITETest(SendmsgConnectionlessTests, SendrecvmsgUDPLITETestBase):
4247    pass
4248
4249@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4250          'UDPLITE sockets required for this test.')
4251@requireAttrs(socket.socket, "recvmsg")
4252class RecvmsgUDPLITETest(RecvmsgTests, SendrecvmsgUDPLITETestBase):
4253    pass
4254
4255@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4256          'UDPLITE sockets required for this test.')
4257@requireAttrs(socket.socket, "recvmsg_into")
4258class RecvmsgIntoUDPLITETest(RecvmsgIntoTests, SendrecvmsgUDPLITETestBase):
4259    pass
4260
4261
4262@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4263          'UDPLITE sockets required for this test.')
4264class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase,
4265                              SendrecvmsgConnectionlessBase,
4266                              ThreadedSocketTestMixin, UDPLITE6TestBase):
4267
4268    def checkRecvmsgAddress(self, addr1, addr2):
4269        # Called to compare the received address with the address of
4270        # the peer, ignoring scope ID
4271        self.assertEqual(addr1[:-1], addr2[:-1])
4272
4273@requireAttrs(socket.socket, "sendmsg")
4274@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4275@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4276          'UDPLITE sockets required for this test.')
4277@requireSocket("AF_INET6", "SOCK_DGRAM")
4278class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBase):
4279    pass
4280
4281@requireAttrs(socket.socket, "recvmsg")
4282@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4283@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4284          'UDPLITE sockets required for this test.')
4285@requireSocket("AF_INET6", "SOCK_DGRAM")
4286class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase):
4287    pass
4288
4289@requireAttrs(socket.socket, "recvmsg_into")
4290@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4291@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4292          'UDPLITE sockets required for this test.')
4293@requireSocket("AF_INET6", "SOCK_DGRAM")
4294class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase):
4295    pass
4296
4297@requireAttrs(socket.socket, "recvmsg")
4298@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4299@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4300          'UDPLITE sockets required for this test.')
4301@requireAttrs(socket, "IPPROTO_IPV6")
4302@requireSocket("AF_INET6", "SOCK_DGRAM")
4303class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest,
4304                                      SendrecvmsgUDPLITE6TestBase):
4305    pass
4306
4307@requireAttrs(socket.socket, "recvmsg_into")
4308@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4309@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4310          'UDPLITE sockets required for this test.')
4311@requireAttrs(socket, "IPPROTO_IPV6")
4312@requireSocket("AF_INET6", "SOCK_DGRAM")
4313class RecvmsgIntoRFC3542AncillaryUDPLITE6Test(RecvmsgIntoMixin,
4314                                          RFC3542AncillaryTest,
4315                                          SendrecvmsgUDPLITE6TestBase):
4316    pass
4317
4318
4319class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase,
4320                             ConnectedStreamTestMixin, TCPTestBase):
4321    pass
4322
4323@requireAttrs(socket.socket, "sendmsg")
4324class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase):
4325    pass
4326
4327@requireAttrs(socket.socket, "recvmsg")
4328class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests,
4329                     SendrecvmsgTCPTestBase):
4330    pass
4331
4332@requireAttrs(socket.socket, "recvmsg_into")
4333class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4334                         SendrecvmsgTCPTestBase):
4335    pass
4336
4337
4338class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase,
4339                                    SendrecvmsgConnectedBase,
4340                                    ConnectedStreamTestMixin, SCTPStreamBase):
4341    pass
4342
4343@requireAttrs(socket.socket, "sendmsg")
4344@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4345@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4346class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase):
4347    pass
4348
4349@requireAttrs(socket.socket, "recvmsg")
4350@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4351@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4352class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
4353                            SendrecvmsgSCTPStreamTestBase):
4354
4355    def testRecvmsgEOF(self):
4356        try:
4357            super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF()
4358        except OSError as e:
4359            if e.errno != errno.ENOTCONN:
4360                raise
4361            self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
4362
4363@requireAttrs(socket.socket, "recvmsg_into")
4364@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4365@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4366class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4367                                SendrecvmsgSCTPStreamTestBase):
4368
4369    def testRecvmsgEOF(self):
4370        try:
4371            super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF()
4372        except OSError as e:
4373            if e.errno != errno.ENOTCONN:
4374                raise
4375            self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
4376
4377
4378class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase,
4379                                    ConnectedStreamTestMixin, UnixStreamBase):
4380    pass
4381
4382@requireAttrs(socket.socket, "sendmsg")
4383@requireAttrs(socket, "AF_UNIX")
4384class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase):
4385    pass
4386
4387@requireAttrs(socket.socket, "recvmsg")
4388@requireAttrs(socket, "AF_UNIX")
4389class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
4390                            SendrecvmsgUnixStreamTestBase):
4391    pass
4392
4393@requireAttrs(socket.socket, "recvmsg_into")
4394@requireAttrs(socket, "AF_UNIX")
4395class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4396                                SendrecvmsgUnixStreamTestBase):
4397    pass
4398
4399@requireAttrs(socket.socket, "sendmsg", "recvmsg")
4400@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
4401class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase):
4402    pass
4403
4404@requireAttrs(socket.socket, "sendmsg", "recvmsg_into")
4405@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
4406class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest,
4407                                     SendrecvmsgUnixStreamTestBase):
4408    pass
4409
4410
4411# Test interrupting the interruptible send/receive methods with a
4412# signal when a timeout is set.  These tests avoid having multiple
4413# threads alive during the test so that the OS cannot deliver the
4414# signal to the wrong one.
4415
4416class InterruptedTimeoutBase:
4417    # Base class for interrupted send/receive tests.  Installs an
4418    # empty handler for SIGALRM and removes it on teardown, along with
4419    # any scheduled alarms.
4420
4421    def setUp(self):
4422        super().setUp()
4423        orig_alrm_handler = signal.signal(signal.SIGALRM,
4424                                          lambda signum, frame: 1 / 0)
4425        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
4426
4427    # Timeout for socket operations
4428    timeout = support.LOOPBACK_TIMEOUT
4429
4430    # Provide setAlarm() method to schedule delivery of SIGALRM after
4431    # given number of seconds, or cancel it if zero, and an
4432    # appropriate time value to use.  Use setitimer() if available.
4433    if hasattr(signal, "setitimer"):
4434        alarm_time = 0.05
4435
4436        def setAlarm(self, seconds):
4437            signal.setitimer(signal.ITIMER_REAL, seconds)
4438    else:
4439        # Old systems may deliver the alarm up to one second early
4440        alarm_time = 2
4441
4442        def setAlarm(self, seconds):
4443            signal.alarm(seconds)
4444
4445
4446# Require siginterrupt() in order to ensure that system calls are
4447# interrupted by default.
4448@requireAttrs(signal, "siginterrupt")
4449@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
4450                     "Don't have signal.alarm or signal.setitimer")
4451class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
4452    # Test interrupting the recv*() methods with signals when a
4453    # timeout is set.
4454
4455    def setUp(self):
4456        super().setUp()
4457        self.serv.settimeout(self.timeout)
4458
4459    def checkInterruptedRecv(self, func, *args, **kwargs):
4460        # Check that func(*args, **kwargs) raises
4461        # errno of EINTR when interrupted by a signal.
4462        try:
4463            self.setAlarm(self.alarm_time)
4464            with self.assertRaises(ZeroDivisionError) as cm:
4465                func(*args, **kwargs)
4466        finally:
4467            self.setAlarm(0)
4468
4469    def testInterruptedRecvTimeout(self):
4470        self.checkInterruptedRecv(self.serv.recv, 1024)
4471
4472    def testInterruptedRecvIntoTimeout(self):
4473        self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))
4474
4475    def testInterruptedRecvfromTimeout(self):
4476        self.checkInterruptedRecv(self.serv.recvfrom, 1024)
4477
4478    def testInterruptedRecvfromIntoTimeout(self):
4479        self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))
4480
4481    @requireAttrs(socket.socket, "recvmsg")
4482    def testInterruptedRecvmsgTimeout(self):
4483        self.checkInterruptedRecv(self.serv.recvmsg, 1024)
4484
4485    @requireAttrs(socket.socket, "recvmsg_into")
4486    def testInterruptedRecvmsgIntoTimeout(self):
4487        self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])
4488
4489
4490# Require siginterrupt() in order to ensure that system calls are
4491# interrupted by default.
4492@requireAttrs(signal, "siginterrupt")
4493@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
4494                     "Don't have signal.alarm or signal.setitimer")
4495class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
4496                                 ThreadSafeCleanupTestCase,
4497                                 SocketListeningTestMixin, TCPTestBase):
4498    # Test interrupting the interruptible send*() methods with signals
4499    # when a timeout is set.
4500
4501    def setUp(self):
4502        super().setUp()
4503        self.serv_conn = self.newSocket()
4504        self.addCleanup(self.serv_conn.close)
4505        # Use a thread to complete the connection, but wait for it to
4506        # terminate before running the test, so that there is only one
4507        # thread to accept the signal.
4508        cli_thread = threading.Thread(target=self.doConnect)
4509        cli_thread.start()
4510        self.cli_conn, addr = self.serv.accept()
4511        self.addCleanup(self.cli_conn.close)
4512        cli_thread.join()
4513        self.serv_conn.settimeout(self.timeout)
4514
4515    def doConnect(self):
4516        self.serv_conn.connect(self.serv_addr)
4517
4518    def checkInterruptedSend(self, func, *args, **kwargs):
4519        # Check that func(*args, **kwargs), run in a loop, raises
4520        # OSError with an errno of EINTR when interrupted by a
4521        # signal.
4522        try:
4523            with self.assertRaises(ZeroDivisionError) as cm:
4524                while True:
4525                    self.setAlarm(self.alarm_time)
4526                    func(*args, **kwargs)
4527        finally:
4528            self.setAlarm(0)
4529
4530    # Issue #12958: The following tests have problems on OS X prior to 10.7
4531    @support.requires_mac_ver(10, 7)
4532    def testInterruptedSendTimeout(self):
4533        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)
4534
4535    @support.requires_mac_ver(10, 7)
4536    def testInterruptedSendtoTimeout(self):
4537        # Passing an actual address here as Python's wrapper for
4538        # sendto() doesn't allow passing a zero-length one; POSIX
4539        # requires that the address is ignored since the socket is
4540        # connection-mode, however.
4541        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
4542                                  self.serv_addr)
4543
4544    @support.requires_mac_ver(10, 7)
4545    @requireAttrs(socket.socket, "sendmsg")
4546    def testInterruptedSendmsgTimeout(self):
4547        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
4548
4549
4550class TCPCloserTest(ThreadedTCPSocketTest):
4551
4552    def testClose(self):
4553        conn, addr = self.serv.accept()
4554        conn.close()
4555
4556        sd = self.cli
4557        read, write, err = select.select([sd], [], [], 1.0)
4558        self.assertEqual(read, [sd])
4559        self.assertEqual(sd.recv(1), b'')
4560
4561        # Calling close() many times should be safe.
4562        conn.close()
4563        conn.close()
4564
4565    def _testClose(self):
4566        self.cli.connect((HOST, self.port))
4567        time.sleep(1.0)
4568
4569
4570class BasicSocketPairTest(SocketPairTest):
4571
4572    def __init__(self, methodName='runTest'):
4573        SocketPairTest.__init__(self, methodName=methodName)
4574
4575    def _check_defaults(self, sock):
4576        self.assertIsInstance(sock, socket.socket)
4577        if hasattr(socket, 'AF_UNIX'):
4578            self.assertEqual(sock.family, socket.AF_UNIX)
4579        else:
4580            self.assertEqual(sock.family, socket.AF_INET)
4581        self.assertEqual(sock.type, socket.SOCK_STREAM)
4582        self.assertEqual(sock.proto, 0)
4583
4584    def _testDefaults(self):
4585        self._check_defaults(self.cli)
4586
4587    def testDefaults(self):
4588        self._check_defaults(self.serv)
4589
4590    def testRecv(self):
4591        msg = self.serv.recv(1024)
4592        self.assertEqual(msg, MSG)
4593
4594    def _testRecv(self):
4595        self.cli.send(MSG)
4596
4597    def testSend(self):
4598        self.serv.send(MSG)
4599
4600    def _testSend(self):
4601        msg = self.cli.recv(1024)
4602        self.assertEqual(msg, MSG)
4603
4604
4605class NonBlockingTCPTests(ThreadedTCPSocketTest):
4606
4607    def __init__(self, methodName='runTest'):
4608        self.event = threading.Event()
4609        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
4610
4611    def assert_sock_timeout(self, sock, timeout):
4612        self.assertEqual(self.serv.gettimeout(), timeout)
4613
4614        blocking = (timeout != 0.0)
4615        self.assertEqual(sock.getblocking(), blocking)
4616
4617        if fcntl is not None:
4618            # When a Python socket has a non-zero timeout, it's switched
4619            # internally to a non-blocking mode. Later, sock.sendall(),
4620            # sock.recv(), and other socket operations use a select() call and
4621            # handle EWOULDBLOCK/EGAIN on all socket operations. That's how
4622            # timeouts are enforced.
4623            fd_blocking = (timeout is None)
4624
4625            flag = fcntl.fcntl(sock, fcntl.F_GETFL, os.O_NONBLOCK)
4626            self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking)
4627
4628    def testSetBlocking(self):
4629        # Test setblocking() and settimeout() methods
4630        self.serv.setblocking(True)
4631        self.assert_sock_timeout(self.serv, None)
4632
4633        self.serv.setblocking(False)
4634        self.assert_sock_timeout(self.serv, 0.0)
4635
4636        self.serv.settimeout(None)
4637        self.assert_sock_timeout(self.serv, None)
4638
4639        self.serv.settimeout(0)
4640        self.assert_sock_timeout(self.serv, 0)
4641
4642        self.serv.settimeout(10)
4643        self.assert_sock_timeout(self.serv, 10)
4644
4645        self.serv.settimeout(0)
4646        self.assert_sock_timeout(self.serv, 0)
4647
4648    def _testSetBlocking(self):
4649        pass
4650
4651    @support.cpython_only
4652    def testSetBlocking_overflow(self):
4653        # Issue 15989
4654        import _testcapi
4655        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
4656            self.skipTest('needs UINT_MAX < ULONG_MAX')
4657
4658        self.serv.setblocking(False)
4659        self.assertEqual(self.serv.gettimeout(), 0.0)
4660
4661        self.serv.setblocking(_testcapi.UINT_MAX + 1)
4662        self.assertIsNone(self.serv.gettimeout())
4663
4664    _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)
4665
4666    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
4667                         'test needs socket.SOCK_NONBLOCK')
4668    @support.requires_linux_version(2, 6, 28)
4669    def testInitNonBlocking(self):
4670        # create a socket with SOCK_NONBLOCK
4671        self.serv.close()
4672        self.serv = socket.socket(socket.AF_INET,
4673                                  socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
4674        self.assert_sock_timeout(self.serv, 0)
4675
4676    def _testInitNonBlocking(self):
4677        pass
4678
4679    def testInheritFlagsBlocking(self):
4680        # bpo-7995: accept() on a listening socket with a timeout and the
4681        # default timeout is None, the resulting socket must be blocking.
4682        with socket_setdefaulttimeout(None):
4683            self.serv.settimeout(10)
4684            conn, addr = self.serv.accept()
4685            self.addCleanup(conn.close)
4686            self.assertIsNone(conn.gettimeout())
4687
4688    def _testInheritFlagsBlocking(self):
4689        self.cli.connect((HOST, self.port))
4690
4691    def testInheritFlagsTimeout(self):
4692        # bpo-7995: accept() on a listening socket with a timeout and the
4693        # default timeout is None, the resulting socket must inherit
4694        # the default timeout.
4695        default_timeout = 20.0
4696        with socket_setdefaulttimeout(default_timeout):
4697            self.serv.settimeout(10)
4698            conn, addr = self.serv.accept()
4699            self.addCleanup(conn.close)
4700            self.assertEqual(conn.gettimeout(), default_timeout)
4701
4702    def _testInheritFlagsTimeout(self):
4703        self.cli.connect((HOST, self.port))
4704
4705    def testAccept(self):
4706        # Testing non-blocking accept
4707        self.serv.setblocking(False)
4708
4709        # connect() didn't start: non-blocking accept() fails
4710        start_time = time.monotonic()
4711        with self.assertRaises(BlockingIOError):
4712            conn, addr = self.serv.accept()
4713        dt = time.monotonic() - start_time
4714        self.assertLess(dt, 1.0)
4715
4716        self.event.set()
4717
4718        read, write, err = select.select([self.serv], [], [], support.LONG_TIMEOUT)
4719        if self.serv not in read:
4720            self.fail("Error trying to do accept after select.")
4721
4722        # connect() completed: non-blocking accept() doesn't block
4723        conn, addr = self.serv.accept()
4724        self.addCleanup(conn.close)
4725        self.assertIsNone(conn.gettimeout())
4726
4727    def _testAccept(self):
4728        # don't connect before event is set to check
4729        # that non-blocking accept() raises BlockingIOError
4730        self.event.wait()
4731
4732        self.cli.connect((HOST, self.port))
4733
4734    def testRecv(self):
4735        # Testing non-blocking recv
4736        conn, addr = self.serv.accept()
4737        self.addCleanup(conn.close)
4738        conn.setblocking(False)
4739
4740        # the server didn't send data yet: non-blocking recv() fails
4741        with self.assertRaises(BlockingIOError):
4742            msg = conn.recv(len(MSG))
4743
4744        self.event.set()
4745
4746        read, write, err = select.select([conn], [], [], support.LONG_TIMEOUT)
4747        if conn not in read:
4748            self.fail("Error during select call to non-blocking socket.")
4749
4750        # the server sent data yet: non-blocking recv() doesn't block
4751        msg = conn.recv(len(MSG))
4752        self.assertEqual(msg, MSG)
4753
4754    def _testRecv(self):
4755        self.cli.connect((HOST, self.port))
4756
4757        # don't send anything before event is set to check
4758        # that non-blocking recv() raises BlockingIOError
4759        self.event.wait()
4760
4761        # send data: recv() will no longer block
4762        self.cli.sendall(MSG)
4763
4764
4765class FileObjectClassTestCase(SocketConnectedTest):
4766    """Unit tests for the object returned by socket.makefile()
4767
4768    self.read_file is the io object returned by makefile() on
4769    the client connection.  You can read from this file to
4770    get output from the server.
4771
4772    self.write_file is the io object returned by makefile() on the
4773    server connection.  You can write to this file to send output
4774    to the client.
4775    """
4776
4777    bufsize = -1 # Use default buffer size
4778    encoding = 'utf-8'
4779    errors = 'strict'
4780    newline = None
4781
4782    read_mode = 'rb'
4783    read_msg = MSG
4784    write_mode = 'wb'
4785    write_msg = MSG
4786
4787    def __init__(self, methodName='runTest'):
4788        SocketConnectedTest.__init__(self, methodName=methodName)
4789
4790    def setUp(self):
4791        self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
4792            threading.Event() for i in range(4)]
4793        SocketConnectedTest.setUp(self)
4794        self.read_file = self.cli_conn.makefile(
4795            self.read_mode, self.bufsize,
4796            encoding = self.encoding,
4797            errors = self.errors,
4798            newline = self.newline)
4799
4800    def tearDown(self):
4801        self.serv_finished.set()
4802        self.read_file.close()
4803        self.assertTrue(self.read_file.closed)
4804        self.read_file = None
4805        SocketConnectedTest.tearDown(self)
4806
4807    def clientSetUp(self):
4808        SocketConnectedTest.clientSetUp(self)
4809        self.write_file = self.serv_conn.makefile(
4810            self.write_mode, self.bufsize,
4811            encoding = self.encoding,
4812            errors = self.errors,
4813            newline = self.newline)
4814
4815    def clientTearDown(self):
4816        self.cli_finished.set()
4817        self.write_file.close()
4818        self.assertTrue(self.write_file.closed)
4819        self.write_file = None
4820        SocketConnectedTest.clientTearDown(self)
4821
4822    def testReadAfterTimeout(self):
4823        # Issue #7322: A file object must disallow further reads
4824        # after a timeout has occurred.
4825        self.cli_conn.settimeout(1)
4826        self.read_file.read(3)
4827        # First read raises a timeout
4828        self.assertRaises(TimeoutError, self.read_file.read, 1)
4829        # Second read is disallowed
4830        with self.assertRaises(OSError) as ctx:
4831            self.read_file.read(1)
4832        self.assertIn("cannot read from timed out object", str(ctx.exception))
4833
4834    def _testReadAfterTimeout(self):
4835        self.write_file.write(self.write_msg[0:3])
4836        self.write_file.flush()
4837        self.serv_finished.wait()
4838
4839    def testSmallRead(self):
4840        # Performing small file read test
4841        first_seg = self.read_file.read(len(self.read_msg)-3)
4842        second_seg = self.read_file.read(3)
4843        msg = first_seg + second_seg
4844        self.assertEqual(msg, self.read_msg)
4845
4846    def _testSmallRead(self):
4847        self.write_file.write(self.write_msg)
4848        self.write_file.flush()
4849
4850    def testFullRead(self):
4851        # read until EOF
4852        msg = self.read_file.read()
4853        self.assertEqual(msg, self.read_msg)
4854
4855    def _testFullRead(self):
4856        self.write_file.write(self.write_msg)
4857        self.write_file.close()
4858
4859    def testUnbufferedRead(self):
4860        # Performing unbuffered file read test
4861        buf = type(self.read_msg)()
4862        while 1:
4863            char = self.read_file.read(1)
4864            if not char:
4865                break
4866            buf += char
4867        self.assertEqual(buf, self.read_msg)
4868
4869    def _testUnbufferedRead(self):
4870        self.write_file.write(self.write_msg)
4871        self.write_file.flush()
4872
4873    def testReadline(self):
4874        # Performing file readline test
4875        line = self.read_file.readline()
4876        self.assertEqual(line, self.read_msg)
4877
4878    def _testReadline(self):
4879        self.write_file.write(self.write_msg)
4880        self.write_file.flush()
4881
4882    def testCloseAfterMakefile(self):
4883        # The file returned by makefile should keep the socket open.
4884        self.cli_conn.close()
4885        # read until EOF
4886        msg = self.read_file.read()
4887        self.assertEqual(msg, self.read_msg)
4888
4889    def _testCloseAfterMakefile(self):
4890        self.write_file.write(self.write_msg)
4891        self.write_file.flush()
4892
4893    def testMakefileAfterMakefileClose(self):
4894        self.read_file.close()
4895        msg = self.cli_conn.recv(len(MSG))
4896        if isinstance(self.read_msg, str):
4897            msg = msg.decode()
4898        self.assertEqual(msg, self.read_msg)
4899
4900    def _testMakefileAfterMakefileClose(self):
4901        self.write_file.write(self.write_msg)
4902        self.write_file.flush()
4903
4904    def testClosedAttr(self):
4905        self.assertTrue(not self.read_file.closed)
4906
4907    def _testClosedAttr(self):
4908        self.assertTrue(not self.write_file.closed)
4909
4910    def testAttributes(self):
4911        self.assertEqual(self.read_file.mode, self.read_mode)
4912        self.assertEqual(self.read_file.name, self.cli_conn.fileno())
4913
4914    def _testAttributes(self):
4915        self.assertEqual(self.write_file.mode, self.write_mode)
4916        self.assertEqual(self.write_file.name, self.serv_conn.fileno())
4917
4918    def testRealClose(self):
4919        self.read_file.close()
4920        self.assertRaises(ValueError, self.read_file.fileno)
4921        self.cli_conn.close()
4922        self.assertRaises(OSError, self.cli_conn.getsockname)
4923
4924    def _testRealClose(self):
4925        pass
4926
4927
4928class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
4929
4930    """Repeat the tests from FileObjectClassTestCase with bufsize==0.
4931
4932    In this case (and in this case only), it should be possible to
4933    create a file object, read a line from it, create another file
4934    object, read another line from it, without loss of data in the
4935    first file object's buffer.  Note that http.client relies on this
4936    when reading multiple requests from the same socket."""
4937
4938    bufsize = 0 # Use unbuffered mode
4939
4940    def testUnbufferedReadline(self):
4941        # Read a line, create a new file object, read another line with it
4942        line = self.read_file.readline() # first line
4943        self.assertEqual(line, b"A. " + self.write_msg) # first line
4944        self.read_file = self.cli_conn.makefile('rb', 0)
4945        line = self.read_file.readline() # second line
4946        self.assertEqual(line, b"B. " + self.write_msg) # second line
4947
4948    def _testUnbufferedReadline(self):
4949        self.write_file.write(b"A. " + self.write_msg)
4950        self.write_file.write(b"B. " + self.write_msg)
4951        self.write_file.flush()
4952
4953    def testMakefileClose(self):
4954        # The file returned by makefile should keep the socket open...
4955        self.cli_conn.close()
4956        msg = self.cli_conn.recv(1024)
4957        self.assertEqual(msg, self.read_msg)
4958        # ...until the file is itself closed
4959        self.read_file.close()
4960        self.assertRaises(OSError, self.cli_conn.recv, 1024)
4961
4962    def _testMakefileClose(self):
4963        self.write_file.write(self.write_msg)
4964        self.write_file.flush()
4965
4966    def testMakefileCloseSocketDestroy(self):
4967        refcount_before = sys.getrefcount(self.cli_conn)
4968        self.read_file.close()
4969        refcount_after = sys.getrefcount(self.cli_conn)
4970        self.assertEqual(refcount_before - 1, refcount_after)
4971
4972    def _testMakefileCloseSocketDestroy(self):
4973        pass
4974
4975    # Non-blocking ops
4976    # NOTE: to set `read_file` as non-blocking, we must call
4977    # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp).
4978
4979    def testSmallReadNonBlocking(self):
4980        self.cli_conn.setblocking(False)
4981        self.assertEqual(self.read_file.readinto(bytearray(10)), None)
4982        self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None)
4983        self.evt1.set()
4984        self.evt2.wait(1.0)
4985        first_seg = self.read_file.read(len(self.read_msg) - 3)
4986        if first_seg is None:
4987            # Data not arrived (can happen under Windows), wait a bit
4988            time.sleep(0.5)
4989            first_seg = self.read_file.read(len(self.read_msg) - 3)
4990        buf = bytearray(10)
4991        n = self.read_file.readinto(buf)
4992        self.assertEqual(n, 3)
4993        msg = first_seg + buf[:n]
4994        self.assertEqual(msg, self.read_msg)
4995        self.assertEqual(self.read_file.readinto(bytearray(16)), None)
4996        self.assertEqual(self.read_file.read(1), None)
4997
4998    def _testSmallReadNonBlocking(self):
4999        self.evt1.wait(1.0)
5000        self.write_file.write(self.write_msg)
5001        self.write_file.flush()
5002        self.evt2.set()
5003        # Avoid closing the socket before the server test has finished,
5004        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
5005        self.serv_finished.wait(5.0)
5006
5007    def testWriteNonBlocking(self):
5008        self.cli_finished.wait(5.0)
5009        # The client thread can't skip directly - the SkipTest exception
5010        # would appear as a failure.
5011        if self.serv_skipped:
5012            self.skipTest(self.serv_skipped)
5013
5014    def _testWriteNonBlocking(self):
5015        self.serv_skipped = None
5016        self.serv_conn.setblocking(False)
5017        # Try to saturate the socket buffer pipe with repeated large writes.
5018        BIG = b"x" * support.SOCK_MAX_SIZE
5019        LIMIT = 10
5020        # The first write() succeeds since a chunk of data can be buffered
5021        n = self.write_file.write(BIG)
5022        self.assertGreater(n, 0)
5023        for i in range(LIMIT):
5024            n = self.write_file.write(BIG)
5025            if n is None:
5026                # Succeeded
5027                break
5028            self.assertGreater(n, 0)
5029        else:
5030            # Let us know that this test didn't manage to establish
5031            # the expected conditions. This is not a failure in itself but,
5032            # if it happens repeatedly, the test should be fixed.
5033            self.serv_skipped = "failed to saturate the socket buffer"
5034
5035
5036class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
5037
5038    bufsize = 1 # Default-buffered for reading; line-buffered for writing
5039
5040
5041class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
5042
5043    bufsize = 2 # Exercise the buffering code
5044
5045
5046class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
5047    """Tests for socket.makefile() in text mode (rather than binary)"""
5048
5049    read_mode = 'r'
5050    read_msg = MSG.decode('utf-8')
5051    write_mode = 'wb'
5052    write_msg = MSG
5053    newline = ''
5054
5055
5056class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
5057    """Tests for socket.makefile() in text mode (rather than binary)"""
5058
5059    read_mode = 'rb'
5060    read_msg = MSG
5061    write_mode = 'w'
5062    write_msg = MSG.decode('utf-8')
5063    newline = ''
5064
5065
5066class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
5067    """Tests for socket.makefile() in text mode (rather than binary)"""
5068
5069    read_mode = 'r'
5070    read_msg = MSG.decode('utf-8')
5071    write_mode = 'w'
5072    write_msg = MSG.decode('utf-8')
5073    newline = ''
5074
5075
5076class NetworkConnectionTest(object):
5077    """Prove network connection."""
5078
5079    def clientSetUp(self):
5080        # We're inherited below by BasicTCPTest2, which also inherits
5081        # BasicTCPTest, which defines self.port referenced below.
5082        self.cli = socket.create_connection((HOST, self.port))
5083        self.serv_conn = self.cli
5084
5085class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
5086    """Tests that NetworkConnection does not break existing TCP functionality.
5087    """
5088
5089class NetworkConnectionNoServer(unittest.TestCase):
5090
5091    class MockSocket(socket.socket):
5092        def connect(self, *args):
5093            raise TimeoutError('timed out')
5094
5095    @contextlib.contextmanager
5096    def mocked_socket_module(self):
5097        """Return a socket which times out on connect"""
5098        old_socket = socket.socket
5099        socket.socket = self.MockSocket
5100        try:
5101            yield
5102        finally:
5103            socket.socket = old_socket
5104
5105    def test_connect(self):
5106        port = socket_helper.find_unused_port()
5107        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
5108        self.addCleanup(cli.close)
5109        with self.assertRaises(OSError) as cm:
5110            cli.connect((HOST, port))
5111        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
5112
5113    def test_create_connection(self):
5114        # Issue #9792: errors raised by create_connection() should have
5115        # a proper errno attribute.
5116        port = socket_helper.find_unused_port()
5117        with self.assertRaises(OSError) as cm:
5118            socket.create_connection((HOST, port))
5119
5120        # Issue #16257: create_connection() calls getaddrinfo() against
5121        # 'localhost'.  This may result in an IPV6 addr being returned
5122        # as well as an IPV4 one:
5123        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
5124        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
5125        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
5126        #
5127        # create_connection() enumerates through all the addresses returned
5128        # and if it doesn't successfully bind to any of them, it propagates
5129        # the last exception it encountered.
5130        #
5131        # On Solaris, ENETUNREACH is returned in this circumstance instead
5132        # of ECONNREFUSED.  So, if that errno exists, add it to our list of
5133        # expected errnos.
5134        expected_errnos = socket_helper.get_socket_conn_refused_errs()
5135        self.assertIn(cm.exception.errno, expected_errnos)
5136
5137    def test_create_connection_timeout(self):
5138        # Issue #9792: create_connection() should not recast timeout errors
5139        # as generic socket errors.
5140        with self.mocked_socket_module():
5141            try:
5142                socket.create_connection((HOST, 1234))
5143            except TimeoutError:
5144                pass
5145            except OSError as exc:
5146                if socket_helper.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT:
5147                    raise
5148            else:
5149                self.fail('TimeoutError not raised')
5150
5151
5152class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
5153
5154    def __init__(self, methodName='runTest'):
5155        SocketTCPTest.__init__(self, methodName=methodName)
5156        ThreadableTest.__init__(self)
5157
5158    def clientSetUp(self):
5159        self.source_port = socket_helper.find_unused_port()
5160
5161    def clientTearDown(self):
5162        self.cli.close()
5163        self.cli = None
5164        ThreadableTest.clientTearDown(self)
5165
5166    def _justAccept(self):
5167        conn, addr = self.serv.accept()
5168        conn.close()
5169
5170    testFamily = _justAccept
5171    def _testFamily(self):
5172        self.cli = socket.create_connection((HOST, self.port),
5173                            timeout=support.LOOPBACK_TIMEOUT)
5174        self.addCleanup(self.cli.close)
5175        self.assertEqual(self.cli.family, 2)
5176
5177    testSourceAddress = _justAccept
5178    def _testSourceAddress(self):
5179        self.cli = socket.create_connection((HOST, self.port),
5180                            timeout=support.LOOPBACK_TIMEOUT,
5181                            source_address=('', self.source_port))
5182        self.addCleanup(self.cli.close)
5183        self.assertEqual(self.cli.getsockname()[1], self.source_port)
5184        # The port number being used is sufficient to show that the bind()
5185        # call happened.
5186
5187    testTimeoutDefault = _justAccept
5188    def _testTimeoutDefault(self):
5189        # passing no explicit timeout uses socket's global default
5190        self.assertTrue(socket.getdefaulttimeout() is None)
5191        socket.setdefaulttimeout(42)
5192        try:
5193            self.cli = socket.create_connection((HOST, self.port))
5194            self.addCleanup(self.cli.close)
5195        finally:
5196            socket.setdefaulttimeout(None)
5197        self.assertEqual(self.cli.gettimeout(), 42)
5198
5199    testTimeoutNone = _justAccept
5200    def _testTimeoutNone(self):
5201        # None timeout means the same as sock.settimeout(None)
5202        self.assertTrue(socket.getdefaulttimeout() is None)
5203        socket.setdefaulttimeout(30)
5204        try:
5205            self.cli = socket.create_connection((HOST, self.port), timeout=None)
5206            self.addCleanup(self.cli.close)
5207        finally:
5208            socket.setdefaulttimeout(None)
5209        self.assertEqual(self.cli.gettimeout(), None)
5210
5211    testTimeoutValueNamed = _justAccept
5212    def _testTimeoutValueNamed(self):
5213        self.cli = socket.create_connection((HOST, self.port), timeout=30)
5214        self.assertEqual(self.cli.gettimeout(), 30)
5215
5216    testTimeoutValueNonamed = _justAccept
5217    def _testTimeoutValueNonamed(self):
5218        self.cli = socket.create_connection((HOST, self.port), 30)
5219        self.addCleanup(self.cli.close)
5220        self.assertEqual(self.cli.gettimeout(), 30)
5221
5222
5223class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
5224
5225    def __init__(self, methodName='runTest'):
5226        SocketTCPTest.__init__(self, methodName=methodName)
5227        ThreadableTest.__init__(self)
5228
5229    def clientSetUp(self):
5230        pass
5231
5232    def clientTearDown(self):
5233        self.cli.close()
5234        self.cli = None
5235        ThreadableTest.clientTearDown(self)
5236
5237    def testInsideTimeout(self):
5238        conn, addr = self.serv.accept()
5239        self.addCleanup(conn.close)
5240        time.sleep(3)
5241        conn.send(b"done!")
5242    testOutsideTimeout = testInsideTimeout
5243
5244    def _testInsideTimeout(self):
5245        self.cli = sock = socket.create_connection((HOST, self.port))
5246        data = sock.recv(5)
5247        self.assertEqual(data, b"done!")
5248
5249    def _testOutsideTimeout(self):
5250        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
5251        self.assertRaises(TimeoutError, lambda: sock.recv(5))
5252
5253
5254class TCPTimeoutTest(SocketTCPTest):
5255
5256    def testTCPTimeout(self):
5257        def raise_timeout(*args, **kwargs):
5258            self.serv.settimeout(1.0)
5259            self.serv.accept()
5260        self.assertRaises(TimeoutError, raise_timeout,
5261                              "Error generating a timeout exception (TCP)")
5262
5263    def testTimeoutZero(self):
5264        ok = False
5265        try:
5266            self.serv.settimeout(0.0)
5267            foo = self.serv.accept()
5268        except TimeoutError:
5269            self.fail("caught timeout instead of error (TCP)")
5270        except OSError:
5271            ok = True
5272        except:
5273            self.fail("caught unexpected exception (TCP)")
5274        if not ok:
5275            self.fail("accept() returned success when we did not expect it")
5276
5277    @unittest.skipUnless(hasattr(signal, 'alarm'),
5278                         'test needs signal.alarm()')
5279    def testInterruptedTimeout(self):
5280        # XXX I don't know how to do this test on MSWindows or any other
5281        # platform that doesn't support signal.alarm() or os.kill(), though
5282        # the bug should have existed on all platforms.
5283        self.serv.settimeout(5.0)   # must be longer than alarm
5284        class Alarm(Exception):
5285            pass
5286        def alarm_handler(signal, frame):
5287            raise Alarm
5288        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
5289        try:
5290            try:
5291                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
5292                foo = self.serv.accept()
5293            except TimeoutError:
5294                self.fail("caught timeout instead of Alarm")
5295            except Alarm:
5296                pass
5297            except:
5298                self.fail("caught other exception instead of Alarm:"
5299                          " %s(%s):\n%s" %
5300                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
5301            else:
5302                self.fail("nothing caught")
5303            finally:
5304                signal.alarm(0)         # shut off alarm
5305        except Alarm:
5306            self.fail("got Alarm in wrong place")
5307        finally:
5308            # no alarm can be pending.  Safe to restore old handler.
5309            signal.signal(signal.SIGALRM, old_alarm)
5310
5311class UDPTimeoutTest(SocketUDPTest):
5312
5313    def testUDPTimeout(self):
5314        def raise_timeout(*args, **kwargs):
5315            self.serv.settimeout(1.0)
5316            self.serv.recv(1024)
5317        self.assertRaises(TimeoutError, raise_timeout,
5318                              "Error generating a timeout exception (UDP)")
5319
5320    def testTimeoutZero(self):
5321        ok = False
5322        try:
5323            self.serv.settimeout(0.0)
5324            foo = self.serv.recv(1024)
5325        except TimeoutError:
5326            self.fail("caught timeout instead of error (UDP)")
5327        except OSError:
5328            ok = True
5329        except:
5330            self.fail("caught unexpected exception (UDP)")
5331        if not ok:
5332            self.fail("recv() returned success when we did not expect it")
5333
5334@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
5335          'UDPLITE sockets required for this test.')
5336class UDPLITETimeoutTest(SocketUDPLITETest):
5337
5338    def testUDPLITETimeout(self):
5339        def raise_timeout(*args, **kwargs):
5340            self.serv.settimeout(1.0)
5341            self.serv.recv(1024)
5342        self.assertRaises(TimeoutError, raise_timeout,
5343                              "Error generating a timeout exception (UDPLITE)")
5344
5345    def testTimeoutZero(self):
5346        ok = False
5347        try:
5348            self.serv.settimeout(0.0)
5349            foo = self.serv.recv(1024)
5350        except TimeoutError:
5351            self.fail("caught timeout instead of error (UDPLITE)")
5352        except OSError:
5353            ok = True
5354        except:
5355            self.fail("caught unexpected exception (UDPLITE)")
5356        if not ok:
5357            self.fail("recv() returned success when we did not expect it")
5358
5359class TestExceptions(unittest.TestCase):
5360
5361    def testExceptionTree(self):
5362        self.assertTrue(issubclass(OSError, Exception))
5363        self.assertTrue(issubclass(socket.herror, OSError))
5364        self.assertTrue(issubclass(socket.gaierror, OSError))
5365        self.assertTrue(issubclass(socket.timeout, OSError))
5366        self.assertIs(socket.error, OSError)
5367        self.assertIs(socket.timeout, TimeoutError)
5368
5369    def test_setblocking_invalidfd(self):
5370        # Regression test for issue #28471
5371
5372        sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
5373        sock = socket.socket(
5374            socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno())
5375        sock0.close()
5376        self.addCleanup(sock.detach)
5377
5378        with self.assertRaises(OSError):
5379            sock.setblocking(False)
5380
5381
5382@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
5383class TestLinuxAbstractNamespace(unittest.TestCase):
5384
5385    UNIX_PATH_MAX = 108
5386
5387    def testLinuxAbstractNamespace(self):
5388        address = b"\x00python-test-hello\x00\xff"
5389        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1:
5390            s1.bind(address)
5391            s1.listen()
5392            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2:
5393                s2.connect(s1.getsockname())
5394                with s1.accept()[0] as s3:
5395                    self.assertEqual(s1.getsockname(), address)
5396                    self.assertEqual(s2.getpeername(), address)
5397
5398    def testMaxName(self):
5399        address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
5400        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5401            s.bind(address)
5402            self.assertEqual(s.getsockname(), address)
5403
5404    def testNameOverflow(self):
5405        address = "\x00" + "h" * self.UNIX_PATH_MAX
5406        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5407            self.assertRaises(OSError, s.bind, address)
5408
5409    def testStrName(self):
5410        # Check that an abstract name can be passed as a string.
5411        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
5412        try:
5413            s.bind("\x00python\x00test\x00")
5414            self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
5415        finally:
5416            s.close()
5417
5418    def testBytearrayName(self):
5419        # Check that an abstract name can be passed as a bytearray.
5420        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5421            s.bind(bytearray(b"\x00python\x00test\x00"))
5422            self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
5423
5424@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
5425class TestUnixDomain(unittest.TestCase):
5426
5427    def setUp(self):
5428        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
5429
5430    def tearDown(self):
5431        self.sock.close()
5432
5433    def encoded(self, path):
5434        # Return the given path encoded in the file system encoding,
5435        # or skip the test if this is not possible.
5436        try:
5437            return os.fsencode(path)
5438        except UnicodeEncodeError:
5439            self.skipTest(
5440                "Pathname {0!a} cannot be represented in file "
5441                "system encoding {1!r}".format(
5442                    path, sys.getfilesystemencoding()))
5443
5444    def bind(self, sock, path):
5445        # Bind the socket
5446        try:
5447            socket_helper.bind_unix_socket(sock, path)
5448        except OSError as e:
5449            if str(e) == "AF_UNIX path too long":
5450                self.skipTest(
5451                    "Pathname {0!a} is too long to serve as an AF_UNIX path"
5452                    .format(path))
5453            else:
5454                raise
5455
5456    def testUnbound(self):
5457        # Issue #30205 (note getsockname() can return None on OS X)
5458        self.assertIn(self.sock.getsockname(), ('', None))
5459
5460    def testStrAddr(self):
5461        # Test binding to and retrieving a normal string pathname.
5462        path = os.path.abspath(os_helper.TESTFN)
5463        self.bind(self.sock, path)
5464        self.addCleanup(os_helper.unlink, path)
5465        self.assertEqual(self.sock.getsockname(), path)
5466
5467    def testBytesAddr(self):
5468        # Test binding to a bytes pathname.
5469        path = os.path.abspath(os_helper.TESTFN)
5470        self.bind(self.sock, self.encoded(path))
5471        self.addCleanup(os_helper.unlink, path)
5472        self.assertEqual(self.sock.getsockname(), path)
5473
5474    def testSurrogateescapeBind(self):
5475        # Test binding to a valid non-ASCII pathname, with the
5476        # non-ASCII bytes supplied using surrogateescape encoding.
5477        path = os.path.abspath(os_helper.TESTFN_UNICODE)
5478        b = self.encoded(path)
5479        self.bind(self.sock, b.decode("ascii", "surrogateescape"))
5480        self.addCleanup(os_helper.unlink, path)
5481        self.assertEqual(self.sock.getsockname(), path)
5482
5483    def testUnencodableAddr(self):
5484        # Test binding to a pathname that cannot be encoded in the
5485        # file system encoding.
5486        if os_helper.TESTFN_UNENCODABLE is None:
5487            self.skipTest("No unencodable filename available")
5488        path = os.path.abspath(os_helper.TESTFN_UNENCODABLE)
5489        self.bind(self.sock, path)
5490        self.addCleanup(os_helper.unlink, path)
5491        self.assertEqual(self.sock.getsockname(), path)
5492
5493
5494class BufferIOTest(SocketConnectedTest):
5495    """
5496    Test the buffer versions of socket.recv() and socket.send().
5497    """
5498    def __init__(self, methodName='runTest'):
5499        SocketConnectedTest.__init__(self, methodName=methodName)
5500
5501    def testRecvIntoArray(self):
5502        buf = array.array("B", [0] * len(MSG))
5503        nbytes = self.cli_conn.recv_into(buf)
5504        self.assertEqual(nbytes, len(MSG))
5505        buf = buf.tobytes()
5506        msg = buf[:len(MSG)]
5507        self.assertEqual(msg, MSG)
5508
5509    def _testRecvIntoArray(self):
5510        buf = bytes(MSG)
5511        self.serv_conn.send(buf)
5512
5513    def testRecvIntoBytearray(self):
5514        buf = bytearray(1024)
5515        nbytes = self.cli_conn.recv_into(buf)
5516        self.assertEqual(nbytes, len(MSG))
5517        msg = buf[:len(MSG)]
5518        self.assertEqual(msg, MSG)
5519
5520    _testRecvIntoBytearray = _testRecvIntoArray
5521
5522    def testRecvIntoMemoryview(self):
5523        buf = bytearray(1024)
5524        nbytes = self.cli_conn.recv_into(memoryview(buf))
5525        self.assertEqual(nbytes, len(MSG))
5526        msg = buf[:len(MSG)]
5527        self.assertEqual(msg, MSG)
5528
5529    _testRecvIntoMemoryview = _testRecvIntoArray
5530
5531    def testRecvFromIntoArray(self):
5532        buf = array.array("B", [0] * len(MSG))
5533        nbytes, addr = self.cli_conn.recvfrom_into(buf)
5534        self.assertEqual(nbytes, len(MSG))
5535        buf = buf.tobytes()
5536        msg = buf[:len(MSG)]
5537        self.assertEqual(msg, MSG)
5538
5539    def _testRecvFromIntoArray(self):
5540        buf = bytes(MSG)
5541        self.serv_conn.send(buf)
5542
5543    def testRecvFromIntoBytearray(self):
5544        buf = bytearray(1024)
5545        nbytes, addr = self.cli_conn.recvfrom_into(buf)
5546        self.assertEqual(nbytes, len(MSG))
5547        msg = buf[:len(MSG)]
5548        self.assertEqual(msg, MSG)
5549
5550    _testRecvFromIntoBytearray = _testRecvFromIntoArray
5551
5552    def testRecvFromIntoMemoryview(self):
5553        buf = bytearray(1024)
5554        nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
5555        self.assertEqual(nbytes, len(MSG))
5556        msg = buf[:len(MSG)]
5557        self.assertEqual(msg, MSG)
5558
5559    _testRecvFromIntoMemoryview = _testRecvFromIntoArray
5560
5561    def testRecvFromIntoSmallBuffer(self):
5562        # See issue #20246.
5563        buf = bytearray(8)
5564        self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)
5565
5566    def _testRecvFromIntoSmallBuffer(self):
5567        self.serv_conn.send(MSG)
5568
5569    def testRecvFromIntoEmptyBuffer(self):
5570        buf = bytearray()
5571        self.cli_conn.recvfrom_into(buf)
5572        self.cli_conn.recvfrom_into(buf, 0)
5573
5574    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
5575
5576
5577TIPC_STYPE = 2000
5578TIPC_LOWER = 200
5579TIPC_UPPER = 210
5580
5581def isTipcAvailable():
5582    """Check if the TIPC module is loaded
5583
5584    The TIPC module is not loaded automatically on Ubuntu and probably
5585    other Linux distros.
5586    """
5587    if not hasattr(socket, "AF_TIPC"):
5588        return False
5589    try:
5590        f = open("/proc/modules", encoding="utf-8")
5591    except (FileNotFoundError, IsADirectoryError, PermissionError):
5592        # It's ok if the file does not exist, is a directory or if we
5593        # have not the permission to read it.
5594        return False
5595    with f:
5596        for line in f:
5597            if line.startswith("tipc "):
5598                return True
5599    return False
5600
5601@unittest.skipUnless(isTipcAvailable(),
5602                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
5603class TIPCTest(unittest.TestCase):
5604    def testRDM(self):
5605        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
5606        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
5607        self.addCleanup(srv.close)
5608        self.addCleanup(cli.close)
5609
5610        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5611        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
5612                TIPC_LOWER, TIPC_UPPER)
5613        srv.bind(srvaddr)
5614
5615        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
5616                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
5617        cli.sendto(MSG, sendaddr)
5618
5619        msg, recvaddr = srv.recvfrom(1024)
5620
5621        self.assertEqual(cli.getsockname(), recvaddr)
5622        self.assertEqual(msg, MSG)
5623
5624
5625@unittest.skipUnless(isTipcAvailable(),
5626                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
5627class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
5628    def __init__(self, methodName = 'runTest'):
5629        unittest.TestCase.__init__(self, methodName = methodName)
5630        ThreadableTest.__init__(self)
5631
5632    def setUp(self):
5633        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
5634        self.addCleanup(self.srv.close)
5635        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5636        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
5637                TIPC_LOWER, TIPC_UPPER)
5638        self.srv.bind(srvaddr)
5639        self.srv.listen()
5640        self.serverExplicitReady()
5641        self.conn, self.connaddr = self.srv.accept()
5642        self.addCleanup(self.conn.close)
5643
5644    def clientSetUp(self):
5645        # There is a hittable race between serverExplicitReady() and the
5646        # accept() call; sleep a little while to avoid it, otherwise
5647        # we could get an exception
5648        time.sleep(0.1)
5649        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
5650        self.addCleanup(self.cli.close)
5651        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
5652                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
5653        self.cli.connect(addr)
5654        self.cliaddr = self.cli.getsockname()
5655
5656    def testStream(self):
5657        msg = self.conn.recv(1024)
5658        self.assertEqual(msg, MSG)
5659        self.assertEqual(self.cliaddr, self.connaddr)
5660
5661    def _testStream(self):
5662        self.cli.send(MSG)
5663        self.cli.close()
5664
5665
5666class ContextManagersTest(ThreadedTCPSocketTest):
5667
5668    def _testSocketClass(self):
5669        # base test
5670        with socket.socket() as sock:
5671            self.assertFalse(sock._closed)
5672        self.assertTrue(sock._closed)
5673        # close inside with block
5674        with socket.socket() as sock:
5675            sock.close()
5676        self.assertTrue(sock._closed)
5677        # exception inside with block
5678        with socket.socket() as sock:
5679            self.assertRaises(OSError, sock.sendall, b'foo')
5680        self.assertTrue(sock._closed)
5681
5682    def testCreateConnectionBase(self):
5683        conn, addr = self.serv.accept()
5684        self.addCleanup(conn.close)
5685        data = conn.recv(1024)
5686        conn.sendall(data)
5687
5688    def _testCreateConnectionBase(self):
5689        address = self.serv.getsockname()
5690        with socket.create_connection(address) as sock:
5691            self.assertFalse(sock._closed)
5692            sock.sendall(b'foo')
5693            self.assertEqual(sock.recv(1024), b'foo')
5694        self.assertTrue(sock._closed)
5695
5696    def testCreateConnectionClose(self):
5697        conn, addr = self.serv.accept()
5698        self.addCleanup(conn.close)
5699        data = conn.recv(1024)
5700        conn.sendall(data)
5701
5702    def _testCreateConnectionClose(self):
5703        address = self.serv.getsockname()
5704        with socket.create_connection(address) as sock:
5705            sock.close()
5706        self.assertTrue(sock._closed)
5707        self.assertRaises(OSError, sock.sendall, b'foo')
5708
5709
5710class InheritanceTest(unittest.TestCase):
5711    @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
5712                         "SOCK_CLOEXEC not defined")
5713    @support.requires_linux_version(2, 6, 28)
5714    def test_SOCK_CLOEXEC(self):
5715        with socket.socket(socket.AF_INET,
5716                           socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s:
5717            self.assertEqual(s.type, socket.SOCK_STREAM)
5718            self.assertFalse(s.get_inheritable())
5719
5720    def test_default_inheritable(self):
5721        sock = socket.socket()
5722        with sock:
5723            self.assertEqual(sock.get_inheritable(), False)
5724
5725    def test_dup(self):
5726        sock = socket.socket()
5727        with sock:
5728            newsock = sock.dup()
5729            sock.close()
5730            with newsock:
5731                self.assertEqual(newsock.get_inheritable(), False)
5732
5733    def test_set_inheritable(self):
5734        sock = socket.socket()
5735        with sock:
5736            sock.set_inheritable(True)
5737            self.assertEqual(sock.get_inheritable(), True)
5738
5739            sock.set_inheritable(False)
5740            self.assertEqual(sock.get_inheritable(), False)
5741
5742    @unittest.skipIf(fcntl is None, "need fcntl")
5743    def test_get_inheritable_cloexec(self):
5744        sock = socket.socket()
5745        with sock:
5746            fd = sock.fileno()
5747            self.assertEqual(sock.get_inheritable(), False)
5748
5749            # clear FD_CLOEXEC flag
5750            flags = fcntl.fcntl(fd, fcntl.F_GETFD)
5751            flags &= ~fcntl.FD_CLOEXEC
5752            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
5753
5754            self.assertEqual(sock.get_inheritable(), True)
5755
5756    @unittest.skipIf(fcntl is None, "need fcntl")
5757    def test_set_inheritable_cloexec(self):
5758        sock = socket.socket()
5759        with sock:
5760            fd = sock.fileno()
5761            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
5762                             fcntl.FD_CLOEXEC)
5763
5764            sock.set_inheritable(True)
5765            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
5766                             0)
5767
5768
5769    def test_socketpair(self):
5770        s1, s2 = socket.socketpair()
5771        self.addCleanup(s1.close)
5772        self.addCleanup(s2.close)
5773        self.assertEqual(s1.get_inheritable(), False)
5774        self.assertEqual(s2.get_inheritable(), False)
5775
5776
5777@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
5778                     "SOCK_NONBLOCK not defined")
5779class NonblockConstantTest(unittest.TestCase):
5780    def checkNonblock(self, s, nonblock=True, timeout=0.0):
5781        if nonblock:
5782            self.assertEqual(s.type, socket.SOCK_STREAM)
5783            self.assertEqual(s.gettimeout(), timeout)
5784            self.assertTrue(
5785                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
5786            if timeout == 0:
5787                # timeout == 0: means that getblocking() must be False.
5788                self.assertFalse(s.getblocking())
5789            else:
5790                # If timeout > 0, the socket will be in a "blocking" mode
5791                # from the standpoint of the Python API.  For Python socket
5792                # object, "blocking" means that operations like 'sock.recv()'
5793                # will block.  Internally, file descriptors for
5794                # "blocking" Python sockets *with timeouts* are in a
5795                # *non-blocking* mode, and 'sock.recv()' uses 'select()'
5796                # and handles EWOULDBLOCK/EAGAIN to enforce the timeout.
5797                self.assertTrue(s.getblocking())
5798        else:
5799            self.assertEqual(s.type, socket.SOCK_STREAM)
5800            self.assertEqual(s.gettimeout(), None)
5801            self.assertFalse(
5802                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
5803            self.assertTrue(s.getblocking())
5804
5805    @support.requires_linux_version(2, 6, 28)
5806    def test_SOCK_NONBLOCK(self):
5807        # a lot of it seems silly and redundant, but I wanted to test that
5808        # changing back and forth worked ok
5809        with socket.socket(socket.AF_INET,
5810                           socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
5811            self.checkNonblock(s)
5812            s.setblocking(True)
5813            self.checkNonblock(s, nonblock=False)
5814            s.setblocking(False)
5815            self.checkNonblock(s)
5816            s.settimeout(None)
5817            self.checkNonblock(s, nonblock=False)
5818            s.settimeout(2.0)
5819            self.checkNonblock(s, timeout=2.0)
5820            s.setblocking(True)
5821            self.checkNonblock(s, nonblock=False)
5822        # defaulttimeout
5823        t = socket.getdefaulttimeout()
5824        socket.setdefaulttimeout(0.0)
5825        with socket.socket() as s:
5826            self.checkNonblock(s)
5827        socket.setdefaulttimeout(None)
5828        with socket.socket() as s:
5829            self.checkNonblock(s, False)
5830        socket.setdefaulttimeout(2.0)
5831        with socket.socket() as s:
5832            self.checkNonblock(s, timeout=2.0)
5833        socket.setdefaulttimeout(None)
5834        with socket.socket() as s:
5835            self.checkNonblock(s, False)
5836        socket.setdefaulttimeout(t)
5837
5838
5839@unittest.skipUnless(os.name == "nt", "Windows specific")
5840@unittest.skipUnless(multiprocessing, "need multiprocessing")
5841class TestSocketSharing(SocketTCPTest):
5842    # This must be classmethod and not staticmethod or multiprocessing
5843    # won't be able to bootstrap it.
5844    @classmethod
5845    def remoteProcessServer(cls, q):
5846        # Recreate socket from shared data
5847        sdata = q.get()
5848        message = q.get()
5849
5850        s = socket.fromshare(sdata)
5851        s2, c = s.accept()
5852
5853        # Send the message
5854        s2.sendall(message)
5855        s2.close()
5856        s.close()
5857
5858    def testShare(self):
5859        # Transfer the listening server socket to another process
5860        # and service it from there.
5861
5862        # Create process:
5863        q = multiprocessing.Queue()
5864        p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
5865        p.start()
5866
5867        # Get the shared socket data
5868        data = self.serv.share(p.pid)
5869
5870        # Pass the shared socket to the other process
5871        addr = self.serv.getsockname()
5872        self.serv.close()
5873        q.put(data)
5874
5875        # The data that the server will send us
5876        message = b"slapmahfro"
5877        q.put(message)
5878
5879        # Connect
5880        s = socket.create_connection(addr)
5881        #  listen for the data
5882        m = []
5883        while True:
5884            data = s.recv(100)
5885            if not data:
5886                break
5887            m.append(data)
5888        s.close()
5889        received = b"".join(m)
5890        self.assertEqual(received, message)
5891        p.join()
5892
5893    def testShareLength(self):
5894        data = self.serv.share(os.getpid())
5895        self.assertRaises(ValueError, socket.fromshare, data[:-1])
5896        self.assertRaises(ValueError, socket.fromshare, data+b"foo")
5897
5898    def compareSockets(self, org, other):
5899        # socket sharing is expected to work only for blocking socket
5900        # since the internal python timeout value isn't transferred.
5901        self.assertEqual(org.gettimeout(), None)
5902        self.assertEqual(org.gettimeout(), other.gettimeout())
5903
5904        self.assertEqual(org.family, other.family)
5905        self.assertEqual(org.type, other.type)
5906        # If the user specified "0" for proto, then
5907        # internally windows will have picked the correct value.
5908        # Python introspection on the socket however will still return
5909        # 0.  For the shared socket, the python value is recreated
5910        # from the actual value, so it may not compare correctly.
5911        if org.proto != 0:
5912            self.assertEqual(org.proto, other.proto)
5913
5914    def testShareLocal(self):
5915        data = self.serv.share(os.getpid())
5916        s = socket.fromshare(data)
5917        try:
5918            self.compareSockets(self.serv, s)
5919        finally:
5920            s.close()
5921
5922    def testTypes(self):
5923        families = [socket.AF_INET, socket.AF_INET6]
5924        types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
5925        for f in families:
5926            for t in types:
5927                try:
5928                    source = socket.socket(f, t)
5929                except OSError:
5930                    continue # This combination is not supported
5931                try:
5932                    data = source.share(os.getpid())
5933                    shared = socket.fromshare(data)
5934                    try:
5935                        self.compareSockets(source, shared)
5936                    finally:
5937                        shared.close()
5938                finally:
5939                    source.close()
5940
5941
5942class SendfileUsingSendTest(ThreadedTCPSocketTest):
5943    """
5944    Test the send() implementation of socket.sendfile().
5945    """
5946
5947    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
5948    BUFSIZE = 8192
5949    FILEDATA = b""
5950    TIMEOUT = support.LOOPBACK_TIMEOUT
5951
5952    @classmethod
5953    def setUpClass(cls):
5954        def chunks(total, step):
5955            assert total >= step
5956            while total > step:
5957                yield step
5958                total -= step
5959            if total:
5960                yield total
5961
5962        chunk = b"".join([random.choice(string.ascii_letters).encode()
5963                          for i in range(cls.BUFSIZE)])
5964        with open(os_helper.TESTFN, 'wb') as f:
5965            for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
5966                f.write(chunk)
5967        with open(os_helper.TESTFN, 'rb') as f:
5968            cls.FILEDATA = f.read()
5969            assert len(cls.FILEDATA) == cls.FILESIZE
5970
5971    @classmethod
5972    def tearDownClass(cls):
5973        os_helper.unlink(os_helper.TESTFN)
5974
5975    def accept_conn(self):
5976        self.serv.settimeout(support.LONG_TIMEOUT)
5977        conn, addr = self.serv.accept()
5978        conn.settimeout(self.TIMEOUT)
5979        self.addCleanup(conn.close)
5980        return conn
5981
5982    def recv_data(self, conn):
5983        received = []
5984        while True:
5985            chunk = conn.recv(self.BUFSIZE)
5986            if not chunk:
5987                break
5988            received.append(chunk)
5989        return b''.join(received)
5990
5991    def meth_from_sock(self, sock):
5992        # Depending on the mixin class being run return either send()
5993        # or sendfile() method implementation.
5994        return getattr(sock, "_sendfile_use_send")
5995
5996    # regular file
5997
5998    def _testRegularFile(self):
5999        address = self.serv.getsockname()
6000        file = open(os_helper.TESTFN, 'rb')
6001        with socket.create_connection(address) as sock, file as file:
6002            meth = self.meth_from_sock(sock)
6003            sent = meth(file)
6004            self.assertEqual(sent, self.FILESIZE)
6005            self.assertEqual(file.tell(), self.FILESIZE)
6006
6007    def testRegularFile(self):
6008        conn = self.accept_conn()
6009        data = self.recv_data(conn)
6010        self.assertEqual(len(data), self.FILESIZE)
6011        self.assertEqual(data, self.FILEDATA)
6012
6013    # non regular file
6014
6015    def _testNonRegularFile(self):
6016        address = self.serv.getsockname()
6017        file = io.BytesIO(self.FILEDATA)
6018        with socket.create_connection(address) as sock, file as file:
6019            sent = sock.sendfile(file)
6020            self.assertEqual(sent, self.FILESIZE)
6021            self.assertEqual(file.tell(), self.FILESIZE)
6022            self.assertRaises(socket._GiveupOnSendfile,
6023                              sock._sendfile_use_sendfile, file)
6024
6025    def testNonRegularFile(self):
6026        conn = self.accept_conn()
6027        data = self.recv_data(conn)
6028        self.assertEqual(len(data), self.FILESIZE)
6029        self.assertEqual(data, self.FILEDATA)
6030
6031    # empty file
6032
6033    def _testEmptyFileSend(self):
6034        address = self.serv.getsockname()
6035        filename = os_helper.TESTFN + "2"
6036        with open(filename, 'wb'):
6037            self.addCleanup(os_helper.unlink, filename)
6038        file = open(filename, 'rb')
6039        with socket.create_connection(address) as sock, file as file:
6040            meth = self.meth_from_sock(sock)
6041            sent = meth(file)
6042            self.assertEqual(sent, 0)
6043            self.assertEqual(file.tell(), 0)
6044
6045    def testEmptyFileSend(self):
6046        conn = self.accept_conn()
6047        data = self.recv_data(conn)
6048        self.assertEqual(data, b"")
6049
6050    # offset
6051
6052    def _testOffset(self):
6053        address = self.serv.getsockname()
6054        file = open(os_helper.TESTFN, 'rb')
6055        with socket.create_connection(address) as sock, file as file:
6056            meth = self.meth_from_sock(sock)
6057            sent = meth(file, offset=5000)
6058            self.assertEqual(sent, self.FILESIZE - 5000)
6059            self.assertEqual(file.tell(), self.FILESIZE)
6060
6061    def testOffset(self):
6062        conn = self.accept_conn()
6063        data = self.recv_data(conn)
6064        self.assertEqual(len(data), self.FILESIZE - 5000)
6065        self.assertEqual(data, self.FILEDATA[5000:])
6066
6067    # count
6068
6069    def _testCount(self):
6070        address = self.serv.getsockname()
6071        file = open(os_helper.TESTFN, 'rb')
6072        sock = socket.create_connection(address,
6073                                        timeout=support.LOOPBACK_TIMEOUT)
6074        with sock, file:
6075            count = 5000007
6076            meth = self.meth_from_sock(sock)
6077            sent = meth(file, count=count)
6078            self.assertEqual(sent, count)
6079            self.assertEqual(file.tell(), count)
6080
6081    def testCount(self):
6082        count = 5000007
6083        conn = self.accept_conn()
6084        data = self.recv_data(conn)
6085        self.assertEqual(len(data), count)
6086        self.assertEqual(data, self.FILEDATA[:count])
6087
6088    # count small
6089
6090    def _testCountSmall(self):
6091        address = self.serv.getsockname()
6092        file = open(os_helper.TESTFN, 'rb')
6093        sock = socket.create_connection(address,
6094                                        timeout=support.LOOPBACK_TIMEOUT)
6095        with sock, file:
6096            count = 1
6097            meth = self.meth_from_sock(sock)
6098            sent = meth(file, count=count)
6099            self.assertEqual(sent, count)
6100            self.assertEqual(file.tell(), count)
6101
6102    def testCountSmall(self):
6103        count = 1
6104        conn = self.accept_conn()
6105        data = self.recv_data(conn)
6106        self.assertEqual(len(data), count)
6107        self.assertEqual(data, self.FILEDATA[:count])
6108
6109    # count + offset
6110
6111    def _testCountWithOffset(self):
6112        address = self.serv.getsockname()
6113        file = open(os_helper.TESTFN, 'rb')
6114        with socket.create_connection(address, timeout=2) as sock, file as file:
6115            count = 100007
6116            meth = self.meth_from_sock(sock)
6117            sent = meth(file, offset=2007, count=count)
6118            self.assertEqual(sent, count)
6119            self.assertEqual(file.tell(), count + 2007)
6120
6121    def testCountWithOffset(self):
6122        count = 100007
6123        conn = self.accept_conn()
6124        data = self.recv_data(conn)
6125        self.assertEqual(len(data), count)
6126        self.assertEqual(data, self.FILEDATA[2007:count+2007])
6127
6128    # non blocking sockets are not supposed to work
6129
6130    def _testNonBlocking(self):
6131        address = self.serv.getsockname()
6132        file = open(os_helper.TESTFN, 'rb')
6133        with socket.create_connection(address) as sock, file as file:
6134            sock.setblocking(False)
6135            meth = self.meth_from_sock(sock)
6136            self.assertRaises(ValueError, meth, file)
6137            self.assertRaises(ValueError, sock.sendfile, file)
6138
6139    def testNonBlocking(self):
6140        conn = self.accept_conn()
6141        if conn.recv(8192):
6142            self.fail('was not supposed to receive any data')
6143
6144    # timeout (non-triggered)
6145
6146    def _testWithTimeout(self):
6147        address = self.serv.getsockname()
6148        file = open(os_helper.TESTFN, 'rb')
6149        sock = socket.create_connection(address,
6150                                        timeout=support.LOOPBACK_TIMEOUT)
6151        with sock, file:
6152            meth = self.meth_from_sock(sock)
6153            sent = meth(file)
6154            self.assertEqual(sent, self.FILESIZE)
6155
6156    def testWithTimeout(self):
6157        conn = self.accept_conn()
6158        data = self.recv_data(conn)
6159        self.assertEqual(len(data), self.FILESIZE)
6160        self.assertEqual(data, self.FILEDATA)
6161
6162    # timeout (triggered)
6163
6164    def _testWithTimeoutTriggeredSend(self):
6165        address = self.serv.getsockname()
6166        with open(os_helper.TESTFN, 'rb') as file:
6167            with socket.create_connection(address) as sock:
6168                sock.settimeout(0.01)
6169                meth = self.meth_from_sock(sock)
6170                self.assertRaises(TimeoutError, meth, file)
6171
6172    def testWithTimeoutTriggeredSend(self):
6173        conn = self.accept_conn()
6174        conn.recv(88192)
6175        time.sleep(1)
6176
6177    # errors
6178
6179    def _test_errors(self):
6180        pass
6181
6182    def test_errors(self):
6183        with open(os_helper.TESTFN, 'rb') as file:
6184            with socket.socket(type=socket.SOCK_DGRAM) as s:
6185                meth = self.meth_from_sock(s)
6186                self.assertRaisesRegex(
6187                    ValueError, "SOCK_STREAM", meth, file)
6188        with open(os_helper.TESTFN, encoding="utf-8") as file:
6189            with socket.socket() as s:
6190                meth = self.meth_from_sock(s)
6191                self.assertRaisesRegex(
6192                    ValueError, "binary mode", meth, file)
6193        with open(os_helper.TESTFN, 'rb') as file:
6194            with socket.socket() as s:
6195                meth = self.meth_from_sock(s)
6196                self.assertRaisesRegex(TypeError, "positive integer",
6197                                       meth, file, count='2')
6198                self.assertRaisesRegex(TypeError, "positive integer",
6199                                       meth, file, count=0.1)
6200                self.assertRaisesRegex(ValueError, "positive integer",
6201                                       meth, file, count=0)
6202                self.assertRaisesRegex(ValueError, "positive integer",
6203                                       meth, file, count=-1)
6204
6205
6206@unittest.skipUnless(hasattr(os, "sendfile"),
6207                     'os.sendfile() required for this test.')
6208class SendfileUsingSendfileTest(SendfileUsingSendTest):
6209    """
6210    Test the sendfile() implementation of socket.sendfile().
6211    """
6212    def meth_from_sock(self, sock):
6213        return getattr(sock, "_sendfile_use_sendfile")
6214
6215
6216@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
6217class LinuxKernelCryptoAPI(unittest.TestCase):
6218    # tests for AF_ALG
6219    def create_alg(self, typ, name):
6220        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6221        try:
6222            sock.bind((typ, name))
6223        except FileNotFoundError as e:
6224            # type / algorithm is not available
6225            sock.close()
6226            raise unittest.SkipTest(str(e), typ, name)
6227        else:
6228            return sock
6229
6230    # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY,
6231    # at least on ppc64le architecture
6232    @support.requires_linux_version(4, 5)
6233    def test_sha256(self):
6234        expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
6235                                 "177a9cb410ff61f20015ad")
6236        with self.create_alg('hash', 'sha256') as algo:
6237            op, _ = algo.accept()
6238            with op:
6239                op.sendall(b"abc")
6240                self.assertEqual(op.recv(512), expected)
6241
6242            op, _ = algo.accept()
6243            with op:
6244                op.send(b'a', socket.MSG_MORE)
6245                op.send(b'b', socket.MSG_MORE)
6246                op.send(b'c', socket.MSG_MORE)
6247                op.send(b'')
6248                self.assertEqual(op.recv(512), expected)
6249
6250    def test_hmac_sha1(self):
6251        expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
6252        with self.create_alg('hash', 'hmac(sha1)') as algo:
6253            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
6254            op, _ = algo.accept()
6255            with op:
6256                op.sendall(b"what do ya want for nothing?")
6257                self.assertEqual(op.recv(512), expected)
6258
6259    # Although it should work with 3.19 and newer the test blocks on
6260    # Ubuntu 15.10 with Kernel 4.2.0-19.
6261    @support.requires_linux_version(4, 3)
6262    def test_aes_cbc(self):
6263        key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
6264        iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
6265        msg = b"Single block msg"
6266        ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
6267        msglen = len(msg)
6268        with self.create_alg('skcipher', 'cbc(aes)') as algo:
6269            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
6270            op, _ = algo.accept()
6271            with op:
6272                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
6273                                 flags=socket.MSG_MORE)
6274                op.sendall(msg)
6275                self.assertEqual(op.recv(msglen), ciphertext)
6276
6277            op, _ = algo.accept()
6278            with op:
6279                op.sendmsg_afalg([ciphertext],
6280                                 op=socket.ALG_OP_DECRYPT, iv=iv)
6281                self.assertEqual(op.recv(msglen), msg)
6282
6283            # long message
6284            multiplier = 1024
6285            longmsg = [msg] * multiplier
6286            op, _ = algo.accept()
6287            with op:
6288                op.sendmsg_afalg(longmsg,
6289                                 op=socket.ALG_OP_ENCRYPT, iv=iv)
6290                enc = op.recv(msglen * multiplier)
6291            self.assertEqual(len(enc), msglen * multiplier)
6292            self.assertEqual(enc[:msglen], ciphertext)
6293
6294            op, _ = algo.accept()
6295            with op:
6296                op.sendmsg_afalg([enc],
6297                                 op=socket.ALG_OP_DECRYPT, iv=iv)
6298                dec = op.recv(msglen * multiplier)
6299            self.assertEqual(len(dec), msglen * multiplier)
6300            self.assertEqual(dec, msg * multiplier)
6301
6302    @support.requires_linux_version(4, 9)  # see issue29324
6303    def test_aead_aes_gcm(self):
6304        key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
6305        iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
6306        plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
6307        assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
6308        expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
6309        expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')
6310
6311        taglen = len(expected_tag)
6312        assoclen = len(assoc)
6313
6314        with self.create_alg('aead', 'gcm(aes)') as algo:
6315            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
6316            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
6317                            None, taglen)
6318
6319            # send assoc, plain and tag buffer in separate steps
6320            op, _ = algo.accept()
6321            with op:
6322                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
6323                                 assoclen=assoclen, flags=socket.MSG_MORE)
6324                op.sendall(assoc, socket.MSG_MORE)
6325                op.sendall(plain)
6326                res = op.recv(assoclen + len(plain) + taglen)
6327                self.assertEqual(expected_ct, res[assoclen:-taglen])
6328                self.assertEqual(expected_tag, res[-taglen:])
6329
6330            # now with msg
6331            op, _ = algo.accept()
6332            with op:
6333                msg = assoc + plain
6334                op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
6335                                 assoclen=assoclen)
6336                res = op.recv(assoclen + len(plain) + taglen)
6337                self.assertEqual(expected_ct, res[assoclen:-taglen])
6338                self.assertEqual(expected_tag, res[-taglen:])
6339
6340            # create anc data manually
6341            pack_uint32 = struct.Struct('I').pack
6342            op, _ = algo.accept()
6343            with op:
6344                msg = assoc + plain
6345                op.sendmsg(
6346                    [msg],
6347                    ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
6348                     [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
6349                     [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
6350                    )
6351                )
6352                res = op.recv(len(msg) + taglen)
6353                self.assertEqual(expected_ct, res[assoclen:-taglen])
6354                self.assertEqual(expected_tag, res[-taglen:])
6355
6356            # decrypt and verify
6357            op, _ = algo.accept()
6358            with op:
6359                msg = assoc + expected_ct + expected_tag
6360                op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
6361                                 assoclen=assoclen)
6362                res = op.recv(len(msg) - taglen)
6363                self.assertEqual(plain, res[assoclen:])
6364
6365    @support.requires_linux_version(4, 3)  # see test_aes_cbc
6366    def test_drbg_pr_sha256(self):
6367        # deterministic random bit generator, prediction resistance, sha256
6368        with self.create_alg('rng', 'drbg_pr_sha256') as algo:
6369            extra_seed = os.urandom(32)
6370            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
6371            op, _ = algo.accept()
6372            with op:
6373                rn = op.recv(32)
6374                self.assertEqual(len(rn), 32)
6375
6376    def test_sendmsg_afalg_args(self):
6377        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6378        with sock:
6379            with self.assertRaises(TypeError):
6380                sock.sendmsg_afalg()
6381
6382            with self.assertRaises(TypeError):
6383                sock.sendmsg_afalg(op=None)
6384
6385            with self.assertRaises(TypeError):
6386                sock.sendmsg_afalg(1)
6387
6388            with self.assertRaises(TypeError):
6389                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)
6390
6391            with self.assertRaises(TypeError):
6392                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
6393
6394    def test_length_restriction(self):
6395        # bpo-35050, off-by-one error in length check
6396        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6397        self.addCleanup(sock.close)
6398
6399        # salg_type[14]
6400        with self.assertRaises(FileNotFoundError):
6401            sock.bind(("t" * 13, "name"))
6402        with self.assertRaisesRegex(ValueError, "type too long"):
6403            sock.bind(("t" * 14, "name"))
6404
6405        # salg_name[64]
6406        with self.assertRaises(FileNotFoundError):
6407            sock.bind(("type", "n" * 63))
6408        with self.assertRaisesRegex(ValueError, "name too long"):
6409            sock.bind(("type", "n" * 64))
6410
6411
6412@unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
6413class TestMacOSTCPFlags(unittest.TestCase):
6414    def test_tcp_keepalive(self):
6415        self.assertTrue(socket.TCP_KEEPALIVE)
6416
6417
6418@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
6419class TestMSWindowsTCPFlags(unittest.TestCase):
6420    knownTCPFlags = {
6421                       # available since long time ago
6422                       'TCP_MAXSEG',
6423                       'TCP_NODELAY',
6424                       # available starting with Windows 10 1607
6425                       'TCP_FASTOPEN',
6426                       # available starting with Windows 10 1703
6427                       'TCP_KEEPCNT',
6428                       # available starting with Windows 10 1709
6429                       'TCP_KEEPIDLE',
6430                       'TCP_KEEPINTVL'
6431                       }
6432
6433    def test_new_tcp_flags(self):
6434        provided = [s for s in dir(socket) if s.startswith('TCP')]
6435        unknown = [s for s in provided if s not in self.knownTCPFlags]
6436
6437        self.assertEqual([], unknown,
6438            "New TCP flags were discovered. See bpo-32394 for more information")
6439
6440
6441class CreateServerTest(unittest.TestCase):
6442
6443    def test_address(self):
6444        port = socket_helper.find_unused_port()
6445        with socket.create_server(("127.0.0.1", port)) as sock:
6446            self.assertEqual(sock.getsockname()[0], "127.0.0.1")
6447            self.assertEqual(sock.getsockname()[1], port)
6448        if socket_helper.IPV6_ENABLED:
6449            with socket.create_server(("::1", port),
6450                                      family=socket.AF_INET6) as sock:
6451                self.assertEqual(sock.getsockname()[0], "::1")
6452                self.assertEqual(sock.getsockname()[1], port)
6453
6454    def test_family_and_type(self):
6455        with socket.create_server(("127.0.0.1", 0)) as sock:
6456            self.assertEqual(sock.family, socket.AF_INET)
6457            self.assertEqual(sock.type, socket.SOCK_STREAM)
6458        if socket_helper.IPV6_ENABLED:
6459            with socket.create_server(("::1", 0), family=socket.AF_INET6) as s:
6460                self.assertEqual(s.family, socket.AF_INET6)
6461                self.assertEqual(sock.type, socket.SOCK_STREAM)
6462
6463    def test_reuse_port(self):
6464        if not hasattr(socket, "SO_REUSEPORT"):
6465            with self.assertRaises(ValueError):
6466                socket.create_server(("localhost", 0), reuse_port=True)
6467        else:
6468            with socket.create_server(("localhost", 0)) as sock:
6469                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
6470                self.assertEqual(opt, 0)
6471            with socket.create_server(("localhost", 0), reuse_port=True) as sock:
6472                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
6473                self.assertNotEqual(opt, 0)
6474
6475    @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
6476                     not hasattr(_socket, 'IPV6_V6ONLY'),
6477                     "IPV6_V6ONLY option not supported")
6478    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6479    def test_ipv6_only_default(self):
6480        with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock:
6481            assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
6482
6483    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6484                     "dualstack_ipv6 not supported")
6485    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6486    def test_dualstack_ipv6_family(self):
6487        with socket.create_server(("::1", 0), family=socket.AF_INET6,
6488                                  dualstack_ipv6=True) as sock:
6489            self.assertEqual(sock.family, socket.AF_INET6)
6490
6491
6492class CreateServerFunctionalTest(unittest.TestCase):
6493    timeout = support.LOOPBACK_TIMEOUT
6494
6495    def echo_server(self, sock):
6496        def run(sock):
6497            with sock:
6498                conn, _ = sock.accept()
6499                with conn:
6500                    event.wait(self.timeout)
6501                    msg = conn.recv(1024)
6502                    if not msg:
6503                        return
6504                    conn.sendall(msg)
6505
6506        event = threading.Event()
6507        sock.settimeout(self.timeout)
6508        thread = threading.Thread(target=run, args=(sock, ))
6509        thread.start()
6510        self.addCleanup(thread.join, self.timeout)
6511        event.set()
6512
6513    def echo_client(self, addr, family):
6514        with socket.socket(family=family) as sock:
6515            sock.settimeout(self.timeout)
6516            sock.connect(addr)
6517            sock.sendall(b'foo')
6518            self.assertEqual(sock.recv(1024), b'foo')
6519
6520    def test_tcp4(self):
6521        port = socket_helper.find_unused_port()
6522        with socket.create_server(("", port)) as sock:
6523            self.echo_server(sock)
6524            self.echo_client(("127.0.0.1", port), socket.AF_INET)
6525
6526    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6527    def test_tcp6(self):
6528        port = socket_helper.find_unused_port()
6529        with socket.create_server(("", port),
6530                                  family=socket.AF_INET6) as sock:
6531            self.echo_server(sock)
6532            self.echo_client(("::1", port), socket.AF_INET6)
6533
6534    # --- dual stack tests
6535
6536    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6537                     "dualstack_ipv6 not supported")
6538    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6539    def test_dual_stack_client_v4(self):
6540        port = socket_helper.find_unused_port()
6541        with socket.create_server(("", port), family=socket.AF_INET6,
6542                                  dualstack_ipv6=True) as sock:
6543            self.echo_server(sock)
6544            self.echo_client(("127.0.0.1", port), socket.AF_INET)
6545
6546    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6547                     "dualstack_ipv6 not supported")
6548    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6549    def test_dual_stack_client_v6(self):
6550        port = socket_helper.find_unused_port()
6551        with socket.create_server(("", port), family=socket.AF_INET6,
6552                                  dualstack_ipv6=True) as sock:
6553            self.echo_server(sock)
6554            self.echo_client(("::1", port), socket.AF_INET6)
6555
6556@requireAttrs(socket, "send_fds")
6557@requireAttrs(socket, "recv_fds")
6558@requireAttrs(socket, "AF_UNIX")
6559class SendRecvFdsTests(unittest.TestCase):
6560    def testSendAndRecvFds(self):
6561        def close_pipes(pipes):
6562            for fd1, fd2 in pipes:
6563                os.close(fd1)
6564                os.close(fd2)
6565
6566        def close_fds(fds):
6567            for fd in fds:
6568                os.close(fd)
6569
6570        # send 10 file descriptors
6571        pipes = [os.pipe() for _ in range(10)]
6572        self.addCleanup(close_pipes, pipes)
6573        fds = [rfd for rfd, wfd in pipes]
6574
6575        # use a UNIX socket pair to exchange file descriptors locally
6576        sock1, sock2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
6577        with sock1, sock2:
6578            socket.send_fds(sock1, [MSG], fds)
6579            # request more data and file descriptors than expected
6580            msg, fds2, flags, addr = socket.recv_fds(sock2, len(MSG) * 2, len(fds) * 2)
6581            self.addCleanup(close_fds, fds2)
6582
6583        self.assertEqual(msg, MSG)
6584        self.assertEqual(len(fds2), len(fds))
6585        self.assertEqual(flags, 0)
6586        # don't test addr
6587
6588        # test that file descriptors are connected
6589        for index, fds in enumerate(pipes):
6590            rfd, wfd = fds
6591            os.write(wfd, str(index).encode())
6592
6593        for index, rfd in enumerate(fds2):
6594            data = os.read(rfd, 100)
6595            self.assertEqual(data,  str(index).encode())
6596
6597
6598def setUpModule():
6599    thread_info = threading_helper.threading_setup()
6600    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
6601
6602
6603if __name__ == "__main__":
6604    unittest.main()
6605