• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3from test.support import socket_helper
4
5import errno
6import io
7import itertools
8import socket
9import select
10import tempfile
11import time
12import traceback
13import queue
14import sys
15import os
16import platform
17import array
18import contextlib
19from weakref import proxy
20import signal
21import math
22import pickle
23import struct
24import random
25import shutil
26import string
27import _thread as thread
28import threading
29try:
30    import multiprocessing
31except ImportError:
32    multiprocessing = False
33try:
34    import fcntl
35except ImportError:
36    fcntl = None
37
38HOST = socket_helper.HOST
39# test unicode string and carriage return
40MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')
41
42VSOCKPORT = 1234
43AIX = platform.system() == "AIX"
44
45try:
46    import _socket
47except ImportError:
48    _socket = None
49
50def get_cid():
51    if fcntl is None:
52        return None
53    if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'):
54        return None
55    try:
56        with open("/dev/vsock", "rb") as f:
57            r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, "    ")
58    except OSError:
59        return None
60    else:
61        return struct.unpack("I", r)[0]
62
63def _have_socket_can():
64    """Check whether CAN sockets are supported on this host."""
65    try:
66        s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
67    except (AttributeError, OSError):
68        return False
69    else:
70        s.close()
71    return True
72
73def _have_socket_can_isotp():
74    """Check whether CAN ISOTP sockets are supported on this host."""
75    try:
76        s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
77    except (AttributeError, OSError):
78        return False
79    else:
80        s.close()
81    return True
82
83def _have_socket_can_j1939():
84    """Check whether CAN J1939 sockets are supported on this host."""
85    try:
86        s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939)
87    except (AttributeError, OSError):
88        return False
89    else:
90        s.close()
91    return True
92
93def _have_socket_rds():
94    """Check whether RDS sockets are supported on this host."""
95    try:
96        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
97    except (AttributeError, OSError):
98        return False
99    else:
100        s.close()
101    return True
102
103def _have_socket_alg():
104    """Check whether AF_ALG sockets are supported on this host."""
105    try:
106        s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
107    except (AttributeError, OSError):
108        return False
109    else:
110        s.close()
111    return True
112
113def _have_socket_qipcrtr():
114    """Check whether AF_QIPCRTR sockets are supported on this host."""
115    try:
116        s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0)
117    except (AttributeError, OSError):
118        return False
119    else:
120        s.close()
121    return True
122
123def _have_socket_vsock():
124    """Check whether AF_VSOCK sockets are supported on this host."""
125    ret = get_cid() is not None
126    return ret
127
128
129def _have_socket_bluetooth():
130    """Check whether AF_BLUETOOTH sockets are supported on this host."""
131    try:
132        # RFCOMM is supported by all platforms with bluetooth support. Windows
133        # does not support omitting the protocol.
134        s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
135    except (AttributeError, OSError):
136        return False
137    else:
138        s.close()
139    return True
140
141
142@contextlib.contextmanager
143def socket_setdefaulttimeout(timeout):
144    old_timeout = socket.getdefaulttimeout()
145    try:
146        socket.setdefaulttimeout(timeout)
147        yield
148    finally:
149        socket.setdefaulttimeout(old_timeout)
150
151
152HAVE_SOCKET_CAN = _have_socket_can()
153
154HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()
155
156HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939()
157
158HAVE_SOCKET_RDS = _have_socket_rds()
159
160HAVE_SOCKET_ALG = _have_socket_alg()
161
162HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr()
163
164HAVE_SOCKET_VSOCK = _have_socket_vsock()
165
166HAVE_SOCKET_UDPLITE = hasattr(socket, "IPPROTO_UDPLITE")
167
168HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()
169
170# Size in bytes of the int type
171SIZEOF_INT = array.array("i").itemsize
172
173class SocketTCPTest(unittest.TestCase):
174
175    def setUp(self):
176        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
177        self.port = socket_helper.bind_port(self.serv)
178        self.serv.listen()
179
180    def tearDown(self):
181        self.serv.close()
182        self.serv = None
183
184class SocketUDPTest(unittest.TestCase):
185
186    def setUp(self):
187        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
188        self.port = socket_helper.bind_port(self.serv)
189
190    def tearDown(self):
191        self.serv.close()
192        self.serv = None
193
194class SocketUDPLITETest(SocketUDPTest):
195
196    def setUp(self):
197        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
198        self.port = socket_helper.bind_port(self.serv)
199
200class ThreadSafeCleanupTestCase(unittest.TestCase):
201    """Subclass of unittest.TestCase with thread-safe cleanup methods.
202
203    This subclass protects the addCleanup() and doCleanups() methods
204    with a recursive lock.
205    """
206
207    def __init__(self, *args, **kwargs):
208        super().__init__(*args, **kwargs)
209        self._cleanup_lock = threading.RLock()
210
211    def addCleanup(self, *args, **kwargs):
212        with self._cleanup_lock:
213            return super().addCleanup(*args, **kwargs)
214
215    def doCleanups(self, *args, **kwargs):
216        with self._cleanup_lock:
217            return super().doCleanups(*args, **kwargs)
218
219class SocketCANTest(unittest.TestCase):
220
221    """To be able to run this test, a `vcan0` CAN interface can be created with
222    the following commands:
223    # modprobe vcan
224    # ip link add dev vcan0 type vcan
225    # ifconfig vcan0 up
226    """
227    interface = 'vcan0'
228    bufsize = 128
229
230    """The CAN frame structure is defined in <linux/can.h>:
231
232    struct can_frame {
233        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
234        __u8    can_dlc; /* data length code: 0 .. 8 */
235        __u8    data[8] __attribute__((aligned(8)));
236    };
237    """
238    can_frame_fmt = "=IB3x8s"
239    can_frame_size = struct.calcsize(can_frame_fmt)
240
241    """The Broadcast Management Command frame structure is defined
242    in <linux/can/bcm.h>:
243
244    struct bcm_msg_head {
245        __u32 opcode;
246        __u32 flags;
247        __u32 count;
248        struct timeval ival1, ival2;
249        canid_t can_id;
250        __u32 nframes;
251        struct can_frame frames[0];
252    }
253
254    `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
255    `struct can_frame` definition). Must use native not standard types for packing.
256    """
257    bcm_cmd_msg_fmt = "@3I4l2I"
258    bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)
259
260    def setUp(self):
261        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
262        self.addCleanup(self.s.close)
263        try:
264            self.s.bind((self.interface,))
265        except OSError:
266            self.skipTest('network interface `%s` does not exist' %
267                           self.interface)
268
269
270class SocketRDSTest(unittest.TestCase):
271
272    """To be able to run this test, the `rds` kernel module must be loaded:
273    # modprobe rds
274    """
275    bufsize = 8192
276
277    def setUp(self):
278        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
279        self.addCleanup(self.serv.close)
280        try:
281            self.port = socket_helper.bind_port(self.serv)
282        except OSError:
283            self.skipTest('unable to bind RDS socket')
284
285
286class ThreadableTest:
287    """Threadable Test class
288
289    The ThreadableTest class makes it easy to create a threaded
290    client/server pair from an existing unit test. To create a
291    new threaded class from an existing unit test, use multiple
292    inheritance:
293
294        class NewClass (OldClass, ThreadableTest):
295            pass
296
297    This class defines two new fixture functions with obvious
298    purposes for overriding:
299
300        clientSetUp ()
301        clientTearDown ()
302
303    Any new test functions within the class must then define
304    tests in pairs, where the test name is preceded with a
305    '_' to indicate the client portion of the test. Ex:
306
307        def testFoo(self):
308            # Server portion
309
310        def _testFoo(self):
311            # Client portion
312
313    Any exceptions raised by the clients during their tests
314    are caught and transferred to the main thread to alert
315    the testing framework.
316
317    Note, the server setup function cannot call any blocking
318    functions that rely on the client thread during setup,
319    unless serverExplicitReady() is called just before
320    the blocking call (such as in setting up a client/server
321    connection and performing the accept() in setUp().
322    """
323
324    def __init__(self):
325        # Swap the true setup function
326        self.__setUp = self.setUp
327        self.__tearDown = self.tearDown
328        self.setUp = self._setUp
329        self.tearDown = self._tearDown
330
331    def serverExplicitReady(self):
332        """This method allows the server to explicitly indicate that
333        it wants the client thread to proceed. This is useful if the
334        server is about to execute a blocking routine that is
335        dependent upon the client thread during its setup routine."""
336        self.server_ready.set()
337
338    def _setUp(self):
339        self.wait_threads = support.wait_threads_exit()
340        self.wait_threads.__enter__()
341
342        self.server_ready = threading.Event()
343        self.client_ready = threading.Event()
344        self.done = threading.Event()
345        self.queue = queue.Queue(1)
346        self.server_crashed = False
347
348        # Do some munging to start the client test.
349        methodname = self.id()
350        i = methodname.rfind('.')
351        methodname = methodname[i+1:]
352        test_method = getattr(self, '_' + methodname)
353        self.client_thread = thread.start_new_thread(
354            self.clientRun, (test_method,))
355
356        try:
357            self.__setUp()
358        except:
359            self.server_crashed = True
360            raise
361        finally:
362            self.server_ready.set()
363        self.client_ready.wait()
364
365    def _tearDown(self):
366        self.__tearDown()
367        self.done.wait()
368        self.wait_threads.__exit__(None, None, None)
369
370        if self.queue.qsize():
371            exc = self.queue.get()
372            raise exc
373
374    def clientRun(self, test_func):
375        self.server_ready.wait()
376        try:
377            self.clientSetUp()
378        except BaseException as e:
379            self.queue.put(e)
380            self.clientTearDown()
381            return
382        finally:
383            self.client_ready.set()
384        if self.server_crashed:
385            self.clientTearDown()
386            return
387        if not hasattr(test_func, '__call__'):
388            raise TypeError("test_func must be a callable function")
389        try:
390            test_func()
391        except BaseException as e:
392            self.queue.put(e)
393        finally:
394            self.clientTearDown()
395
396    def clientSetUp(self):
397        raise NotImplementedError("clientSetUp must be implemented.")
398
399    def clientTearDown(self):
400        self.done.set()
401        thread.exit()
402
403class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
404
405    def __init__(self, methodName='runTest'):
406        SocketTCPTest.__init__(self, methodName=methodName)
407        ThreadableTest.__init__(self)
408
409    def clientSetUp(self):
410        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
411
412    def clientTearDown(self):
413        self.cli.close()
414        self.cli = None
415        ThreadableTest.clientTearDown(self)
416
417class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
418
419    def __init__(self, methodName='runTest'):
420        SocketUDPTest.__init__(self, methodName=methodName)
421        ThreadableTest.__init__(self)
422
423    def clientSetUp(self):
424        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
425
426    def clientTearDown(self):
427        self.cli.close()
428        self.cli = None
429        ThreadableTest.clientTearDown(self)
430
431@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
432          'UDPLITE sockets required for this test.')
433class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest):
434
435    def __init__(self, methodName='runTest'):
436        SocketUDPLITETest.__init__(self, methodName=methodName)
437        ThreadableTest.__init__(self)
438
439    def clientSetUp(self):
440        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
441
442    def clientTearDown(self):
443        self.cli.close()
444        self.cli = None
445        ThreadableTest.clientTearDown(self)
446
447class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
448
449    def __init__(self, methodName='runTest'):
450        SocketCANTest.__init__(self, methodName=methodName)
451        ThreadableTest.__init__(self)
452
453    def clientSetUp(self):
454        self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
455        try:
456            self.cli.bind((self.interface,))
457        except OSError:
458            # skipTest should not be called here, and will be called in the
459            # server instead
460            pass
461
462    def clientTearDown(self):
463        self.cli.close()
464        self.cli = None
465        ThreadableTest.clientTearDown(self)
466
467class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
468
469    def __init__(self, methodName='runTest'):
470        SocketRDSTest.__init__(self, methodName=methodName)
471        ThreadableTest.__init__(self)
472
473    def clientSetUp(self):
474        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
475        try:
476            # RDS sockets must be bound explicitly to send or receive data
477            self.cli.bind((HOST, 0))
478            self.cli_addr = self.cli.getsockname()
479        except OSError:
480            # skipTest should not be called here, and will be called in the
481            # server instead
482            pass
483
484    def clientTearDown(self):
485        self.cli.close()
486        self.cli = None
487        ThreadableTest.clientTearDown(self)
488
489@unittest.skipIf(fcntl is None, "need fcntl")
490@unittest.skipUnless(HAVE_SOCKET_VSOCK,
491          'VSOCK sockets required for this test.')
492@unittest.skipUnless(get_cid() != 2,
493          "This test can only be run on a virtual guest.")
494class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest):
495
496    def __init__(self, methodName='runTest'):
497        unittest.TestCase.__init__(self, methodName=methodName)
498        ThreadableTest.__init__(self)
499
500    def setUp(self):
501        self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
502        self.addCleanup(self.serv.close)
503        self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT))
504        self.serv.listen()
505        self.serverExplicitReady()
506        self.conn, self.connaddr = self.serv.accept()
507        self.addCleanup(self.conn.close)
508
509    def clientSetUp(self):
510        time.sleep(0.1)
511        self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
512        self.addCleanup(self.cli.close)
513        cid = get_cid()
514        self.cli.connect((cid, VSOCKPORT))
515
516    def testStream(self):
517        msg = self.conn.recv(1024)
518        self.assertEqual(msg, MSG)
519
520    def _testStream(self):
521        self.cli.send(MSG)
522        self.cli.close()
523
524class SocketConnectedTest(ThreadedTCPSocketTest):
525    """Socket tests for client-server connection.
526
527    self.cli_conn is a client socket connected to the server.  The
528    setUp() method guarantees that it is connected to the server.
529    """
530
531    def __init__(self, methodName='runTest'):
532        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
533
534    def setUp(self):
535        ThreadedTCPSocketTest.setUp(self)
536        # Indicate explicitly we're ready for the client thread to
537        # proceed and then perform the blocking call to accept
538        self.serverExplicitReady()
539        conn, addr = self.serv.accept()
540        self.cli_conn = conn
541
542    def tearDown(self):
543        self.cli_conn.close()
544        self.cli_conn = None
545        ThreadedTCPSocketTest.tearDown(self)
546
547    def clientSetUp(self):
548        ThreadedTCPSocketTest.clientSetUp(self)
549        self.cli.connect((HOST, self.port))
550        self.serv_conn = self.cli
551
552    def clientTearDown(self):
553        self.serv_conn.close()
554        self.serv_conn = None
555        ThreadedTCPSocketTest.clientTearDown(self)
556
557class SocketPairTest(unittest.TestCase, ThreadableTest):
558
559    def __init__(self, methodName='runTest'):
560        unittest.TestCase.__init__(self, methodName=methodName)
561        ThreadableTest.__init__(self)
562
563    def setUp(self):
564        self.serv, self.cli = socket.socketpair()
565
566    def tearDown(self):
567        self.serv.close()
568        self.serv = None
569
570    def clientSetUp(self):
571        pass
572
573    def clientTearDown(self):
574        self.cli.close()
575        self.cli = None
576        ThreadableTest.clientTearDown(self)
577
578
579# The following classes are used by the sendmsg()/recvmsg() tests.
580# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
581# gives a drop-in replacement for SocketConnectedTest, but different
582# address families can be used, and the attributes serv_addr and
583# cli_addr will be set to the addresses of the endpoints.
584
585class SocketTestBase(unittest.TestCase):
586    """A base class for socket tests.
587
588    Subclasses must provide methods newSocket() to return a new socket
589    and bindSock(sock) to bind it to an unused address.
590
591    Creates a socket self.serv and sets self.serv_addr to its address.
592    """
593
594    def setUp(self):
595        self.serv = self.newSocket()
596        self.bindServer()
597
598    def bindServer(self):
599        """Bind server socket and set self.serv_addr to its address."""
600        self.bindSock(self.serv)
601        self.serv_addr = self.serv.getsockname()
602
603    def tearDown(self):
604        self.serv.close()
605        self.serv = None
606
607
608class SocketListeningTestMixin(SocketTestBase):
609    """Mixin to listen on the server socket."""
610
611    def setUp(self):
612        super().setUp()
613        self.serv.listen()
614
615
616class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
617                              ThreadableTest):
618    """Mixin to add client socket and allow client/server tests.
619
620    Client socket is self.cli and its address is self.cli_addr.  See
621    ThreadableTest for usage information.
622    """
623
624    def __init__(self, *args, **kwargs):
625        super().__init__(*args, **kwargs)
626        ThreadableTest.__init__(self)
627
628    def clientSetUp(self):
629        self.cli = self.newClientSocket()
630        self.bindClient()
631
632    def newClientSocket(self):
633        """Return a new socket for use as client."""
634        return self.newSocket()
635
636    def bindClient(self):
637        """Bind client socket and set self.cli_addr to its address."""
638        self.bindSock(self.cli)
639        self.cli_addr = self.cli.getsockname()
640
641    def clientTearDown(self):
642        self.cli.close()
643        self.cli = None
644        ThreadableTest.clientTearDown(self)
645
646
647class ConnectedStreamTestMixin(SocketListeningTestMixin,
648                               ThreadedSocketTestMixin):
649    """Mixin to allow client/server stream tests with connected client.
650
651    Server's socket representing connection to client is self.cli_conn
652    and client's connection to server is self.serv_conn.  (Based on
653    SocketConnectedTest.)
654    """
655
656    def setUp(self):
657        super().setUp()
658        # Indicate explicitly we're ready for the client thread to
659        # proceed and then perform the blocking call to accept
660        self.serverExplicitReady()
661        conn, addr = self.serv.accept()
662        self.cli_conn = conn
663
664    def tearDown(self):
665        self.cli_conn.close()
666        self.cli_conn = None
667        super().tearDown()
668
669    def clientSetUp(self):
670        super().clientSetUp()
671        self.cli.connect(self.serv_addr)
672        self.serv_conn = self.cli
673
674    def clientTearDown(self):
675        try:
676            self.serv_conn.close()
677            self.serv_conn = None
678        except AttributeError:
679            pass
680        super().clientTearDown()
681
682
683class UnixSocketTestBase(SocketTestBase):
684    """Base class for Unix-domain socket tests."""
685
686    # This class is used for file descriptor passing tests, so we
687    # create the sockets in a private directory so that other users
688    # can't send anything that might be problematic for a privileged
689    # user running the tests.
690
691    def setUp(self):
692        self.dir_path = tempfile.mkdtemp()
693        self.addCleanup(os.rmdir, self.dir_path)
694        super().setUp()
695
696    def bindSock(self, sock):
697        path = tempfile.mktemp(dir=self.dir_path)
698        socket_helper.bind_unix_socket(sock, path)
699        self.addCleanup(support.unlink, path)
700
701class UnixStreamBase(UnixSocketTestBase):
702    """Base class for Unix-domain SOCK_STREAM tests."""
703
704    def newSocket(self):
705        return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
706
707
708class InetTestBase(SocketTestBase):
709    """Base class for IPv4 socket tests."""
710
711    host = HOST
712
713    def setUp(self):
714        super().setUp()
715        self.port = self.serv_addr[1]
716
717    def bindSock(self, sock):
718        socket_helper.bind_port(sock, host=self.host)
719
720class TCPTestBase(InetTestBase):
721    """Base class for TCP-over-IPv4 tests."""
722
723    def newSocket(self):
724        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)
725
726class UDPTestBase(InetTestBase):
727    """Base class for UDP-over-IPv4 tests."""
728
729    def newSocket(self):
730        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
731
732class UDPLITETestBase(InetTestBase):
733    """Base class for UDPLITE-over-IPv4 tests."""
734
735    def newSocket(self):
736        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
737
738class SCTPStreamBase(InetTestBase):
739    """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""
740
741    def newSocket(self):
742        return socket.socket(socket.AF_INET, socket.SOCK_STREAM,
743                             socket.IPPROTO_SCTP)
744
745
746class Inet6TestBase(InetTestBase):
747    """Base class for IPv6 socket tests."""
748
749    host = socket_helper.HOSTv6
750
751class UDP6TestBase(Inet6TestBase):
752    """Base class for UDP-over-IPv6 tests."""
753
754    def newSocket(self):
755        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
756
757class UDPLITE6TestBase(Inet6TestBase):
758    """Base class for UDPLITE-over-IPv6 tests."""
759
760    def newSocket(self):
761        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
762
763
764# Test-skipping decorators for use with ThreadableTest.
765
766def skipWithClientIf(condition, reason):
767    """Skip decorated test if condition is true, add client_skip decorator.
768
769    If the decorated object is not a class, sets its attribute
770    "client_skip" to a decorator which will return an empty function
771    if the test is to be skipped, or the original function if it is
772    not.  This can be used to avoid running the client part of a
773    skipped test when using ThreadableTest.
774    """
775    def client_pass(*args, **kwargs):
776        pass
777    def skipdec(obj):
778        retval = unittest.skip(reason)(obj)
779        if not isinstance(obj, type):
780            retval.client_skip = lambda f: client_pass
781        return retval
782    def noskipdec(obj):
783        if not (isinstance(obj, type) or hasattr(obj, "client_skip")):
784            obj.client_skip = lambda f: f
785        return obj
786    return skipdec if condition else noskipdec
787
788
789def requireAttrs(obj, *attributes):
790    """Skip decorated test if obj is missing any of the given attributes.
791
792    Sets client_skip attribute as skipWithClientIf() does.
793    """
794    missing = [name for name in attributes if not hasattr(obj, name)]
795    return skipWithClientIf(
796        missing, "don't have " + ", ".join(name for name in missing))
797
798
799def requireSocket(*args):
800    """Skip decorated test if a socket cannot be created with given arguments.
801
802    When an argument is given as a string, will use the value of that
803    attribute of the socket module, or skip the test if it doesn't
804    exist.  Sets client_skip attribute as skipWithClientIf() does.
805    """
806    err = None
807    missing = [obj for obj in args if
808               isinstance(obj, str) and not hasattr(socket, obj)]
809    if missing:
810        err = "don't have " + ", ".join(name for name in missing)
811    else:
812        callargs = [getattr(socket, obj) if isinstance(obj, str) else obj
813                    for obj in args]
814        try:
815            s = socket.socket(*callargs)
816        except OSError as e:
817            # XXX: check errno?
818            err = str(e)
819        else:
820            s.close()
821    return skipWithClientIf(
822        err is not None,
823        "can't create socket({0}): {1}".format(
824            ", ".join(str(o) for o in args), err))
825
826
827#######################################################################
828## Begin Tests
829
830class GeneralModuleTests(unittest.TestCase):
831
832    def test_SocketType_is_socketobject(self):
833        import _socket
834        self.assertTrue(socket.SocketType is _socket.socket)
835        s = socket.socket()
836        self.assertIsInstance(s, socket.SocketType)
837        s.close()
838
839    def test_repr(self):
840        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
841        with s:
842            self.assertIn('fd=%i' % s.fileno(), repr(s))
843            self.assertIn('family=%s' % socket.AF_INET, repr(s))
844            self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
845            self.assertIn('proto=0', repr(s))
846            self.assertNotIn('raddr', repr(s))
847            s.bind(('127.0.0.1', 0))
848            self.assertIn('laddr', repr(s))
849            self.assertIn(str(s.getsockname()), repr(s))
850        self.assertIn('[closed]', repr(s))
851        self.assertNotIn('laddr', repr(s))
852
853    @unittest.skipUnless(_socket is not None, 'need _socket module')
854    def test_csocket_repr(self):
855        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
856        try:
857            expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
858                        % (s.fileno(), s.family, s.type, s.proto))
859            self.assertEqual(repr(s), expected)
860        finally:
861            s.close()
862        expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
863                    % (s.family, s.type, s.proto))
864        self.assertEqual(repr(s), expected)
865
866    def test_weakref(self):
867        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
868            p = proxy(s)
869            self.assertEqual(p.fileno(), s.fileno())
870        s = None
871        try:
872            p.fileno()
873        except ReferenceError:
874            pass
875        else:
876            self.fail('Socket proxy still exists')
877
878    def testSocketError(self):
879        # Testing socket module exceptions
880        msg = "Error raising socket exception (%s)."
881        with self.assertRaises(OSError, msg=msg % 'OSError'):
882            raise OSError
883        with self.assertRaises(OSError, msg=msg % 'socket.herror'):
884            raise socket.herror
885        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
886            raise socket.gaierror
887
888    def testSendtoErrors(self):
889        # Testing that sendto doesn't mask failures. See #10169.
890        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
891        self.addCleanup(s.close)
892        s.bind(('', 0))
893        sockname = s.getsockname()
894        # 2 args
895        with self.assertRaises(TypeError) as cm:
896            s.sendto('\u2620', sockname)
897        self.assertEqual(str(cm.exception),
898                         "a bytes-like object is required, not 'str'")
899        with self.assertRaises(TypeError) as cm:
900            s.sendto(5j, sockname)
901        self.assertEqual(str(cm.exception),
902                         "a bytes-like object is required, not 'complex'")
903        with self.assertRaises(TypeError) as cm:
904            s.sendto(b'foo', None)
905        self.assertIn('not NoneType',str(cm.exception))
906        # 3 args
907        with self.assertRaises(TypeError) as cm:
908            s.sendto('\u2620', 0, sockname)
909        self.assertEqual(str(cm.exception),
910                         "a bytes-like object is required, not 'str'")
911        with self.assertRaises(TypeError) as cm:
912            s.sendto(5j, 0, sockname)
913        self.assertEqual(str(cm.exception),
914                         "a bytes-like object is required, not 'complex'")
915        with self.assertRaises(TypeError) as cm:
916            s.sendto(b'foo', 0, None)
917        self.assertIn('not NoneType', str(cm.exception))
918        with self.assertRaises(TypeError) as cm:
919            s.sendto(b'foo', 'bar', sockname)
920        self.assertIn('an integer is required', str(cm.exception))
921        with self.assertRaises(TypeError) as cm:
922            s.sendto(b'foo', None, None)
923        self.assertIn('an integer is required', str(cm.exception))
924        # wrong number of args
925        with self.assertRaises(TypeError) as cm:
926            s.sendto(b'foo')
927        self.assertIn('(1 given)', str(cm.exception))
928        with self.assertRaises(TypeError) as cm:
929            s.sendto(b'foo', 0, sockname, 4)
930        self.assertIn('(4 given)', str(cm.exception))
931
932    def testCrucialConstants(self):
933        # Testing for mission critical constants
934        socket.AF_INET
935        if socket.has_ipv6:
936            socket.AF_INET6
937        socket.SOCK_STREAM
938        socket.SOCK_DGRAM
939        socket.SOCK_RAW
940        socket.SOCK_RDM
941        socket.SOCK_SEQPACKET
942        socket.SOL_SOCKET
943        socket.SO_REUSEADDR
944
945    def testCrucialIpProtoConstants(self):
946        socket.IPPROTO_TCP
947        socket.IPPROTO_UDP
948        if socket.has_ipv6:
949            socket.IPPROTO_IPV6
950
951    @unittest.skipUnless(os.name == "nt", "Windows specific")
952    def testWindowsSpecificConstants(self):
953        socket.IPPROTO_ICLFXBM
954        socket.IPPROTO_ST
955        socket.IPPROTO_CBT
956        socket.IPPROTO_IGP
957        socket.IPPROTO_RDP
958        socket.IPPROTO_PGM
959        socket.IPPROTO_L2TP
960        socket.IPPROTO_SCTP
961
962    @unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
963    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
964    def test3542SocketOptions(self):
965        # Ref. issue #35569 and https://tools.ietf.org/html/rfc3542
966        opts = {
967            'IPV6_CHECKSUM',
968            'IPV6_DONTFRAG',
969            'IPV6_DSTOPTS',
970            'IPV6_HOPLIMIT',
971            'IPV6_HOPOPTS',
972            'IPV6_NEXTHOP',
973            'IPV6_PATHMTU',
974            'IPV6_PKTINFO',
975            'IPV6_RECVDSTOPTS',
976            'IPV6_RECVHOPLIMIT',
977            'IPV6_RECVHOPOPTS',
978            'IPV6_RECVPATHMTU',
979            'IPV6_RECVPKTINFO',
980            'IPV6_RECVRTHDR',
981            'IPV6_RECVTCLASS',
982            'IPV6_RTHDR',
983            'IPV6_RTHDRDSTOPTS',
984            'IPV6_RTHDR_TYPE_0',
985            'IPV6_TCLASS',
986            'IPV6_USE_MIN_MTU',
987        }
988        for opt in opts:
989            self.assertTrue(
990                hasattr(socket, opt), f"Missing RFC3542 socket option '{opt}'"
991            )
992
993    def testHostnameRes(self):
994        # Testing hostname resolution mechanisms
995        hostname = socket.gethostname()
996        try:
997            ip = socket.gethostbyname(hostname)
998        except OSError:
999            # Probably name lookup wasn't set up right; skip this test
1000            self.skipTest('name lookup failure')
1001        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
1002        try:
1003            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
1004        except OSError:
1005            # Probably a similar problem as above; skip this test
1006            self.skipTest('name lookup failure')
1007        all_host_names = [hostname, hname] + aliases
1008        fqhn = socket.getfqdn(ip)
1009        if not fqhn in all_host_names:
1010            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
1011
1012    def test_host_resolution(self):
1013        for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']:
1014            self.assertEqual(socket.gethostbyname(addr), addr)
1015
1016        # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have
1017        # a matching name entry (e.g. 'ip6-localhost')
1018        for host in [socket_helper.HOSTv4]:
1019            self.assertIn(host, socket.gethostbyaddr(host)[2])
1020
1021    def test_host_resolution_bad_address(self):
1022        # These are all malformed IP addresses and expected not to resolve to
1023        # any result.  But some ISPs, e.g. AWS, may successfully resolve these
1024        # IPs.
1025        explanation = (
1026            "resolving an invalid IP address did not raise OSError; "
1027            "can be caused by a broken DNS server"
1028        )
1029        for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
1030                     '1:1:1:1:1:1:1:1:1']:
1031            with self.assertRaises(OSError, msg=addr):
1032                socket.gethostbyname(addr)
1033            with self.assertRaises(OSError, msg=explanation):
1034                socket.gethostbyaddr(addr)
1035
1036    @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
1037    @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
1038    def test_sethostname(self):
1039        oldhn = socket.gethostname()
1040        try:
1041            socket.sethostname('new')
1042        except OSError as e:
1043            if e.errno == errno.EPERM:
1044                self.skipTest("test should be run as root")
1045            else:
1046                raise
1047        try:
1048            # running test as root!
1049            self.assertEqual(socket.gethostname(), 'new')
1050            # Should work with bytes objects too
1051            socket.sethostname(b'bar')
1052            self.assertEqual(socket.gethostname(), 'bar')
1053        finally:
1054            socket.sethostname(oldhn)
1055
1056    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
1057                         'socket.if_nameindex() not available.')
1058    def testInterfaceNameIndex(self):
1059        interfaces = socket.if_nameindex()
1060        for index, name in interfaces:
1061            self.assertIsInstance(index, int)
1062            self.assertIsInstance(name, str)
1063            # interface indices are non-zero integers
1064            self.assertGreater(index, 0)
1065            _index = socket.if_nametoindex(name)
1066            self.assertIsInstance(_index, int)
1067            self.assertEqual(index, _index)
1068            _name = socket.if_indextoname(index)
1069            self.assertIsInstance(_name, str)
1070            self.assertEqual(name, _name)
1071
1072    @unittest.skipUnless(hasattr(socket, 'if_indextoname'),
1073                         'socket.if_indextoname() not available.')
1074    def testInvalidInterfaceIndexToName(self):
1075        self.assertRaises(OSError, socket.if_indextoname, 0)
1076        self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
1077
1078    @unittest.skipUnless(hasattr(socket, 'if_nametoindex'),
1079                         'socket.if_nametoindex() not available.')
1080    def testInvalidInterfaceNameToIndex(self):
1081        self.assertRaises(TypeError, socket.if_nametoindex, 0)
1082        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
1083
1084    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
1085                         'test needs sys.getrefcount()')
1086    def testRefCountGetNameInfo(self):
1087        # Testing reference count for getnameinfo
1088        try:
1089            # On some versions, this loses a reference
1090            orig = sys.getrefcount(__name__)
1091            socket.getnameinfo(__name__,0)
1092        except TypeError:
1093            if sys.getrefcount(__name__) != orig:
1094                self.fail("socket.getnameinfo loses a reference")
1095
1096    def testInterpreterCrash(self):
1097        # Making sure getnameinfo doesn't crash the interpreter
1098        try:
1099            # On some versions, this crashes the interpreter.
1100            socket.getnameinfo(('x', 0, 0, 0), 0)
1101        except OSError:
1102            pass
1103
1104    def testNtoH(self):
1105        # This just checks that htons etc. are their own inverse,
1106        # when looking at the lower 16 or 32 bits.
1107        sizes = {socket.htonl: 32, socket.ntohl: 32,
1108                 socket.htons: 16, socket.ntohs: 16}
1109        for func, size in sizes.items():
1110            mask = (1<<size) - 1
1111            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
1112                self.assertEqual(i & mask, func(func(i&mask)) & mask)
1113
1114            swapped = func(mask)
1115            self.assertEqual(swapped & mask, mask)
1116            self.assertRaises(OverflowError, func, 1<<34)
1117
1118    @support.cpython_only
1119    def testNtoHErrors(self):
1120        import _testcapi
1121        s_good_values = [0, 1, 2, 0xffff]
1122        l_good_values = s_good_values + [0xffffffff]
1123        l_bad_values = [-1, -2, 1<<32, 1<<1000]
1124        s_bad_values = l_bad_values + [_testcapi.INT_MIN - 1,
1125                                       _testcapi.INT_MAX + 1]
1126        s_deprecated_values = [1<<16, _testcapi.INT_MAX]
1127        for k in s_good_values:
1128            socket.ntohs(k)
1129            socket.htons(k)
1130        for k in l_good_values:
1131            socket.ntohl(k)
1132            socket.htonl(k)
1133        for k in s_bad_values:
1134            self.assertRaises(OverflowError, socket.ntohs, k)
1135            self.assertRaises(OverflowError, socket.htons, k)
1136        for k in l_bad_values:
1137            self.assertRaises(OverflowError, socket.ntohl, k)
1138            self.assertRaises(OverflowError, socket.htonl, k)
1139        for k in s_deprecated_values:
1140            self.assertWarns(DeprecationWarning, socket.ntohs, k)
1141            self.assertWarns(DeprecationWarning, socket.htons, k)
1142
1143    def testGetServBy(self):
1144        eq = self.assertEqual
1145        # Find one service that exists, then check all the related interfaces.
1146        # I've ordered this by protocols that have both a tcp and udp
1147        # protocol, at least for modern Linuxes.
1148        if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
1149            or sys.platform in ('linux', 'darwin')):
1150            # avoid the 'echo' service on this platform, as there is an
1151            # assumption breaking non-standard port/protocol entry
1152            services = ('daytime', 'qotd', 'domain')
1153        else:
1154            services = ('echo', 'daytime', 'domain')
1155        for service in services:
1156            try:
1157                port = socket.getservbyname(service, 'tcp')
1158                break
1159            except OSError:
1160                pass
1161        else:
1162            raise OSError
1163        # Try same call with optional protocol omitted
1164        # Issue #26936: Android getservbyname() was broken before API 23.
1165        if (not hasattr(sys, 'getandroidapilevel') or
1166                sys.getandroidapilevel() >= 23):
1167            port2 = socket.getservbyname(service)
1168            eq(port, port2)
1169        # Try udp, but don't barf if it doesn't exist
1170        try:
1171            udpport = socket.getservbyname(service, 'udp')
1172        except OSError:
1173            udpport = None
1174        else:
1175            eq(udpport, port)
1176        # Now make sure the lookup by port returns the same service name
1177        # Issue #26936: Android getservbyport() is broken.
1178        if not support.is_android:
1179            eq(socket.getservbyport(port2), service)
1180        eq(socket.getservbyport(port, 'tcp'), service)
1181        if udpport is not None:
1182            eq(socket.getservbyport(udpport, 'udp'), service)
1183        # Make sure getservbyport does not accept out of range ports.
1184        self.assertRaises(OverflowError, socket.getservbyport, -1)
1185        self.assertRaises(OverflowError, socket.getservbyport, 65536)
1186
1187    def testDefaultTimeout(self):
1188        # Testing default timeout
1189        # The default timeout should initially be None
1190        self.assertEqual(socket.getdefaulttimeout(), None)
1191        with socket.socket() as s:
1192            self.assertEqual(s.gettimeout(), None)
1193
1194        # Set the default timeout to 10, and see if it propagates
1195        with socket_setdefaulttimeout(10):
1196            self.assertEqual(socket.getdefaulttimeout(), 10)
1197            with socket.socket() as sock:
1198                self.assertEqual(sock.gettimeout(), 10)
1199
1200            # Reset the default timeout to None, and see if it propagates
1201            socket.setdefaulttimeout(None)
1202            self.assertEqual(socket.getdefaulttimeout(), None)
1203            with socket.socket() as sock:
1204                self.assertEqual(sock.gettimeout(), None)
1205
1206        # Check that setting it to an invalid value raises ValueError
1207        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
1208
1209        # Check that setting it to an invalid type raises TypeError
1210        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
1211
1212    @unittest.skipUnless(hasattr(socket, 'inet_aton'),
1213                         'test needs socket.inet_aton()')
1214    def testIPv4_inet_aton_fourbytes(self):
1215        # Test that issue1008086 and issue767150 are fixed.
1216        # It must return 4 bytes.
1217        self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
1218        self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
1219
1220    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1221                         'test needs socket.inet_pton()')
1222    def testIPv4toString(self):
1223        from socket import inet_aton as f, inet_pton, AF_INET
1224        g = lambda a: inet_pton(AF_INET, a)
1225
1226        assertInvalid = lambda func,a: self.assertRaises(
1227            (OSError, ValueError), func, a
1228        )
1229
1230        self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
1231        self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
1232        self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
1233        self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
1234        self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
1235        # bpo-29972: inet_pton() doesn't fail on AIX
1236        if not AIX:
1237            assertInvalid(f, '0.0.0.')
1238        assertInvalid(f, '300.0.0.0')
1239        assertInvalid(f, 'a.0.0.0')
1240        assertInvalid(f, '1.2.3.4.5')
1241        assertInvalid(f, '::1')
1242
1243        self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
1244        self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
1245        self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
1246        self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
1247        assertInvalid(g, '0.0.0.')
1248        assertInvalid(g, '300.0.0.0')
1249        assertInvalid(g, 'a.0.0.0')
1250        assertInvalid(g, '1.2.3.4.5')
1251        assertInvalid(g, '::1')
1252
1253    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1254                         'test needs socket.inet_pton()')
1255    def testIPv6toString(self):
1256        try:
1257            from socket import inet_pton, AF_INET6, has_ipv6
1258            if not has_ipv6:
1259                self.skipTest('IPv6 not available')
1260        except ImportError:
1261            self.skipTest('could not import needed symbols from socket')
1262
1263        if sys.platform == "win32":
1264            try:
1265                inet_pton(AF_INET6, '::')
1266            except OSError as e:
1267                if e.winerror == 10022:
1268                    self.skipTest('IPv6 might not be supported')
1269
1270        f = lambda a: inet_pton(AF_INET6, a)
1271        assertInvalid = lambda a: self.assertRaises(
1272            (OSError, ValueError), f, a
1273        )
1274
1275        self.assertEqual(b'\x00' * 16, f('::'))
1276        self.assertEqual(b'\x00' * 16, f('0::0'))
1277        self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
1278        self.assertEqual(
1279            b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
1280            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
1281        )
1282        self.assertEqual(
1283            b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
1284            f('ad42:abc::127:0:254:2')
1285        )
1286        self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
1287        assertInvalid('0x20::')
1288        assertInvalid(':::')
1289        assertInvalid('::0::')
1290        assertInvalid('1::abc::')
1291        assertInvalid('1::abc::def')
1292        assertInvalid('1:2:3:4:5:6')
1293        assertInvalid('1:2:3:4:5:6:')
1294        assertInvalid('1:2:3:4:5:6:7:8:0')
1295        # bpo-29972: inet_pton() doesn't fail on AIX
1296        if not AIX:
1297            assertInvalid('1:2:3:4:5:6:7:8:')
1298
1299        self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
1300            f('::254.42.23.64')
1301        )
1302        self.assertEqual(
1303            b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
1304            f('42::a29b:254.42.23.64')
1305        )
1306        self.assertEqual(
1307            b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
1308            f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
1309        )
1310        assertInvalid('255.254.253.252')
1311        assertInvalid('1::260.2.3.0')
1312        assertInvalid('1::0.be.e.0')
1313        assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
1314        assertInvalid('::1.2.3.4:0')
1315        assertInvalid('0.100.200.0:3:4:5:6:7:8')
1316
1317    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1318                         'test needs socket.inet_ntop()')
1319    def testStringToIPv4(self):
1320        from socket import inet_ntoa as f, inet_ntop, AF_INET
1321        g = lambda a: inet_ntop(AF_INET, a)
1322        assertInvalid = lambda func,a: self.assertRaises(
1323            (OSError, ValueError), func, a
1324        )
1325
1326        self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
1327        self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
1328        self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
1329        self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
1330        assertInvalid(f, b'\x00' * 3)
1331        assertInvalid(f, b'\x00' * 5)
1332        assertInvalid(f, b'\x00' * 16)
1333        self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))
1334
1335        self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
1336        self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
1337        self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
1338        assertInvalid(g, b'\x00' * 3)
1339        assertInvalid(g, b'\x00' * 5)
1340        assertInvalid(g, b'\x00' * 16)
1341        self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))
1342
1343    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1344                         'test needs socket.inet_ntop()')
1345    def testStringToIPv6(self):
1346        try:
1347            from socket import inet_ntop, AF_INET6, has_ipv6
1348            if not has_ipv6:
1349                self.skipTest('IPv6 not available')
1350        except ImportError:
1351            self.skipTest('could not import needed symbols from socket')
1352
1353        if sys.platform == "win32":
1354            try:
1355                inet_ntop(AF_INET6, b'\x00' * 16)
1356            except OSError as e:
1357                if e.winerror == 10022:
1358                    self.skipTest('IPv6 might not be supported')
1359
1360        f = lambda a: inet_ntop(AF_INET6, a)
1361        assertInvalid = lambda a: self.assertRaises(
1362            (OSError, ValueError), f, a
1363        )
1364
1365        self.assertEqual('::', f(b'\x00' * 16))
1366        self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
1367        self.assertEqual(
1368            'aef:b01:506:1001:ffff:9997:55:170',
1369            f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
1370        )
1371        self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))
1372
1373        assertInvalid(b'\x12' * 15)
1374        assertInvalid(b'\x12' * 17)
1375        assertInvalid(b'\x12' * 4)
1376
1377    # XXX The following don't test module-level functionality...
1378
1379    def testSockName(self):
1380        # Testing getsockname()
1381        port = socket_helper.find_unused_port()
1382        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1383        self.addCleanup(sock.close)
1384        sock.bind(("0.0.0.0", port))
1385        name = sock.getsockname()
1386        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
1387        # it reasonable to get the host's addr in addition to 0.0.0.0.
1388        # At least for eCos.  This is required for the S/390 to pass.
1389        try:
1390            my_ip_addr = socket.gethostbyname(socket.gethostname())
1391        except OSError:
1392            # Probably name lookup wasn't set up right; skip this test
1393            self.skipTest('name lookup failure')
1394        self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
1395        self.assertEqual(name[1], port)
1396
1397    def testGetSockOpt(self):
1398        # Testing getsockopt()
1399        # We know a socket should start without reuse==0
1400        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1401        self.addCleanup(sock.close)
1402        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1403        self.assertFalse(reuse != 0, "initial mode is reuse")
1404
1405    def testSetSockOpt(self):
1406        # Testing setsockopt()
1407        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1408        self.addCleanup(sock.close)
1409        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1410        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1411        self.assertFalse(reuse == 0, "failed to set reuse mode")
1412
1413    def testSendAfterClose(self):
1414        # testing send() after close() with timeout
1415        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1416            sock.settimeout(1)
1417        self.assertRaises(OSError, sock.send, b"spam")
1418
1419    def testCloseException(self):
1420        sock = socket.socket()
1421        sock.bind((socket._LOCALHOST, 0))
1422        socket.socket(fileno=sock.fileno()).close()
1423        try:
1424            sock.close()
1425        except OSError as err:
1426            # Winsock apparently raises ENOTSOCK
1427            self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
1428        else:
1429            self.fail("close() should raise EBADF/ENOTSOCK")
1430
1431    def testNewAttributes(self):
1432        # testing .family, .type and .protocol
1433
1434        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1435            self.assertEqual(sock.family, socket.AF_INET)
1436            if hasattr(socket, 'SOCK_CLOEXEC'):
1437                self.assertIn(sock.type,
1438                              (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
1439                               socket.SOCK_STREAM))
1440            else:
1441                self.assertEqual(sock.type, socket.SOCK_STREAM)
1442            self.assertEqual(sock.proto, 0)
1443
1444    def test_getsockaddrarg(self):
1445        sock = socket.socket()
1446        self.addCleanup(sock.close)
1447        port = socket_helper.find_unused_port()
1448        big_port = port + 65536
1449        neg_port = port - 65536
1450        self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
1451        self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
1452        # Since find_unused_port() is inherently subject to race conditions, we
1453        # call it a couple times if necessary.
1454        for i in itertools.count():
1455            port = socket_helper.find_unused_port()
1456            try:
1457                sock.bind((HOST, port))
1458            except OSError as e:
1459                if e.errno != errno.EADDRINUSE or i == 5:
1460                    raise
1461            else:
1462                break
1463
1464    @unittest.skipUnless(os.name == "nt", "Windows specific")
1465    def test_sock_ioctl(self):
1466        self.assertTrue(hasattr(socket.socket, 'ioctl'))
1467        self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
1468        self.assertTrue(hasattr(socket, 'RCVALL_ON'))
1469        self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
1470        self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
1471        s = socket.socket()
1472        self.addCleanup(s.close)
1473        self.assertRaises(ValueError, s.ioctl, -1, None)
1474        s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
1475
1476    @unittest.skipUnless(os.name == "nt", "Windows specific")
1477    @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
1478                         'Loopback fast path support required for this test')
1479    def test_sio_loopback_fast_path(self):
1480        s = socket.socket()
1481        self.addCleanup(s.close)
1482        try:
1483            s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
1484        except OSError as exc:
1485            WSAEOPNOTSUPP = 10045
1486            if exc.winerror == WSAEOPNOTSUPP:
1487                self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
1488                              "doesn't implemented in this Windows version")
1489            raise
1490        self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
1491
1492    def testGetaddrinfo(self):
1493        try:
1494            socket.getaddrinfo('localhost', 80)
1495        except socket.gaierror as err:
1496            if err.errno == socket.EAI_SERVICE:
1497                # see http://bugs.python.org/issue1282647
1498                self.skipTest("buggy libc version")
1499            raise
1500        # len of every sequence is supposed to be == 5
1501        for info in socket.getaddrinfo(HOST, None):
1502            self.assertEqual(len(info), 5)
1503        # host can be a domain name, a string representation of an
1504        # IPv4/v6 address or None
1505        socket.getaddrinfo('localhost', 80)
1506        socket.getaddrinfo('127.0.0.1', 80)
1507        socket.getaddrinfo(None, 80)
1508        if socket_helper.IPV6_ENABLED:
1509            socket.getaddrinfo('::1', 80)
1510        # port can be a string service name such as "http", a numeric
1511        # port number or None
1512        # Issue #26936: Android getaddrinfo() was broken before API level 23.
1513        if (not hasattr(sys, 'getandroidapilevel') or
1514                sys.getandroidapilevel() >= 23):
1515            socket.getaddrinfo(HOST, "http")
1516        socket.getaddrinfo(HOST, 80)
1517        socket.getaddrinfo(HOST, None)
1518        # test family and socktype filters
1519        infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
1520        for family, type, _, _, _ in infos:
1521            self.assertEqual(family, socket.AF_INET)
1522            self.assertEqual(str(family), 'AddressFamily.AF_INET')
1523            self.assertEqual(type, socket.SOCK_STREAM)
1524            self.assertEqual(str(type), 'SocketKind.SOCK_STREAM')
1525        infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1526        for _, socktype, _, _, _ in infos:
1527            self.assertEqual(socktype, socket.SOCK_STREAM)
1528        # test proto and flags arguments
1529        socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1530        socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1531        # a server willing to support both IPv4 and IPv6 will
1532        # usually do this
1533        socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1534                           socket.AI_PASSIVE)
1535        # test keyword arguments
1536        a = socket.getaddrinfo(HOST, None)
1537        b = socket.getaddrinfo(host=HOST, port=None)
1538        self.assertEqual(a, b)
1539        a = socket.getaddrinfo(HOST, None, socket.AF_INET)
1540        b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
1541        self.assertEqual(a, b)
1542        a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1543        b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
1544        self.assertEqual(a, b)
1545        a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1546        b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
1547        self.assertEqual(a, b)
1548        a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1549        b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
1550        self.assertEqual(a, b)
1551        a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1552                               socket.AI_PASSIVE)
1553        b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
1554                               type=socket.SOCK_STREAM, proto=0,
1555                               flags=socket.AI_PASSIVE)
1556        self.assertEqual(a, b)
1557        # Issue #6697.
1558        self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
1559
1560        # Issue 17269: test workaround for OS X platform bug segfault
1561        if hasattr(socket, 'AI_NUMERICSERV'):
1562            try:
1563                # The arguments here are undefined and the call may succeed
1564                # or fail.  All we care here is that it doesn't segfault.
1565                socket.getaddrinfo("localhost", None, 0, 0, 0,
1566                                   socket.AI_NUMERICSERV)
1567            except socket.gaierror:
1568                pass
1569
1570    def test_getnameinfo(self):
1571        # only IP addresses are allowed
1572        self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
1573
1574    @unittest.skipUnless(support.is_resource_enabled('network'),
1575                         'network is not enabled')
1576    def test_idna(self):
1577        # Check for internet access before running test
1578        # (issue #12804, issue #25138).
1579        with socket_helper.transient_internet('python.org'):
1580            socket.gethostbyname('python.org')
1581
1582        # these should all be successful
1583        domain = 'испытание.pythontest.net'
1584        socket.gethostbyname(domain)
1585        socket.gethostbyname_ex(domain)
1586        socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
1587        # this may not work if the forward lookup chooses the IPv6 address, as that doesn't
1588        # have a reverse entry yet
1589        # socket.gethostbyaddr('испытание.python.org')
1590
1591    def check_sendall_interrupted(self, with_timeout):
1592        # socketpair() is not strictly required, but it makes things easier.
1593        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
1594            self.skipTest("signal.alarm and socket.socketpair required for this test")
1595        # Our signal handlers clobber the C errno by calling a math function
1596        # with an invalid domain value.
1597        def ok_handler(*args):
1598            self.assertRaises(ValueError, math.acosh, 0)
1599        def raising_handler(*args):
1600            self.assertRaises(ValueError, math.acosh, 0)
1601            1 // 0
1602        c, s = socket.socketpair()
1603        old_alarm = signal.signal(signal.SIGALRM, raising_handler)
1604        try:
1605            if with_timeout:
1606                # Just above the one second minimum for signal.alarm
1607                c.settimeout(1.5)
1608            with self.assertRaises(ZeroDivisionError):
1609                signal.alarm(1)
1610                c.sendall(b"x" * support.SOCK_MAX_SIZE)
1611            if with_timeout:
1612                signal.signal(signal.SIGALRM, ok_handler)
1613                signal.alarm(1)
1614                self.assertRaises(socket.timeout, c.sendall,
1615                                  b"x" * support.SOCK_MAX_SIZE)
1616        finally:
1617            signal.alarm(0)
1618            signal.signal(signal.SIGALRM, old_alarm)
1619            c.close()
1620            s.close()
1621
1622    def test_sendall_interrupted(self):
1623        self.check_sendall_interrupted(False)
1624
1625    def test_sendall_interrupted_with_timeout(self):
1626        self.check_sendall_interrupted(True)
1627
1628    def test_dealloc_warn(self):
1629        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1630        r = repr(sock)
1631        with self.assertWarns(ResourceWarning) as cm:
1632            sock = None
1633            support.gc_collect()
1634        self.assertIn(r, str(cm.warning.args[0]))
1635        # An open socket file object gets dereferenced after the socket
1636        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1637        f = sock.makefile('rb')
1638        r = repr(sock)
1639        sock = None
1640        support.gc_collect()
1641        with self.assertWarns(ResourceWarning):
1642            f = None
1643            support.gc_collect()
1644
1645    def test_name_closed_socketio(self):
1646        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1647            fp = sock.makefile("rb")
1648            fp.close()
1649            self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
1650
1651    def test_unusable_closed_socketio(self):
1652        with socket.socket() as sock:
1653            fp = sock.makefile("rb", buffering=0)
1654            self.assertTrue(fp.readable())
1655            self.assertFalse(fp.writable())
1656            self.assertFalse(fp.seekable())
1657            fp.close()
1658            self.assertRaises(ValueError, fp.readable)
1659            self.assertRaises(ValueError, fp.writable)
1660            self.assertRaises(ValueError, fp.seekable)
1661
1662    def test_socket_close(self):
1663        sock = socket.socket()
1664        try:
1665            sock.bind((HOST, 0))
1666            socket.close(sock.fileno())
1667            with self.assertRaises(OSError):
1668                sock.listen(1)
1669        finally:
1670            with self.assertRaises(OSError):
1671                # sock.close() fails with EBADF
1672                sock.close()
1673        with self.assertRaises(TypeError):
1674            socket.close(None)
1675        with self.assertRaises(OSError):
1676            socket.close(-1)
1677
1678    def test_makefile_mode(self):
1679        for mode in 'r', 'rb', 'rw', 'w', 'wb':
1680            with self.subTest(mode=mode):
1681                with socket.socket() as sock:
1682                    with sock.makefile(mode) as fp:
1683                        self.assertEqual(fp.mode, mode)
1684
1685    def test_makefile_invalid_mode(self):
1686        for mode in 'rt', 'x', '+', 'a':
1687            with self.subTest(mode=mode):
1688                with socket.socket() as sock:
1689                    with self.assertRaisesRegex(ValueError, 'invalid mode'):
1690                        sock.makefile(mode)
1691
1692    def test_pickle(self):
1693        sock = socket.socket()
1694        with sock:
1695            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1696                self.assertRaises(TypeError, pickle.dumps, sock, protocol)
1697        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1698            family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
1699            self.assertEqual(family, socket.AF_INET)
1700            type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
1701            self.assertEqual(type, socket.SOCK_STREAM)
1702
1703    def test_listen_backlog(self):
1704        for backlog in 0, -1:
1705            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1706                srv.bind((HOST, 0))
1707                srv.listen(backlog)
1708
1709        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1710            srv.bind((HOST, 0))
1711            srv.listen()
1712
1713    @support.cpython_only
1714    def test_listen_backlog_overflow(self):
1715        # Issue 15989
1716        import _testcapi
1717        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1718            srv.bind((HOST, 0))
1719            self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
1720
1721    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1722    def test_flowinfo(self):
1723        self.assertRaises(OverflowError, socket.getnameinfo,
1724                          (socket_helper.HOSTv6, 0, 0xffffffff), 0)
1725        with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1726            self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10))
1727
1728    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1729    def test_getaddrinfo_ipv6_basic(self):
1730        ((*_, sockaddr),) = socket.getaddrinfo(
1731            'ff02::1de:c0:face:8D',  # Note capital letter `D`.
1732            1234, socket.AF_INET6,
1733            socket.SOCK_DGRAM,
1734            socket.IPPROTO_UDP
1735        )
1736        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))
1737
1738    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1739    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1740    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1741    def test_getaddrinfo_ipv6_scopeid_symbolic(self):
1742        # Just pick up any network interface (Linux, Mac OS X)
1743        (ifindex, test_interface) = socket.if_nameindex()[0]
1744        ((*_, sockaddr),) = socket.getaddrinfo(
1745            'ff02::1de:c0:face:8D%' + test_interface,
1746            1234, socket.AF_INET6,
1747            socket.SOCK_DGRAM,
1748            socket.IPPROTO_UDP
1749        )
1750        # Note missing interface name part in IPv6 address
1751        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1752
1753    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1754    @unittest.skipUnless(
1755        sys.platform == 'win32',
1756        'Numeric scope id does not work or undocumented')
1757    def test_getaddrinfo_ipv6_scopeid_numeric(self):
1758        # Also works on Linux and Mac OS X, but is not documented (?)
1759        # Windows, Linux and Max OS X allow nonexistent interface numbers here.
1760        ifindex = 42
1761        ((*_, sockaddr),) = socket.getaddrinfo(
1762            'ff02::1de:c0:face:8D%' + str(ifindex),
1763            1234, socket.AF_INET6,
1764            socket.SOCK_DGRAM,
1765            socket.IPPROTO_UDP
1766        )
1767        # Note missing interface name part in IPv6 address
1768        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1769
1770    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1771    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1772    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1773    def test_getnameinfo_ipv6_scopeid_symbolic(self):
1774        # Just pick up any network interface.
1775        (ifindex, test_interface) = socket.if_nameindex()[0]
1776        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1777        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1778        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))
1779
1780    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1781    @unittest.skipUnless( sys.platform == 'win32',
1782        'Numeric scope id does not work or undocumented')
1783    def test_getnameinfo_ipv6_scopeid_numeric(self):
1784        # Also works on Linux (undocumented), but does not work on Mac OS X
1785        # Windows and Linux allow nonexistent interface numbers here.
1786        ifindex = 42
1787        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1788        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1789        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234'))
1790
1791    def test_str_for_enums(self):
1792        # Make sure that the AF_* and SOCK_* constants have enum-like string
1793        # reprs.
1794        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1795            self.assertEqual(str(s.family), 'AddressFamily.AF_INET')
1796            self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM')
1797
1798    def test_socket_consistent_sock_type(self):
1799        SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)
1800        SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0)
1801        sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC
1802
1803        with socket.socket(socket.AF_INET, sock_type) as s:
1804            self.assertEqual(s.type, socket.SOCK_STREAM)
1805            s.settimeout(1)
1806            self.assertEqual(s.type, socket.SOCK_STREAM)
1807            s.settimeout(0)
1808            self.assertEqual(s.type, socket.SOCK_STREAM)
1809            s.setblocking(True)
1810            self.assertEqual(s.type, socket.SOCK_STREAM)
1811            s.setblocking(False)
1812            self.assertEqual(s.type, socket.SOCK_STREAM)
1813
1814    def test_unknown_socket_family_repr(self):
1815        # Test that when created with a family that's not one of the known
1816        # AF_*/SOCK_* constants, socket.family just returns the number.
1817        #
1818        # To do this we fool socket.socket into believing it already has an
1819        # open fd because on this path it doesn't actually verify the family and
1820        # type and populates the socket object.
1821        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1822        fd = sock.detach()
1823        unknown_family = max(socket.AddressFamily.__members__.values()) + 1
1824
1825        unknown_type = max(
1826            kind
1827            for name, kind in socket.SocketKind.__members__.items()
1828            if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'}
1829        ) + 1
1830
1831        with socket.socket(
1832                family=unknown_family, type=unknown_type, proto=23,
1833                fileno=fd) as s:
1834            self.assertEqual(s.family, unknown_family)
1835            self.assertEqual(s.type, unknown_type)
1836            # some OS like macOS ignore proto
1837            self.assertIn(s.proto, {0, 23})
1838
1839    @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
1840    def test__sendfile_use_sendfile(self):
1841        class File:
1842            def __init__(self, fd):
1843                self.fd = fd
1844
1845            def fileno(self):
1846                return self.fd
1847        with socket.socket() as sock:
1848            fd = os.open(os.curdir, os.O_RDONLY)
1849            os.close(fd)
1850            with self.assertRaises(socket._GiveupOnSendfile):
1851                sock._sendfile_use_sendfile(File(fd))
1852            with self.assertRaises(OverflowError):
1853                sock._sendfile_use_sendfile(File(2**1000))
1854            with self.assertRaises(TypeError):
1855                sock._sendfile_use_sendfile(File(None))
1856
1857    def _test_socket_fileno(self, s, family, stype):
1858        self.assertEqual(s.family, family)
1859        self.assertEqual(s.type, stype)
1860
1861        fd = s.fileno()
1862        s2 = socket.socket(fileno=fd)
1863        self.addCleanup(s2.close)
1864        # detach old fd to avoid double close
1865        s.detach()
1866        self.assertEqual(s2.family, family)
1867        self.assertEqual(s2.type, stype)
1868        self.assertEqual(s2.fileno(), fd)
1869
1870    def test_socket_fileno(self):
1871        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1872        self.addCleanup(s.close)
1873        s.bind((socket_helper.HOST, 0))
1874        self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)
1875
1876        if hasattr(socket, "SOCK_DGRAM"):
1877            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1878            self.addCleanup(s.close)
1879            s.bind((socket_helper.HOST, 0))
1880            self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)
1881
1882        if socket_helper.IPV6_ENABLED:
1883            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
1884            self.addCleanup(s.close)
1885            s.bind((socket_helper.HOSTv6, 0, 0, 0))
1886            self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)
1887
1888        if hasattr(socket, "AF_UNIX"):
1889            tmpdir = tempfile.mkdtemp()
1890            self.addCleanup(shutil.rmtree, tmpdir)
1891            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1892            self.addCleanup(s.close)
1893            try:
1894                s.bind(os.path.join(tmpdir, 'socket'))
1895            except PermissionError:
1896                pass
1897            else:
1898                self._test_socket_fileno(s, socket.AF_UNIX,
1899                                         socket.SOCK_STREAM)
1900
1901    def test_socket_fileno_rejects_float(self):
1902        with self.assertRaisesRegex(TypeError, "integer argument expected"):
1903            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5)
1904
1905    def test_socket_fileno_rejects_other_types(self):
1906        with self.assertRaisesRegex(TypeError, "integer is required"):
1907            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo")
1908
1909    def test_socket_fileno_rejects_invalid_socket(self):
1910        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1911            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1)
1912
1913    @unittest.skipIf(os.name == "nt", "Windows disallows -1 only")
1914    def test_socket_fileno_rejects_negative(self):
1915        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1916            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42)
1917
1918    def test_socket_fileno_requires_valid_fd(self):
1919        WSAENOTSOCK = 10038
1920        with self.assertRaises(OSError) as cm:
1921            socket.socket(fileno=support.make_bad_fd())
1922        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1923
1924        with self.assertRaises(OSError) as cm:
1925            socket.socket(
1926                socket.AF_INET,
1927                socket.SOCK_STREAM,
1928                fileno=support.make_bad_fd())
1929        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1930
1931    def test_socket_fileno_requires_socket_fd(self):
1932        with tempfile.NamedTemporaryFile() as afile:
1933            with self.assertRaises(OSError):
1934                socket.socket(fileno=afile.fileno())
1935
1936            with self.assertRaises(OSError) as cm:
1937                socket.socket(
1938                    socket.AF_INET,
1939                    socket.SOCK_STREAM,
1940                    fileno=afile.fileno())
1941            self.assertEqual(cm.exception.errno, errno.ENOTSOCK)
1942
1943
1944@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
1945class BasicCANTest(unittest.TestCase):
1946
1947    def testCrucialConstants(self):
1948        socket.AF_CAN
1949        socket.PF_CAN
1950        socket.CAN_RAW
1951
1952    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1953                         'socket.CAN_BCM required for this test.')
1954    def testBCMConstants(self):
1955        socket.CAN_BCM
1956
1957        # opcodes
1958        socket.CAN_BCM_TX_SETUP     # create (cyclic) transmission task
1959        socket.CAN_BCM_TX_DELETE    # remove (cyclic) transmission task
1960        socket.CAN_BCM_TX_READ      # read properties of (cyclic) transmission task
1961        socket.CAN_BCM_TX_SEND      # send one CAN frame
1962        socket.CAN_BCM_RX_SETUP     # create RX content filter subscription
1963        socket.CAN_BCM_RX_DELETE    # remove RX content filter subscription
1964        socket.CAN_BCM_RX_READ      # read properties of RX content filter subscription
1965        socket.CAN_BCM_TX_STATUS    # reply to TX_READ request
1966        socket.CAN_BCM_TX_EXPIRED   # notification on performed transmissions (count=0)
1967        socket.CAN_BCM_RX_STATUS    # reply to RX_READ request
1968        socket.CAN_BCM_RX_TIMEOUT   # cyclic message is absent
1969        socket.CAN_BCM_RX_CHANGED   # updated CAN frame (detected content change)
1970
1971        # flags
1972        socket.CAN_BCM_SETTIMER
1973        socket.CAN_BCM_STARTTIMER
1974        socket.CAN_BCM_TX_COUNTEVT
1975        socket.CAN_BCM_TX_ANNOUNCE
1976        socket.CAN_BCM_TX_CP_CAN_ID
1977        socket.CAN_BCM_RX_FILTER_ID
1978        socket.CAN_BCM_RX_CHECK_DLC
1979        socket.CAN_BCM_RX_NO_AUTOTIMER
1980        socket.CAN_BCM_RX_ANNOUNCE_RESUME
1981        socket.CAN_BCM_TX_RESET_MULTI_IDX
1982        socket.CAN_BCM_RX_RTR_FRAME
1983
1984    def testCreateSocket(self):
1985        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1986            pass
1987
1988    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1989                         'socket.CAN_BCM required for this test.')
1990    def testCreateBCMSocket(self):
1991        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s:
1992            pass
1993
1994    def testBindAny(self):
1995        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1996            address = ('', )
1997            s.bind(address)
1998            self.assertEqual(s.getsockname(), address)
1999
2000    def testTooLongInterfaceName(self):
2001        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
2002        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2003            self.assertRaisesRegex(OSError, 'interface name too long',
2004                                   s.bind, ('x' * 1024,))
2005
2006    @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
2007                         'socket.CAN_RAW_LOOPBACK required for this test.')
2008    def testLoopback(self):
2009        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2010            for loopback in (0, 1):
2011                s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
2012                             loopback)
2013                self.assertEqual(loopback,
2014                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
2015
2016    @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
2017                         'socket.CAN_RAW_FILTER required for this test.')
2018    def testFilter(self):
2019        can_id, can_mask = 0x200, 0x700
2020        can_filter = struct.pack("=II", can_id, can_mask)
2021        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2022            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
2023            self.assertEqual(can_filter,
2024                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
2025            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter))
2026
2027
2028@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
2029class CANTest(ThreadedCANSocketTest):
2030
2031    def __init__(self, methodName='runTest'):
2032        ThreadedCANSocketTest.__init__(self, methodName=methodName)
2033
2034    @classmethod
2035    def build_can_frame(cls, can_id, data):
2036        """Build a CAN frame."""
2037        can_dlc = len(data)
2038        data = data.ljust(8, b'\x00')
2039        return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
2040
2041    @classmethod
2042    def dissect_can_frame(cls, frame):
2043        """Dissect a CAN frame."""
2044        can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
2045        return (can_id, can_dlc, data[:can_dlc])
2046
2047    def testSendFrame(self):
2048        cf, addr = self.s.recvfrom(self.bufsize)
2049        self.assertEqual(self.cf, cf)
2050        self.assertEqual(addr[0], self.interface)
2051        self.assertEqual(addr[1], socket.AF_CAN)
2052
2053    def _testSendFrame(self):
2054        self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
2055        self.cli.send(self.cf)
2056
2057    def testSendMaxFrame(self):
2058        cf, addr = self.s.recvfrom(self.bufsize)
2059        self.assertEqual(self.cf, cf)
2060
2061    def _testSendMaxFrame(self):
2062        self.cf = self.build_can_frame(0x00, b'\x07' * 8)
2063        self.cli.send(self.cf)
2064
2065    def testSendMultiFrames(self):
2066        cf, addr = self.s.recvfrom(self.bufsize)
2067        self.assertEqual(self.cf1, cf)
2068
2069        cf, addr = self.s.recvfrom(self.bufsize)
2070        self.assertEqual(self.cf2, cf)
2071
2072    def _testSendMultiFrames(self):
2073        self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
2074        self.cli.send(self.cf1)
2075
2076        self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
2077        self.cli.send(self.cf2)
2078
2079    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
2080                         'socket.CAN_BCM required for this test.')
2081    def _testBCM(self):
2082        cf, addr = self.cli.recvfrom(self.bufsize)
2083        self.assertEqual(self.cf, cf)
2084        can_id, can_dlc, data = self.dissect_can_frame(cf)
2085        self.assertEqual(self.can_id, can_id)
2086        self.assertEqual(self.data, data)
2087
2088    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
2089                         'socket.CAN_BCM required for this test.')
2090    def testBCM(self):
2091        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
2092        self.addCleanup(bcm.close)
2093        bcm.connect((self.interface,))
2094        self.can_id = 0x123
2095        self.data = bytes([0xc0, 0xff, 0xee])
2096        self.cf = self.build_can_frame(self.can_id, self.data)
2097        opcode = socket.CAN_BCM_TX_SEND
2098        flags = 0
2099        count = 0
2100        ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
2101        bcm_can_id = 0x0222
2102        nframes = 1
2103        assert len(self.cf) == 16
2104        header = struct.pack(self.bcm_cmd_msg_fmt,
2105                    opcode,
2106                    flags,
2107                    count,
2108                    ival1_seconds,
2109                    ival1_usec,
2110                    ival2_seconds,
2111                    ival2_usec,
2112                    bcm_can_id,
2113                    nframes,
2114                    )
2115        header_plus_frame = header + self.cf
2116        bytes_sent = bcm.send(header_plus_frame)
2117        self.assertEqual(bytes_sent, len(header_plus_frame))
2118
2119
2120@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
2121class ISOTPTest(unittest.TestCase):
2122
2123    def __init__(self, *args, **kwargs):
2124        super().__init__(*args, **kwargs)
2125        self.interface = "vcan0"
2126
2127    def testCrucialConstants(self):
2128        socket.AF_CAN
2129        socket.PF_CAN
2130        socket.CAN_ISOTP
2131        socket.SOCK_DGRAM
2132
2133    def testCreateSocket(self):
2134        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
2135            pass
2136
2137    @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
2138                         'socket.CAN_ISOTP required for this test.')
2139    def testCreateISOTPSocket(self):
2140        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2141            pass
2142
2143    def testTooLongInterfaceName(self):
2144        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
2145        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2146            with self.assertRaisesRegex(OSError, 'interface name too long'):
2147                s.bind(('x' * 1024, 1, 2))
2148
2149    def testBind(self):
2150        try:
2151            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
2152                addr = self.interface, 0x123, 0x456
2153                s.bind(addr)
2154                self.assertEqual(s.getsockname(), addr)
2155        except OSError as e:
2156            if e.errno == errno.ENODEV:
2157                self.skipTest('network interface `%s` does not exist' %
2158                           self.interface)
2159            else:
2160                raise
2161
2162
2163@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.')
2164class J1939Test(unittest.TestCase):
2165
2166    def __init__(self, *args, **kwargs):
2167        super().__init__(*args, **kwargs)
2168        self.interface = "vcan0"
2169
2170    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
2171                         'socket.CAN_J1939 required for this test.')
2172    def testJ1939Constants(self):
2173        socket.CAN_J1939
2174
2175        socket.J1939_MAX_UNICAST_ADDR
2176        socket.J1939_IDLE_ADDR
2177        socket.J1939_NO_ADDR
2178        socket.J1939_NO_NAME
2179        socket.J1939_PGN_REQUEST
2180        socket.J1939_PGN_ADDRESS_CLAIMED
2181        socket.J1939_PGN_ADDRESS_COMMANDED
2182        socket.J1939_PGN_PDU1_MAX
2183        socket.J1939_PGN_MAX
2184        socket.J1939_NO_PGN
2185
2186        # J1939 socket options
2187        socket.SO_J1939_FILTER
2188        socket.SO_J1939_PROMISC
2189        socket.SO_J1939_SEND_PRIO
2190        socket.SO_J1939_ERRQUEUE
2191
2192        socket.SCM_J1939_DEST_ADDR
2193        socket.SCM_J1939_DEST_NAME
2194        socket.SCM_J1939_PRIO
2195        socket.SCM_J1939_ERRQUEUE
2196
2197        socket.J1939_NLA_PAD
2198        socket.J1939_NLA_BYTES_ACKED
2199
2200        socket.J1939_EE_INFO_NONE
2201        socket.J1939_EE_INFO_TX_ABORT
2202
2203        socket.J1939_FILTER_MAX
2204
2205    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
2206                         'socket.CAN_J1939 required for this test.')
2207    def testCreateJ1939Socket(self):
2208        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
2209            pass
2210
2211    def testBind(self):
2212        try:
2213            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
2214                addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR
2215                s.bind(addr)
2216                self.assertEqual(s.getsockname(), addr)
2217        except OSError as e:
2218            if e.errno == errno.ENODEV:
2219                self.skipTest('network interface `%s` does not exist' %
2220                           self.interface)
2221            else:
2222                raise
2223
2224
2225@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
2226class BasicRDSTest(unittest.TestCase):
2227
2228    def testCrucialConstants(self):
2229        socket.AF_RDS
2230        socket.PF_RDS
2231
2232    def testCreateSocket(self):
2233        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
2234            pass
2235
2236    def testSocketBufferSize(self):
2237        bufsize = 16384
2238        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
2239            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
2240            s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
2241
2242
2243@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
2244class RDSTest(ThreadedRDSSocketTest):
2245
2246    def __init__(self, methodName='runTest'):
2247        ThreadedRDSSocketTest.__init__(self, methodName=methodName)
2248
2249    def setUp(self):
2250        super().setUp()
2251        self.evt = threading.Event()
2252
2253    def testSendAndRecv(self):
2254        data, addr = self.serv.recvfrom(self.bufsize)
2255        self.assertEqual(self.data, data)
2256        self.assertEqual(self.cli_addr, addr)
2257
2258    def _testSendAndRecv(self):
2259        self.data = b'spam'
2260        self.cli.sendto(self.data, 0, (HOST, self.port))
2261
2262    def testPeek(self):
2263        data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
2264        self.assertEqual(self.data, data)
2265        data, addr = self.serv.recvfrom(self.bufsize)
2266        self.assertEqual(self.data, data)
2267
2268    def _testPeek(self):
2269        self.data = b'spam'
2270        self.cli.sendto(self.data, 0, (HOST, self.port))
2271
2272    @requireAttrs(socket.socket, 'recvmsg')
2273    def testSendAndRecvMsg(self):
2274        data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
2275        self.assertEqual(self.data, data)
2276
2277    @requireAttrs(socket.socket, 'sendmsg')
2278    def _testSendAndRecvMsg(self):
2279        self.data = b'hello ' * 10
2280        self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
2281
2282    def testSendAndRecvMulti(self):
2283        data, addr = self.serv.recvfrom(self.bufsize)
2284        self.assertEqual(self.data1, data)
2285
2286        data, addr = self.serv.recvfrom(self.bufsize)
2287        self.assertEqual(self.data2, data)
2288
2289    def _testSendAndRecvMulti(self):
2290        self.data1 = b'bacon'
2291        self.cli.sendto(self.data1, 0, (HOST, self.port))
2292
2293        self.data2 = b'egg'
2294        self.cli.sendto(self.data2, 0, (HOST, self.port))
2295
2296    def testSelect(self):
2297        r, w, x = select.select([self.serv], [], [], 3.0)
2298        self.assertIn(self.serv, r)
2299        data, addr = self.serv.recvfrom(self.bufsize)
2300        self.assertEqual(self.data, data)
2301
2302    def _testSelect(self):
2303        self.data = b'select'
2304        self.cli.sendto(self.data, 0, (HOST, self.port))
2305
2306@unittest.skipUnless(HAVE_SOCKET_QIPCRTR,
2307          'QIPCRTR sockets required for this test.')
2308class BasicQIPCRTRTest(unittest.TestCase):
2309
2310    def testCrucialConstants(self):
2311        socket.AF_QIPCRTR
2312
2313    def testCreateSocket(self):
2314        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2315            pass
2316
2317    def testUnbound(self):
2318        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2319            self.assertEqual(s.getsockname()[1], 0)
2320
2321    def testBindSock(self):
2322        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2323            socket_helper.bind_port(s, host=s.getsockname()[0])
2324            self.assertNotEqual(s.getsockname()[1], 0)
2325
2326    def testInvalidBindSock(self):
2327        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2328            self.assertRaises(OSError, socket_helper.bind_port, s, host=-2)
2329
2330    def testAutoBindSock(self):
2331        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
2332            s.connect((123, 123))
2333            self.assertNotEqual(s.getsockname()[1], 0)
2334
2335@unittest.skipIf(fcntl is None, "need fcntl")
2336@unittest.skipUnless(HAVE_SOCKET_VSOCK,
2337          'VSOCK sockets required for this test.')
2338class BasicVSOCKTest(unittest.TestCase):
2339
2340    def testCrucialConstants(self):
2341        socket.AF_VSOCK
2342
2343    def testVSOCKConstants(self):
2344        socket.SO_VM_SOCKETS_BUFFER_SIZE
2345        socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE
2346        socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE
2347        socket.VMADDR_CID_ANY
2348        socket.VMADDR_PORT_ANY
2349        socket.VMADDR_CID_HOST
2350        socket.VM_SOCKETS_INVALID_VERSION
2351        socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID
2352
2353    def testCreateSocket(self):
2354        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
2355            pass
2356
2357    def testSocketBufferSize(self):
2358        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
2359            orig_max = s.getsockopt(socket.AF_VSOCK,
2360                                    socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)
2361            orig = s.getsockopt(socket.AF_VSOCK,
2362                                socket.SO_VM_SOCKETS_BUFFER_SIZE)
2363            orig_min = s.getsockopt(socket.AF_VSOCK,
2364                                    socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)
2365
2366            s.setsockopt(socket.AF_VSOCK,
2367                         socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2)
2368            s.setsockopt(socket.AF_VSOCK,
2369                         socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2)
2370            s.setsockopt(socket.AF_VSOCK,
2371                         socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2)
2372
2373            self.assertEqual(orig_max * 2,
2374                             s.getsockopt(socket.AF_VSOCK,
2375                             socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE))
2376            self.assertEqual(orig * 2,
2377                             s.getsockopt(socket.AF_VSOCK,
2378                             socket.SO_VM_SOCKETS_BUFFER_SIZE))
2379            self.assertEqual(orig_min * 2,
2380                             s.getsockopt(socket.AF_VSOCK,
2381                             socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE))
2382
2383
2384@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH,
2385                     'Bluetooth sockets required for this test.')
2386class BasicBluetoothTest(unittest.TestCase):
2387
2388    def testBluetoothConstants(self):
2389        socket.BDADDR_ANY
2390        socket.BDADDR_LOCAL
2391        socket.AF_BLUETOOTH
2392        socket.BTPROTO_RFCOMM
2393
2394        if sys.platform != "win32":
2395            socket.BTPROTO_HCI
2396            socket.SOL_HCI
2397            socket.BTPROTO_L2CAP
2398
2399            if not sys.platform.startswith("freebsd"):
2400                socket.BTPROTO_SCO
2401
2402    def testCreateRfcommSocket(self):
2403        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
2404            pass
2405
2406    @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets")
2407    def testCreateL2capSocket(self):
2408        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as s:
2409            pass
2410
2411    @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets")
2412    def testCreateHciSocket(self):
2413        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
2414            pass
2415
2416    @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"),
2417                     "windows and freebsd do not support SCO sockets")
2418    def testCreateScoSocket(self):
2419        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
2420            pass
2421
2422
2423class BasicTCPTest(SocketConnectedTest):
2424
2425    def __init__(self, methodName='runTest'):
2426        SocketConnectedTest.__init__(self, methodName=methodName)
2427
2428    def testRecv(self):
2429        # Testing large receive over TCP
2430        msg = self.cli_conn.recv(1024)
2431        self.assertEqual(msg, MSG)
2432
2433    def _testRecv(self):
2434        self.serv_conn.send(MSG)
2435
2436    def testOverFlowRecv(self):
2437        # Testing receive in chunks over TCP
2438        seg1 = self.cli_conn.recv(len(MSG) - 3)
2439        seg2 = self.cli_conn.recv(1024)
2440        msg = seg1 + seg2
2441        self.assertEqual(msg, MSG)
2442
2443    def _testOverFlowRecv(self):
2444        self.serv_conn.send(MSG)
2445
2446    def testRecvFrom(self):
2447        # Testing large recvfrom() over TCP
2448        msg, addr = self.cli_conn.recvfrom(1024)
2449        self.assertEqual(msg, MSG)
2450
2451    def _testRecvFrom(self):
2452        self.serv_conn.send(MSG)
2453
2454    def testOverFlowRecvFrom(self):
2455        # Testing recvfrom() in chunks over TCP
2456        seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
2457        seg2, addr = self.cli_conn.recvfrom(1024)
2458        msg = seg1 + seg2
2459        self.assertEqual(msg, MSG)
2460
2461    def _testOverFlowRecvFrom(self):
2462        self.serv_conn.send(MSG)
2463
2464    def testSendAll(self):
2465        # Testing sendall() with a 2048 byte string over TCP
2466        msg = b''
2467        while 1:
2468            read = self.cli_conn.recv(1024)
2469            if not read:
2470                break
2471            msg += read
2472        self.assertEqual(msg, b'f' * 2048)
2473
2474    def _testSendAll(self):
2475        big_chunk = b'f' * 2048
2476        self.serv_conn.sendall(big_chunk)
2477
2478    def testFromFd(self):
2479        # Testing fromfd()
2480        fd = self.cli_conn.fileno()
2481        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
2482        self.addCleanup(sock.close)
2483        self.assertIsInstance(sock, socket.socket)
2484        msg = sock.recv(1024)
2485        self.assertEqual(msg, MSG)
2486
2487    def _testFromFd(self):
2488        self.serv_conn.send(MSG)
2489
2490    def testDup(self):
2491        # Testing dup()
2492        sock = self.cli_conn.dup()
2493        self.addCleanup(sock.close)
2494        msg = sock.recv(1024)
2495        self.assertEqual(msg, MSG)
2496
2497    def _testDup(self):
2498        self.serv_conn.send(MSG)
2499
2500    def testShutdown(self):
2501        # Testing shutdown()
2502        msg = self.cli_conn.recv(1024)
2503        self.assertEqual(msg, MSG)
2504        # wait for _testShutdown to finish: on OS X, when the server
2505        # closes the connection the client also becomes disconnected,
2506        # and the client's shutdown call will fail. (Issue #4397.)
2507        self.done.wait()
2508
2509    def _testShutdown(self):
2510        self.serv_conn.send(MSG)
2511        self.serv_conn.shutdown(2)
2512
2513    testShutdown_overflow = support.cpython_only(testShutdown)
2514
2515    @support.cpython_only
2516    def _testShutdown_overflow(self):
2517        import _testcapi
2518        self.serv_conn.send(MSG)
2519        # Issue 15989
2520        self.assertRaises(OverflowError, self.serv_conn.shutdown,
2521                          _testcapi.INT_MAX + 1)
2522        self.assertRaises(OverflowError, self.serv_conn.shutdown,
2523                          2 + (_testcapi.UINT_MAX + 1))
2524        self.serv_conn.shutdown(2)
2525
2526    def testDetach(self):
2527        # Testing detach()
2528        fileno = self.cli_conn.fileno()
2529        f = self.cli_conn.detach()
2530        self.assertEqual(f, fileno)
2531        # cli_conn cannot be used anymore...
2532        self.assertTrue(self.cli_conn._closed)
2533        self.assertRaises(OSError, self.cli_conn.recv, 1024)
2534        self.cli_conn.close()
2535        # ...but we can create another socket using the (still open)
2536        # file descriptor
2537        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f)
2538        self.addCleanup(sock.close)
2539        msg = sock.recv(1024)
2540        self.assertEqual(msg, MSG)
2541
2542    def _testDetach(self):
2543        self.serv_conn.send(MSG)
2544
2545
2546class BasicUDPTest(ThreadedUDPSocketTest):
2547
2548    def __init__(self, methodName='runTest'):
2549        ThreadedUDPSocketTest.__init__(self, methodName=methodName)
2550
2551    def testSendtoAndRecv(self):
2552        # Testing sendto() and Recv() over UDP
2553        msg = self.serv.recv(len(MSG))
2554        self.assertEqual(msg, MSG)
2555
2556    def _testSendtoAndRecv(self):
2557        self.cli.sendto(MSG, 0, (HOST, self.port))
2558
2559    def testRecvFrom(self):
2560        # Testing recvfrom() over UDP
2561        msg, addr = self.serv.recvfrom(len(MSG))
2562        self.assertEqual(msg, MSG)
2563
2564    def _testRecvFrom(self):
2565        self.cli.sendto(MSG, 0, (HOST, self.port))
2566
2567    def testRecvFromNegative(self):
2568        # Negative lengths passed to recvfrom should give ValueError.
2569        self.assertRaises(ValueError, self.serv.recvfrom, -1)
2570
2571    def _testRecvFromNegative(self):
2572        self.cli.sendto(MSG, 0, (HOST, self.port))
2573
2574
2575@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
2576          'UDPLITE sockets required for this test.')
2577class BasicUDPLITETest(ThreadedUDPLITESocketTest):
2578
2579    def __init__(self, methodName='runTest'):
2580        ThreadedUDPLITESocketTest.__init__(self, methodName=methodName)
2581
2582    def testSendtoAndRecv(self):
2583        # Testing sendto() and Recv() over UDPLITE
2584        msg = self.serv.recv(len(MSG))
2585        self.assertEqual(msg, MSG)
2586
2587    def _testSendtoAndRecv(self):
2588        self.cli.sendto(MSG, 0, (HOST, self.port))
2589
2590    def testRecvFrom(self):
2591        # Testing recvfrom() over UDPLITE
2592        msg, addr = self.serv.recvfrom(len(MSG))
2593        self.assertEqual(msg, MSG)
2594
2595    def _testRecvFrom(self):
2596        self.cli.sendto(MSG, 0, (HOST, self.port))
2597
2598    def testRecvFromNegative(self):
2599        # Negative lengths passed to recvfrom should give ValueError.
2600        self.assertRaises(ValueError, self.serv.recvfrom, -1)
2601
2602    def _testRecvFromNegative(self):
2603        self.cli.sendto(MSG, 0, (HOST, self.port))
2604
2605# Tests for the sendmsg()/recvmsg() interface.  Where possible, the
2606# same test code is used with different families and types of socket
2607# (e.g. stream, datagram), and tests using recvmsg() are repeated
2608# using recvmsg_into().
2609#
2610# The generic test classes such as SendmsgTests and
2611# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
2612# supplied with sockets cli_sock and serv_sock representing the
2613# client's and the server's end of the connection respectively, and
2614# attributes cli_addr and serv_addr holding their (numeric where
2615# appropriate) addresses.
2616#
2617# The final concrete test classes combine these with subclasses of
2618# SocketTestBase which set up client and server sockets of a specific
2619# type, and with subclasses of SendrecvmsgBase such as
2620# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these
2621# sockets to cli_sock and serv_sock and override the methods and
2622# attributes of SendrecvmsgBase to fill in destination addresses if
2623# needed when sending, check for specific flags in msg_flags, etc.
2624#
2625# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using
2626# recvmsg_into().
2627
2628# XXX: like the other datagram (UDP) tests in this module, the code
2629# here assumes that datagram delivery on the local machine will be
2630# reliable.
2631
2632class SendrecvmsgBase(ThreadSafeCleanupTestCase):
2633    # Base class for sendmsg()/recvmsg() tests.
2634
2635    # Time in seconds to wait before considering a test failed, or
2636    # None for no timeout.  Not all tests actually set a timeout.
2637    fail_timeout = support.LOOPBACK_TIMEOUT
2638
2639    def setUp(self):
2640        self.misc_event = threading.Event()
2641        super().setUp()
2642
2643    def sendToServer(self, msg):
2644        # Send msg to the server.
2645        return self.cli_sock.send(msg)
2646
2647    # Tuple of alternative default arguments for sendmsg() when called
2648    # via sendmsgToServer() (e.g. to include a destination address).
2649    sendmsg_to_server_defaults = ()
2650
2651    def sendmsgToServer(self, *args):
2652        # Call sendmsg() on self.cli_sock with the given arguments,
2653        # filling in any arguments which are not supplied with the
2654        # corresponding items of self.sendmsg_to_server_defaults, if
2655        # any.
2656        return self.cli_sock.sendmsg(
2657            *(args + self.sendmsg_to_server_defaults[len(args):]))
2658
2659    def doRecvmsg(self, sock, bufsize, *args):
2660        # Call recvmsg() on sock with given arguments and return its
2661        # result.  Should be used for tests which can use either
2662        # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides
2663        # this method with one which emulates it using recvmsg_into(),
2664        # thus allowing the same test to be used for both methods.
2665        result = sock.recvmsg(bufsize, *args)
2666        self.registerRecvmsgResult(result)
2667        return result
2668
2669    def registerRecvmsgResult(self, result):
2670        # Called by doRecvmsg() with the return value of recvmsg() or
2671        # recvmsg_into().  Can be overridden to arrange cleanup based
2672        # on the returned ancillary data, for instance.
2673        pass
2674
2675    def checkRecvmsgAddress(self, addr1, addr2):
2676        # Called to compare the received address with the address of
2677        # the peer.
2678        self.assertEqual(addr1, addr2)
2679
2680    # Flags that are normally unset in msg_flags
2681    msg_flags_common_unset = 0
2682    for name in ("MSG_CTRUNC", "MSG_OOB"):
2683        msg_flags_common_unset |= getattr(socket, name, 0)
2684
2685    # Flags that are normally set
2686    msg_flags_common_set = 0
2687
2688    # Flags set when a complete record has been received (e.g. MSG_EOR
2689    # for SCTP)
2690    msg_flags_eor_indicator = 0
2691
2692    # Flags set when a complete record has not been received
2693    # (e.g. MSG_TRUNC for datagram sockets)
2694    msg_flags_non_eor_indicator = 0
2695
2696    def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0):
2697        # Method to check the value of msg_flags returned by recvmsg[_into]().
2698        #
2699        # Checks that all bits in msg_flags_common_set attribute are
2700        # set in "flags" and all bits in msg_flags_common_unset are
2701        # unset.
2702        #
2703        # The "eor" argument specifies whether the flags should
2704        # indicate that a full record (or datagram) has been received.
2705        # If "eor" is None, no checks are done; otherwise, checks
2706        # that:
2707        #
2708        #  * if "eor" is true, all bits in msg_flags_eor_indicator are
2709        #    set and all bits in msg_flags_non_eor_indicator are unset
2710        #
2711        #  * if "eor" is false, all bits in msg_flags_non_eor_indicator
2712        #    are set and all bits in msg_flags_eor_indicator are unset
2713        #
2714        # If "checkset" and/or "checkunset" are supplied, they require
2715        # the given bits to be set or unset respectively, overriding
2716        # what the attributes require for those bits.
2717        #
2718        # If any bits are set in "ignore", they will not be checked,
2719        # regardless of the other inputs.
2720        #
2721        # Will raise Exception if the inputs require a bit to be both
2722        # set and unset, and it is not ignored.
2723
2724        defaultset = self.msg_flags_common_set
2725        defaultunset = self.msg_flags_common_unset
2726
2727        if eor:
2728            defaultset |= self.msg_flags_eor_indicator
2729            defaultunset |= self.msg_flags_non_eor_indicator
2730        elif eor is not None:
2731            defaultset |= self.msg_flags_non_eor_indicator
2732            defaultunset |= self.msg_flags_eor_indicator
2733
2734        # Function arguments override defaults
2735        defaultset &= ~checkunset
2736        defaultunset &= ~checkset
2737
2738        # Merge arguments with remaining defaults, and check for conflicts
2739        checkset |= defaultset
2740        checkunset |= defaultunset
2741        inboth = checkset & checkunset & ~ignore
2742        if inboth:
2743            raise Exception("contradictory set, unset requirements for flags "
2744                            "{0:#x}".format(inboth))
2745
2746        # Compare with given msg_flags value
2747        mask = (checkset | checkunset) & ~ignore
2748        self.assertEqual(flags & mask, checkset & mask)
2749
2750
2751class RecvmsgIntoMixin(SendrecvmsgBase):
2752    # Mixin to implement doRecvmsg() using recvmsg_into().
2753
2754    def doRecvmsg(self, sock, bufsize, *args):
2755        buf = bytearray(bufsize)
2756        result = sock.recvmsg_into([buf], *args)
2757        self.registerRecvmsgResult(result)
2758        self.assertGreaterEqual(result[0], 0)
2759        self.assertLessEqual(result[0], bufsize)
2760        return (bytes(buf[:result[0]]),) + result[1:]
2761
2762
2763class SendrecvmsgDgramFlagsBase(SendrecvmsgBase):
2764    # Defines flags to be checked in msg_flags for datagram sockets.
2765
2766    @property
2767    def msg_flags_non_eor_indicator(self):
2768        return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC
2769
2770
2771class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase):
2772    # Defines flags to be checked in msg_flags for SCTP sockets.
2773
2774    @property
2775    def msg_flags_eor_indicator(self):
2776        return super().msg_flags_eor_indicator | socket.MSG_EOR
2777
2778
2779class SendrecvmsgConnectionlessBase(SendrecvmsgBase):
2780    # Base class for tests on connectionless-mode sockets.  Users must
2781    # supply sockets on attributes cli and serv to be mapped to
2782    # cli_sock and serv_sock respectively.
2783
2784    @property
2785    def serv_sock(self):
2786        return self.serv
2787
2788    @property
2789    def cli_sock(self):
2790        return self.cli
2791
2792    @property
2793    def sendmsg_to_server_defaults(self):
2794        return ([], [], 0, self.serv_addr)
2795
2796    def sendToServer(self, msg):
2797        return self.cli_sock.sendto(msg, self.serv_addr)
2798
2799
2800class SendrecvmsgConnectedBase(SendrecvmsgBase):
2801    # Base class for tests on connected sockets.  Users must supply
2802    # sockets on attributes serv_conn and cli_conn (representing the
2803    # connections *to* the server and the client), to be mapped to
2804    # cli_sock and serv_sock respectively.
2805
2806    @property
2807    def serv_sock(self):
2808        return self.cli_conn
2809
2810    @property
2811    def cli_sock(self):
2812        return self.serv_conn
2813
2814    def checkRecvmsgAddress(self, addr1, addr2):
2815        # Address is currently "unspecified" for a connected socket,
2816        # so we don't examine it
2817        pass
2818
2819
2820class SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
2821    # Base class to set a timeout on server's socket.
2822
2823    def setUp(self):
2824        super().setUp()
2825        self.serv_sock.settimeout(self.fail_timeout)
2826
2827
2828class SendmsgTests(SendrecvmsgServerTimeoutBase):
2829    # Tests for sendmsg() which can use any socket type and do not
2830    # involve recvmsg() or recvmsg_into().
2831
2832    def testSendmsg(self):
2833        # Send a simple message with sendmsg().
2834        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2835
2836    def _testSendmsg(self):
2837        self.assertEqual(self.sendmsgToServer([MSG]), len(MSG))
2838
2839    def testSendmsgDataGenerator(self):
2840        # Send from buffer obtained from a generator (not a sequence).
2841        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2842
2843    def _testSendmsgDataGenerator(self):
2844        self.assertEqual(self.sendmsgToServer((o for o in [MSG])),
2845                         len(MSG))
2846
2847    def testSendmsgAncillaryGenerator(self):
2848        # Gather (empty) ancillary data from a generator.
2849        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2850
2851    def _testSendmsgAncillaryGenerator(self):
2852        self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])),
2853                         len(MSG))
2854
2855    def testSendmsgArray(self):
2856        # Send data from an array instead of the usual bytes object.
2857        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2858
2859    def _testSendmsgArray(self):
2860        self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]),
2861                         len(MSG))
2862
2863    def testSendmsgGather(self):
2864        # Send message data from more than one buffer (gather write).
2865        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2866
2867    def _testSendmsgGather(self):
2868        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
2869
2870    def testSendmsgBadArgs(self):
2871        # Check that sendmsg() rejects invalid arguments.
2872        self.assertEqual(self.serv_sock.recv(1000), b"done")
2873
2874    def _testSendmsgBadArgs(self):
2875        self.assertRaises(TypeError, self.cli_sock.sendmsg)
2876        self.assertRaises(TypeError, self.sendmsgToServer,
2877                          b"not in an iterable")
2878        self.assertRaises(TypeError, self.sendmsgToServer,
2879                          object())
2880        self.assertRaises(TypeError, self.sendmsgToServer,
2881                          [object()])
2882        self.assertRaises(TypeError, self.sendmsgToServer,
2883                          [MSG, object()])
2884        self.assertRaises(TypeError, self.sendmsgToServer,
2885                          [MSG], object())
2886        self.assertRaises(TypeError, self.sendmsgToServer,
2887                          [MSG], [], object())
2888        self.assertRaises(TypeError, self.sendmsgToServer,
2889                          [MSG], [], 0, object())
2890        self.sendToServer(b"done")
2891
2892    def testSendmsgBadCmsg(self):
2893        # Check that invalid ancillary data items are rejected.
2894        self.assertEqual(self.serv_sock.recv(1000), b"done")
2895
2896    def _testSendmsgBadCmsg(self):
2897        self.assertRaises(TypeError, self.sendmsgToServer,
2898                          [MSG], [object()])
2899        self.assertRaises(TypeError, self.sendmsgToServer,
2900                          [MSG], [(object(), 0, b"data")])
2901        self.assertRaises(TypeError, self.sendmsgToServer,
2902                          [MSG], [(0, object(), b"data")])
2903        self.assertRaises(TypeError, self.sendmsgToServer,
2904                          [MSG], [(0, 0, object())])
2905        self.assertRaises(TypeError, self.sendmsgToServer,
2906                          [MSG], [(0, 0)])
2907        self.assertRaises(TypeError, self.sendmsgToServer,
2908                          [MSG], [(0, 0, b"data", 42)])
2909        self.sendToServer(b"done")
2910
2911    @requireAttrs(socket, "CMSG_SPACE")
2912    def testSendmsgBadMultiCmsg(self):
2913        # Check that invalid ancillary data items are rejected when
2914        # more than one item is present.
2915        self.assertEqual(self.serv_sock.recv(1000), b"done")
2916
2917    @testSendmsgBadMultiCmsg.client_skip
2918    def _testSendmsgBadMultiCmsg(self):
2919        self.assertRaises(TypeError, self.sendmsgToServer,
2920                          [MSG], [0, 0, b""])
2921        self.assertRaises(TypeError, self.sendmsgToServer,
2922                          [MSG], [(0, 0, b""), object()])
2923        self.sendToServer(b"done")
2924
2925    def testSendmsgExcessCmsgReject(self):
2926        # Check that sendmsg() rejects excess ancillary data items
2927        # when the number that can be sent is limited.
2928        self.assertEqual(self.serv_sock.recv(1000), b"done")
2929
2930    def _testSendmsgExcessCmsgReject(self):
2931        if not hasattr(socket, "CMSG_SPACE"):
2932            # Can only send one item
2933            with self.assertRaises(OSError) as cm:
2934                self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
2935            self.assertIsNone(cm.exception.errno)
2936        self.sendToServer(b"done")
2937
2938    def testSendmsgAfterClose(self):
2939        # Check that sendmsg() fails on a closed socket.
2940        pass
2941
2942    def _testSendmsgAfterClose(self):
2943        self.cli_sock.close()
2944        self.assertRaises(OSError, self.sendmsgToServer, [MSG])
2945
2946
2947class SendmsgStreamTests(SendmsgTests):
2948    # Tests for sendmsg() which require a stream socket and do not
2949    # involve recvmsg() or recvmsg_into().
2950
2951    def testSendmsgExplicitNoneAddr(self):
2952        # Check that peer address can be specified as None.
2953        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2954
2955    def _testSendmsgExplicitNoneAddr(self):
2956        self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG))
2957
2958    def testSendmsgTimeout(self):
2959        # Check that timeout works with sendmsg().
2960        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
2961        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2962
2963    def _testSendmsgTimeout(self):
2964        try:
2965            self.cli_sock.settimeout(0.03)
2966            try:
2967                while True:
2968                    self.sendmsgToServer([b"a"*512])
2969            except socket.timeout:
2970                pass
2971            except OSError as exc:
2972                if exc.errno != errno.ENOMEM:
2973                    raise
2974                # bpo-33937 the test randomly fails on Travis CI with
2975                # "OSError: [Errno 12] Cannot allocate memory"
2976            else:
2977                self.fail("socket.timeout not raised")
2978        finally:
2979            self.misc_event.set()
2980
2981    # XXX: would be nice to have more tests for sendmsg flags argument.
2982
2983    # Linux supports MSG_DONTWAIT when sending, but in general, it
2984    # only works when receiving.  Could add other platforms if they
2985    # support it too.
2986    @skipWithClientIf(sys.platform not in {"linux"},
2987                      "MSG_DONTWAIT not known to work on this platform when "
2988                      "sending")
2989    def testSendmsgDontWait(self):
2990        # Check that MSG_DONTWAIT in flags causes non-blocking behaviour.
2991        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
2992        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2993
2994    @testSendmsgDontWait.client_skip
2995    def _testSendmsgDontWait(self):
2996        try:
2997            with self.assertRaises(OSError) as cm:
2998                while True:
2999                    self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
3000            # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI
3001            # with "OSError: [Errno 12] Cannot allocate memory"
3002            self.assertIn(cm.exception.errno,
3003                          (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM))
3004        finally:
3005            self.misc_event.set()
3006
3007
3008class SendmsgConnectionlessTests(SendmsgTests):
3009    # Tests for sendmsg() which require a connectionless-mode
3010    # (e.g. datagram) socket, and do not involve recvmsg() or
3011    # recvmsg_into().
3012
3013    def testSendmsgNoDestAddr(self):
3014        # Check that sendmsg() fails when no destination address is
3015        # given for unconnected socket.
3016        pass
3017
3018    def _testSendmsgNoDestAddr(self):
3019        self.assertRaises(OSError, self.cli_sock.sendmsg,
3020                          [MSG])
3021        self.assertRaises(OSError, self.cli_sock.sendmsg,
3022                          [MSG], [], 0, None)
3023
3024
3025class RecvmsgGenericTests(SendrecvmsgBase):
3026    # Tests for recvmsg() which can also be emulated using
3027    # recvmsg_into(), and can use any socket type.
3028
3029    def testRecvmsg(self):
3030        # Receive a simple message with recvmsg[_into]().
3031        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3032        self.assertEqual(msg, MSG)
3033        self.checkRecvmsgAddress(addr, self.cli_addr)
3034        self.assertEqual(ancdata, [])
3035        self.checkFlags(flags, eor=True)
3036
3037    def _testRecvmsg(self):
3038        self.sendToServer(MSG)
3039
3040    def testRecvmsgExplicitDefaults(self):
3041        # Test recvmsg[_into]() with default arguments provided explicitly.
3042        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3043                                                   len(MSG), 0, 0)
3044        self.assertEqual(msg, MSG)
3045        self.checkRecvmsgAddress(addr, self.cli_addr)
3046        self.assertEqual(ancdata, [])
3047        self.checkFlags(flags, eor=True)
3048
3049    def _testRecvmsgExplicitDefaults(self):
3050        self.sendToServer(MSG)
3051
3052    def testRecvmsgShorter(self):
3053        # Receive a message smaller than buffer.
3054        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3055                                                   len(MSG) + 42)
3056        self.assertEqual(msg, MSG)
3057        self.checkRecvmsgAddress(addr, self.cli_addr)
3058        self.assertEqual(ancdata, [])
3059        self.checkFlags(flags, eor=True)
3060
3061    def _testRecvmsgShorter(self):
3062        self.sendToServer(MSG)
3063
3064    def testRecvmsgTrunc(self):
3065        # Receive part of message, check for truncation indicators.
3066        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3067                                                   len(MSG) - 3)
3068        self.assertEqual(msg, MSG[:-3])
3069        self.checkRecvmsgAddress(addr, self.cli_addr)
3070        self.assertEqual(ancdata, [])
3071        self.checkFlags(flags, eor=False)
3072
3073    def _testRecvmsgTrunc(self):
3074        self.sendToServer(MSG)
3075
3076    def testRecvmsgShortAncillaryBuf(self):
3077        # Test ancillary data buffer too small to hold any ancillary data.
3078        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3079                                                   len(MSG), 1)
3080        self.assertEqual(msg, MSG)
3081        self.checkRecvmsgAddress(addr, self.cli_addr)
3082        self.assertEqual(ancdata, [])
3083        self.checkFlags(flags, eor=True)
3084
3085    def _testRecvmsgShortAncillaryBuf(self):
3086        self.sendToServer(MSG)
3087
3088    def testRecvmsgLongAncillaryBuf(self):
3089        # Test large ancillary data buffer.
3090        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3091                                                   len(MSG), 10240)
3092        self.assertEqual(msg, MSG)
3093        self.checkRecvmsgAddress(addr, self.cli_addr)
3094        self.assertEqual(ancdata, [])
3095        self.checkFlags(flags, eor=True)
3096
3097    def _testRecvmsgLongAncillaryBuf(self):
3098        self.sendToServer(MSG)
3099
3100    def testRecvmsgAfterClose(self):
3101        # Check that recvmsg[_into]() fails on a closed socket.
3102        self.serv_sock.close()
3103        self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)
3104
3105    def _testRecvmsgAfterClose(self):
3106        pass
3107
3108    def testRecvmsgTimeout(self):
3109        # Check that timeout works.
3110        try:
3111            self.serv_sock.settimeout(0.03)
3112            self.assertRaises(socket.timeout,
3113                              self.doRecvmsg, self.serv_sock, len(MSG))
3114        finally:
3115            self.misc_event.set()
3116
3117    def _testRecvmsgTimeout(self):
3118        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3119
3120    @requireAttrs(socket, "MSG_PEEK")
3121    def testRecvmsgPeek(self):
3122        # Check that MSG_PEEK in flags enables examination of pending
3123        # data without consuming it.
3124
3125        # Receive part of data with MSG_PEEK.
3126        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3127                                                   len(MSG) - 3, 0,
3128                                                   socket.MSG_PEEK)
3129        self.assertEqual(msg, MSG[:-3])
3130        self.checkRecvmsgAddress(addr, self.cli_addr)
3131        self.assertEqual(ancdata, [])
3132        # Ignoring MSG_TRUNC here (so this test is the same for stream
3133        # and datagram sockets).  Some wording in POSIX seems to
3134        # suggest that it needn't be set when peeking, but that may
3135        # just be a slip.
3136        self.checkFlags(flags, eor=False,
3137                        ignore=getattr(socket, "MSG_TRUNC", 0))
3138
3139        # Receive all data with MSG_PEEK.
3140        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3141                                                   len(MSG), 0,
3142                                                   socket.MSG_PEEK)
3143        self.assertEqual(msg, MSG)
3144        self.checkRecvmsgAddress(addr, self.cli_addr)
3145        self.assertEqual(ancdata, [])
3146        self.checkFlags(flags, eor=True)
3147
3148        # Check that the same data can still be received normally.
3149        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3150        self.assertEqual(msg, MSG)
3151        self.checkRecvmsgAddress(addr, self.cli_addr)
3152        self.assertEqual(ancdata, [])
3153        self.checkFlags(flags, eor=True)
3154
3155    @testRecvmsgPeek.client_skip
3156    def _testRecvmsgPeek(self):
3157        self.sendToServer(MSG)
3158
3159    @requireAttrs(socket.socket, "sendmsg")
3160    def testRecvmsgFromSendmsg(self):
3161        # Test receiving with recvmsg[_into]() when message is sent
3162        # using sendmsg().
3163        self.serv_sock.settimeout(self.fail_timeout)
3164        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3165        self.assertEqual(msg, MSG)
3166        self.checkRecvmsgAddress(addr, self.cli_addr)
3167        self.assertEqual(ancdata, [])
3168        self.checkFlags(flags, eor=True)
3169
3170    @testRecvmsgFromSendmsg.client_skip
3171    def _testRecvmsgFromSendmsg(self):
3172        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
3173
3174
3175class RecvmsgGenericStreamTests(RecvmsgGenericTests):
3176    # Tests which require a stream socket and can use either recvmsg()
3177    # or recvmsg_into().
3178
3179    def testRecvmsgEOF(self):
3180        # Receive end-of-stream indicator (b"", peer socket closed).
3181        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
3182        self.assertEqual(msg, b"")
3183        self.checkRecvmsgAddress(addr, self.cli_addr)
3184        self.assertEqual(ancdata, [])
3185        self.checkFlags(flags, eor=None) # Might not have end-of-record marker
3186
3187    def _testRecvmsgEOF(self):
3188        self.cli_sock.close()
3189
3190    def testRecvmsgOverflow(self):
3191        # Receive a message in more than one chunk.
3192        seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3193                                                    len(MSG) - 3)
3194        self.checkRecvmsgAddress(addr, self.cli_addr)
3195        self.assertEqual(ancdata, [])
3196        self.checkFlags(flags, eor=False)
3197
3198        seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
3199        self.checkRecvmsgAddress(addr, self.cli_addr)
3200        self.assertEqual(ancdata, [])
3201        self.checkFlags(flags, eor=True)
3202
3203        msg = seg1 + seg2
3204        self.assertEqual(msg, MSG)
3205
3206    def _testRecvmsgOverflow(self):
3207        self.sendToServer(MSG)
3208
3209
3210class RecvmsgTests(RecvmsgGenericTests):
3211    # Tests for recvmsg() which can use any socket type.
3212
3213    def testRecvmsgBadArgs(self):
3214        # Check that recvmsg() rejects invalid arguments.
3215        self.assertRaises(TypeError, self.serv_sock.recvmsg)
3216        self.assertRaises(ValueError, self.serv_sock.recvmsg,
3217                          -1, 0, 0)
3218        self.assertRaises(ValueError, self.serv_sock.recvmsg,
3219                          len(MSG), -1, 0)
3220        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3221                          [bytearray(10)], 0, 0)
3222        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3223                          object(), 0, 0)
3224        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3225                          len(MSG), object(), 0)
3226        self.assertRaises(TypeError, self.serv_sock.recvmsg,
3227                          len(MSG), 0, object())
3228
3229        msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0)
3230        self.assertEqual(msg, MSG)
3231        self.checkRecvmsgAddress(addr, self.cli_addr)
3232        self.assertEqual(ancdata, [])
3233        self.checkFlags(flags, eor=True)
3234
3235    def _testRecvmsgBadArgs(self):
3236        self.sendToServer(MSG)
3237
3238
3239class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
3240    # Tests for recvmsg_into() which can use any socket type.
3241
3242    def testRecvmsgIntoBadArgs(self):
3243        # Check that recvmsg_into() rejects invalid arguments.
3244        buf = bytearray(len(MSG))
3245        self.assertRaises(TypeError, self.serv_sock.recvmsg_into)
3246        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3247                          len(MSG), 0, 0)
3248        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3249                          buf, 0, 0)
3250        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3251                          [object()], 0, 0)
3252        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3253                          [b"I'm not writable"], 0, 0)
3254        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3255                          [buf, object()], 0, 0)
3256        self.assertRaises(ValueError, self.serv_sock.recvmsg_into,
3257                          [buf], -1, 0)
3258        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3259                          [buf], object(), 0)
3260        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
3261                          [buf], 0, object())
3262
3263        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0)
3264        self.assertEqual(nbytes, len(MSG))
3265        self.assertEqual(buf, bytearray(MSG))
3266        self.checkRecvmsgAddress(addr, self.cli_addr)
3267        self.assertEqual(ancdata, [])
3268        self.checkFlags(flags, eor=True)
3269
3270    def _testRecvmsgIntoBadArgs(self):
3271        self.sendToServer(MSG)
3272
3273    def testRecvmsgIntoGenerator(self):
3274        # Receive into buffer obtained from a generator (not a sequence).
3275        buf = bytearray(len(MSG))
3276        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
3277            (o for o in [buf]))
3278        self.assertEqual(nbytes, len(MSG))
3279        self.assertEqual(buf, bytearray(MSG))
3280        self.checkRecvmsgAddress(addr, self.cli_addr)
3281        self.assertEqual(ancdata, [])
3282        self.checkFlags(flags, eor=True)
3283
3284    def _testRecvmsgIntoGenerator(self):
3285        self.sendToServer(MSG)
3286
3287    def testRecvmsgIntoArray(self):
3288        # Receive into an array rather than the usual bytearray.
3289        buf = array.array("B", [0] * len(MSG))
3290        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf])
3291        self.assertEqual(nbytes, len(MSG))
3292        self.assertEqual(buf.tobytes(), MSG)
3293        self.checkRecvmsgAddress(addr, self.cli_addr)
3294        self.assertEqual(ancdata, [])
3295        self.checkFlags(flags, eor=True)
3296
3297    def _testRecvmsgIntoArray(self):
3298        self.sendToServer(MSG)
3299
3300    def testRecvmsgIntoScatter(self):
3301        # Receive into multiple buffers (scatter write).
3302        b1 = bytearray(b"----")
3303        b2 = bytearray(b"0123456789")
3304        b3 = bytearray(b"--------------")
3305        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
3306            [b1, memoryview(b2)[2:9], b3])
3307        self.assertEqual(nbytes, len(b"Mary had a little lamb"))
3308        self.assertEqual(b1, bytearray(b"Mary"))
3309        self.assertEqual(b2, bytearray(b"01 had a 9"))
3310        self.assertEqual(b3, bytearray(b"little lamb---"))
3311        self.checkRecvmsgAddress(addr, self.cli_addr)
3312        self.assertEqual(ancdata, [])
3313        self.checkFlags(flags, eor=True)
3314
3315    def _testRecvmsgIntoScatter(self):
3316        self.sendToServer(b"Mary had a little lamb")
3317
3318
3319class CmsgMacroTests(unittest.TestCase):
3320    # Test the functions CMSG_LEN() and CMSG_SPACE().  Tests
3321    # assumptions used by sendmsg() and recvmsg[_into](), which share
3322    # code with these functions.
3323
3324    # Match the definition in socketmodule.c
3325    try:
3326        import _testcapi
3327    except ImportError:
3328        socklen_t_limit = 0x7fffffff
3329    else:
3330        socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)
3331
3332    @requireAttrs(socket, "CMSG_LEN")
3333    def testCMSG_LEN(self):
3334        # Test CMSG_LEN() with various valid and invalid values,
3335        # checking the assumptions used by recvmsg() and sendmsg().
3336        toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1
3337        values = list(range(257)) + list(range(toobig - 257, toobig))
3338
3339        # struct cmsghdr has at least three members, two of which are ints
3340        self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2)
3341        for n in values:
3342            ret = socket.CMSG_LEN(n)
3343            # This is how recvmsg() calculates the data size
3344            self.assertEqual(ret - socket.CMSG_LEN(0), n)
3345            self.assertLessEqual(ret, self.socklen_t_limit)
3346
3347        self.assertRaises(OverflowError, socket.CMSG_LEN, -1)
3348        # sendmsg() shares code with these functions, and requires
3349        # that it reject values over the limit.
3350        self.assertRaises(OverflowError, socket.CMSG_LEN, toobig)
3351        self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize)
3352
3353    @requireAttrs(socket, "CMSG_SPACE")
3354    def testCMSG_SPACE(self):
3355        # Test CMSG_SPACE() with various valid and invalid values,
3356        # checking the assumptions used by sendmsg().
3357        toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1
3358        values = list(range(257)) + list(range(toobig - 257, toobig))
3359
3360        last = socket.CMSG_SPACE(0)
3361        # struct cmsghdr has at least three members, two of which are ints
3362        self.assertGreater(last, array.array("i").itemsize * 2)
3363        for n in values:
3364            ret = socket.CMSG_SPACE(n)
3365            self.assertGreaterEqual(ret, last)
3366            self.assertGreaterEqual(ret, socket.CMSG_LEN(n))
3367            self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0))
3368            self.assertLessEqual(ret, self.socklen_t_limit)
3369            last = ret
3370
3371        self.assertRaises(OverflowError, socket.CMSG_SPACE, -1)
3372        # sendmsg() shares code with these functions, and requires
3373        # that it reject values over the limit.
3374        self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig)
3375        self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize)
3376
3377
3378class SCMRightsTest(SendrecvmsgServerTimeoutBase):
3379    # Tests for file descriptor passing on Unix-domain sockets.
3380
3381    # Invalid file descriptor value that's unlikely to evaluate to a
3382    # real FD even if one of its bytes is replaced with a different
3383    # value (which shouldn't actually happen).
3384    badfd = -0x5555
3385
3386    def newFDs(self, n):
3387        # Return a list of n file descriptors for newly-created files
3388        # containing their list indices as ASCII numbers.
3389        fds = []
3390        for i in range(n):
3391            fd, path = tempfile.mkstemp()
3392            self.addCleanup(os.unlink, path)
3393            self.addCleanup(os.close, fd)
3394            os.write(fd, str(i).encode())
3395            fds.append(fd)
3396        return fds
3397
3398    def checkFDs(self, fds):
3399        # Check that the file descriptors in the given list contain
3400        # their correct list indices as ASCII numbers.
3401        for n, fd in enumerate(fds):
3402            os.lseek(fd, 0, os.SEEK_SET)
3403            self.assertEqual(os.read(fd, 1024), str(n).encode())
3404
3405    def registerRecvmsgResult(self, result):
3406        self.addCleanup(self.closeRecvmsgFDs, result)
3407
3408    def closeRecvmsgFDs(self, recvmsg_result):
3409        # Close all file descriptors specified in the ancillary data
3410        # of the given return value from recvmsg() or recvmsg_into().
3411        for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]:
3412            if (cmsg_level == socket.SOL_SOCKET and
3413                    cmsg_type == socket.SCM_RIGHTS):
3414                fds = array.array("i")
3415                fds.frombytes(cmsg_data[:
3416                        len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3417                for fd in fds:
3418                    os.close(fd)
3419
3420    def createAndSendFDs(self, n):
3421        # Send n new file descriptors created by newFDs() to the
3422        # server, with the constant MSG as the non-ancillary data.
3423        self.assertEqual(
3424            self.sendmsgToServer([MSG],
3425                                 [(socket.SOL_SOCKET,
3426                                   socket.SCM_RIGHTS,
3427                                   array.array("i", self.newFDs(n)))]),
3428            len(MSG))
3429
3430    def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
3431        # Check that constant MSG was received with numfds file
3432        # descriptors in a maximum of maxcmsgs control messages (which
3433        # must contain only complete integers).  By default, check
3434        # that MSG_CTRUNC is unset, but ignore any flags in
3435        # ignoreflags.
3436        msg, ancdata, flags, addr = result
3437        self.assertEqual(msg, MSG)
3438        self.checkRecvmsgAddress(addr, self.cli_addr)
3439        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3440                        ignore=ignoreflags)
3441
3442        self.assertIsInstance(ancdata, list)
3443        self.assertLessEqual(len(ancdata), maxcmsgs)
3444        fds = array.array("i")
3445        for item in ancdata:
3446            self.assertIsInstance(item, tuple)
3447            cmsg_level, cmsg_type, cmsg_data = item
3448            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3449            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3450            self.assertIsInstance(cmsg_data, bytes)
3451            self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0)
3452            fds.frombytes(cmsg_data)
3453
3454        self.assertEqual(len(fds), numfds)
3455        self.checkFDs(fds)
3456
3457    def testFDPassSimple(self):
3458        # Pass a single FD (array read from bytes object).
3459        self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock,
3460                                               len(MSG), 10240))
3461
3462    def _testFDPassSimple(self):
3463        self.assertEqual(
3464            self.sendmsgToServer(
3465                [MSG],
3466                [(socket.SOL_SOCKET,
3467                  socket.SCM_RIGHTS,
3468                  array.array("i", self.newFDs(1)).tobytes())]),
3469            len(MSG))
3470
3471    def testMultipleFDPass(self):
3472        # Pass multiple FDs in a single array.
3473        self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock,
3474                                               len(MSG), 10240))
3475
3476    def _testMultipleFDPass(self):
3477        self.createAndSendFDs(4)
3478
3479    @requireAttrs(socket, "CMSG_SPACE")
3480    def testFDPassCMSG_SPACE(self):
3481        # Test using CMSG_SPACE() to calculate ancillary buffer size.
3482        self.checkRecvmsgFDs(
3483            4, self.doRecvmsg(self.serv_sock, len(MSG),
3484                              socket.CMSG_SPACE(4 * SIZEOF_INT)))
3485
3486    @testFDPassCMSG_SPACE.client_skip
3487    def _testFDPassCMSG_SPACE(self):
3488        self.createAndSendFDs(4)
3489
3490    def testFDPassCMSG_LEN(self):
3491        # Test using CMSG_LEN() to calculate ancillary buffer size.
3492        self.checkRecvmsgFDs(1,
3493                             self.doRecvmsg(self.serv_sock, len(MSG),
3494                                            socket.CMSG_LEN(4 * SIZEOF_INT)),
3495                             # RFC 3542 says implementations may set
3496                             # MSG_CTRUNC if there isn't enough space
3497                             # for trailing padding.
3498                             ignoreflags=socket.MSG_CTRUNC)
3499
3500    def _testFDPassCMSG_LEN(self):
3501        self.createAndSendFDs(1)
3502
3503    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3504    @unittest.skipIf(AIX, "skipping, see issue #22397")
3505    @requireAttrs(socket, "CMSG_SPACE")
3506    def testFDPassSeparate(self):
3507        # Pass two FDs in two separate arrays.  Arrays may be combined
3508        # into a single control message by the OS.
3509        self.checkRecvmsgFDs(2,
3510                             self.doRecvmsg(self.serv_sock, len(MSG), 10240),
3511                             maxcmsgs=2)
3512
3513    @testFDPassSeparate.client_skip
3514    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3515    @unittest.skipIf(AIX, "skipping, see issue #22397")
3516    def _testFDPassSeparate(self):
3517        fd0, fd1 = self.newFDs(2)
3518        self.assertEqual(
3519            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
3520                                          socket.SCM_RIGHTS,
3521                                          array.array("i", [fd0])),
3522                                         (socket.SOL_SOCKET,
3523                                          socket.SCM_RIGHTS,
3524                                          array.array("i", [fd1]))]),
3525            len(MSG))
3526
3527    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3528    @unittest.skipIf(AIX, "skipping, see issue #22397")
3529    @requireAttrs(socket, "CMSG_SPACE")
3530    def testFDPassSeparateMinSpace(self):
3531        # Pass two FDs in two separate arrays, receiving them into the
3532        # minimum space for two arrays.
3533        num_fds = 2
3534        self.checkRecvmsgFDs(num_fds,
3535                             self.doRecvmsg(self.serv_sock, len(MSG),
3536                                            socket.CMSG_SPACE(SIZEOF_INT) +
3537                                            socket.CMSG_LEN(SIZEOF_INT * num_fds)),
3538                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
3539
3540    @testFDPassSeparateMinSpace.client_skip
3541    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
3542    @unittest.skipIf(AIX, "skipping, see issue #22397")
3543    def _testFDPassSeparateMinSpace(self):
3544        fd0, fd1 = self.newFDs(2)
3545        self.assertEqual(
3546            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
3547                                          socket.SCM_RIGHTS,
3548                                          array.array("i", [fd0])),
3549                                         (socket.SOL_SOCKET,
3550                                          socket.SCM_RIGHTS,
3551                                          array.array("i", [fd1]))]),
3552            len(MSG))
3553
3554    def sendAncillaryIfPossible(self, msg, ancdata):
3555        # Try to send msg and ancdata to server, but if the system
3556        # call fails, just send msg with no ancillary data.
3557        try:
3558            nbytes = self.sendmsgToServer([msg], ancdata)
3559        except OSError as e:
3560            # Check that it was the system call that failed
3561            self.assertIsInstance(e.errno, int)
3562            nbytes = self.sendmsgToServer([msg])
3563        self.assertEqual(nbytes, len(msg))
3564
3565    @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
3566    def testFDPassEmpty(self):
3567        # Try to pass an empty FD array.  Can receive either no array
3568        # or an empty array.
3569        self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock,
3570                                               len(MSG), 10240),
3571                             ignoreflags=socket.MSG_CTRUNC)
3572
3573    def _testFDPassEmpty(self):
3574        self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET,
3575                                            socket.SCM_RIGHTS,
3576                                            b"")])
3577
3578    def testFDPassPartialInt(self):
3579        # Try to pass a truncated FD array.
3580        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3581                                                   len(MSG), 10240)
3582        self.assertEqual(msg, MSG)
3583        self.checkRecvmsgAddress(addr, self.cli_addr)
3584        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
3585        self.assertLessEqual(len(ancdata), 1)
3586        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3587            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3588            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3589            self.assertLess(len(cmsg_data), SIZEOF_INT)
3590
3591    def _testFDPassPartialInt(self):
3592        self.sendAncillaryIfPossible(
3593            MSG,
3594            [(socket.SOL_SOCKET,
3595              socket.SCM_RIGHTS,
3596              array.array("i", [self.badfd]).tobytes()[:-1])])
3597
3598    @requireAttrs(socket, "CMSG_SPACE")
3599    def testFDPassPartialIntInMiddle(self):
3600        # Try to pass two FD arrays, the first of which is truncated.
3601        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3602                                                   len(MSG), 10240)
3603        self.assertEqual(msg, MSG)
3604        self.checkRecvmsgAddress(addr, self.cli_addr)
3605        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
3606        self.assertLessEqual(len(ancdata), 2)
3607        fds = array.array("i")
3608        # Arrays may have been combined in a single control message
3609        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3610            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3611            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3612            fds.frombytes(cmsg_data[:
3613                    len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3614        self.assertLessEqual(len(fds), 2)
3615        self.checkFDs(fds)
3616
3617    @testFDPassPartialIntInMiddle.client_skip
3618    def _testFDPassPartialIntInMiddle(self):
3619        fd0, fd1 = self.newFDs(2)
3620        self.sendAncillaryIfPossible(
3621            MSG,
3622            [(socket.SOL_SOCKET,
3623              socket.SCM_RIGHTS,
3624              array.array("i", [fd0, self.badfd]).tobytes()[:-1]),
3625             (socket.SOL_SOCKET,
3626              socket.SCM_RIGHTS,
3627              array.array("i", [fd1]))])
3628
3629    def checkTruncatedHeader(self, result, ignoreflags=0):
3630        # Check that no ancillary data items are returned when data is
3631        # truncated inside the cmsghdr structure.
3632        msg, ancdata, flags, addr = result
3633        self.assertEqual(msg, MSG)
3634        self.checkRecvmsgAddress(addr, self.cli_addr)
3635        self.assertEqual(ancdata, [])
3636        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3637                        ignore=ignoreflags)
3638
3639    def testCmsgTruncNoBufSize(self):
3640        # Check that no ancillary data is received when no buffer size
3641        # is specified.
3642        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)),
3643                                  # BSD seems to set MSG_CTRUNC only
3644                                  # if an item has been partially
3645                                  # received.
3646                                  ignoreflags=socket.MSG_CTRUNC)
3647
3648    def _testCmsgTruncNoBufSize(self):
3649        self.createAndSendFDs(1)
3650
3651    def testCmsgTrunc0(self):
3652        # Check that no ancillary data is received when buffer size is 0.
3653        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0),
3654                                  ignoreflags=socket.MSG_CTRUNC)
3655
3656    def _testCmsgTrunc0(self):
3657        self.createAndSendFDs(1)
3658
3659    # Check that no ancillary data is returned for various non-zero
3660    # (but still too small) buffer sizes.
3661
3662    def testCmsgTrunc1(self):
3663        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1))
3664
3665    def _testCmsgTrunc1(self):
3666        self.createAndSendFDs(1)
3667
3668    def testCmsgTrunc2Int(self):
3669        # The cmsghdr structure has at least three members, two of
3670        # which are ints, so we still shouldn't see any ancillary
3671        # data.
3672        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3673                                                 SIZEOF_INT * 2))
3674
3675    def _testCmsgTrunc2Int(self):
3676        self.createAndSendFDs(1)
3677
3678    def testCmsgTruncLen0Minus1(self):
3679        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3680                                                 socket.CMSG_LEN(0) - 1))
3681
3682    def _testCmsgTruncLen0Minus1(self):
3683        self.createAndSendFDs(1)
3684
3685    # The following tests try to truncate the control message in the
3686    # middle of the FD array.
3687
3688    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
3689        # Check that file descriptor data is truncated to between
3690        # mindata and maxdata bytes when received with buffer size
3691        # ancbuf, and that any complete file descriptor numbers are
3692        # valid.
3693        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3694                                                   len(MSG), ancbuf)
3695        self.assertEqual(msg, MSG)
3696        self.checkRecvmsgAddress(addr, self.cli_addr)
3697        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3698
3699        if mindata == 0 and ancdata == []:
3700            return
3701        self.assertEqual(len(ancdata), 1)
3702        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3703        self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3704        self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3705        self.assertGreaterEqual(len(cmsg_data), mindata)
3706        self.assertLessEqual(len(cmsg_data), maxdata)
3707        fds = array.array("i")
3708        fds.frombytes(cmsg_data[:
3709                len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3710        self.checkFDs(fds)
3711
3712    def testCmsgTruncLen0(self):
3713        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0)
3714
3715    def _testCmsgTruncLen0(self):
3716        self.createAndSendFDs(1)
3717
3718    def testCmsgTruncLen0Plus1(self):
3719        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1)
3720
3721    def _testCmsgTruncLen0Plus1(self):
3722        self.createAndSendFDs(2)
3723
3724    def testCmsgTruncLen1(self):
3725        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT),
3726                                 maxdata=SIZEOF_INT)
3727
3728    def _testCmsgTruncLen1(self):
3729        self.createAndSendFDs(2)
3730
3731    def testCmsgTruncLen2Minus1(self):
3732        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1,
3733                                 maxdata=(2 * SIZEOF_INT) - 1)
3734
3735    def _testCmsgTruncLen2Minus1(self):
3736        self.createAndSendFDs(2)
3737
3738
3739class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
3740    # Test sendmsg() and recvmsg[_into]() using the ancillary data
3741    # features of the RFC 3542 Advanced Sockets API for IPv6.
3742    # Currently we can only handle certain data items (e.g. traffic
3743    # class, hop limit, MTU discovery and fragmentation settings)
3744    # without resorting to unportable means such as the struct module,
3745    # but the tests here are aimed at testing the ancillary data
3746    # handling in sendmsg() and recvmsg() rather than the IPv6 API
3747    # itself.
3748
3749    # Test value to use when setting hop limit of packet
3750    hop_limit = 2
3751
3752    # Test value to use when setting traffic class of packet.
3753    # -1 means "use kernel default".
3754    traffic_class = -1
3755
3756    def ancillaryMapping(self, ancdata):
3757        # Given ancillary data list ancdata, return a mapping from
3758        # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
3759        # Check that no (level, type) pair appears more than once.
3760        d = {}
3761        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3762            self.assertNotIn((cmsg_level, cmsg_type), d)
3763            d[(cmsg_level, cmsg_type)] = cmsg_data
3764        return d
3765
3766    def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
3767        # Receive hop limit into ancbufsize bytes of ancillary data
3768        # space.  Check that data is MSG, ancillary data is not
3769        # truncated (but ignore any flags in ignoreflags), and hop
3770        # limit is between 0 and maxhop inclusive.
3771        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3772                                  socket.IPV6_RECVHOPLIMIT, 1)
3773        self.misc_event.set()
3774        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3775                                                   len(MSG), ancbufsize)
3776
3777        self.assertEqual(msg, MSG)
3778        self.checkRecvmsgAddress(addr, self.cli_addr)
3779        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3780                        ignore=ignoreflags)
3781
3782        self.assertEqual(len(ancdata), 1)
3783        self.assertIsInstance(ancdata[0], tuple)
3784        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3785        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3786        self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
3787        self.assertIsInstance(cmsg_data, bytes)
3788        self.assertEqual(len(cmsg_data), SIZEOF_INT)
3789        a = array.array("i")
3790        a.frombytes(cmsg_data)
3791        self.assertGreaterEqual(a[0], 0)
3792        self.assertLessEqual(a[0], maxhop)
3793
3794    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3795    def testRecvHopLimit(self):
3796        # Test receiving the packet hop limit as ancillary data.
3797        self.checkHopLimit(ancbufsize=10240)
3798
3799    @testRecvHopLimit.client_skip
3800    def _testRecvHopLimit(self):
3801        # Need to wait until server has asked to receive ancillary
3802        # data, as implementations are not required to buffer it
3803        # otherwise.
3804        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3805        self.sendToServer(MSG)
3806
3807    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3808    def testRecvHopLimitCMSG_SPACE(self):
3809        # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
3810        self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))
3811
3812    @testRecvHopLimitCMSG_SPACE.client_skip
3813    def _testRecvHopLimitCMSG_SPACE(self):
3814        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3815        self.sendToServer(MSG)
3816
3817    # Could test receiving into buffer sized using CMSG_LEN, but RFC
3818    # 3542 says portable applications must provide space for trailing
3819    # padding.  Implementations may set MSG_CTRUNC if there isn't
3820    # enough space for the padding.
3821
3822    @requireAttrs(socket.socket, "sendmsg")
3823    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3824    def testSetHopLimit(self):
3825        # Test setting hop limit on outgoing packet and receiving it
3826        # at the other end.
3827        self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)
3828
3829    @testSetHopLimit.client_skip
3830    def _testSetHopLimit(self):
3831        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3832        self.assertEqual(
3833            self.sendmsgToServer([MSG],
3834                                 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3835                                   array.array("i", [self.hop_limit]))]),
3836            len(MSG))
3837
3838    def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
3839                                     ignoreflags=0):
3840        # Receive traffic class and hop limit into ancbufsize bytes of
3841        # ancillary data space.  Check that data is MSG, ancillary
3842        # data is not truncated (but ignore any flags in ignoreflags),
3843        # and traffic class and hop limit are in range (hop limit no
3844        # more than maxhop).
3845        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3846                                  socket.IPV6_RECVHOPLIMIT, 1)
3847        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3848                                  socket.IPV6_RECVTCLASS, 1)
3849        self.misc_event.set()
3850        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3851                                                   len(MSG), ancbufsize)
3852
3853        self.assertEqual(msg, MSG)
3854        self.checkRecvmsgAddress(addr, self.cli_addr)
3855        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3856                        ignore=ignoreflags)
3857        self.assertEqual(len(ancdata), 2)
3858        ancmap = self.ancillaryMapping(ancdata)
3859
3860        tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
3861        self.assertEqual(len(tcdata), SIZEOF_INT)
3862        a = array.array("i")
3863        a.frombytes(tcdata)
3864        self.assertGreaterEqual(a[0], 0)
3865        self.assertLessEqual(a[0], 255)
3866
3867        hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
3868        self.assertEqual(len(hldata), SIZEOF_INT)
3869        a = array.array("i")
3870        a.frombytes(hldata)
3871        self.assertGreaterEqual(a[0], 0)
3872        self.assertLessEqual(a[0], maxhop)
3873
3874    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3875                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3876    def testRecvTrafficClassAndHopLimit(self):
3877        # Test receiving traffic class and hop limit as ancillary data.
3878        self.checkTrafficClassAndHopLimit(ancbufsize=10240)
3879
3880    @testRecvTrafficClassAndHopLimit.client_skip
3881    def _testRecvTrafficClassAndHopLimit(self):
3882        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3883        self.sendToServer(MSG)
3884
3885    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3886                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3887    def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3888        # Test receiving traffic class and hop limit, using
3889        # CMSG_SPACE() to calculate buffer size.
3890        self.checkTrafficClassAndHopLimit(
3891            ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)
3892
3893    @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
3894    def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3895        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3896        self.sendToServer(MSG)
3897
3898    @requireAttrs(socket.socket, "sendmsg")
3899    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3900                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3901    def testSetTrafficClassAndHopLimit(self):
3902        # Test setting traffic class and hop limit on outgoing packet,
3903        # and receiving them at the other end.
3904        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3905                                          maxhop=self.hop_limit)
3906
3907    @testSetTrafficClassAndHopLimit.client_skip
3908    def _testSetTrafficClassAndHopLimit(self):
3909        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3910        self.assertEqual(
3911            self.sendmsgToServer([MSG],
3912                                 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3913                                   array.array("i", [self.traffic_class])),
3914                                  (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3915                                   array.array("i", [self.hop_limit]))]),
3916            len(MSG))
3917
3918    @requireAttrs(socket.socket, "sendmsg")
3919    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3920                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3921    def testOddCmsgSize(self):
3922        # Try to send ancillary data with first item one byte too
3923        # long.  Fall back to sending with correct size if this fails,
3924        # and check that second item was handled correctly.
3925        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3926                                          maxhop=self.hop_limit)
3927
3928    @testOddCmsgSize.client_skip
3929    def _testOddCmsgSize(self):
3930        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3931        try:
3932            nbytes = self.sendmsgToServer(
3933                [MSG],
3934                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3935                  array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
3936                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3937                  array.array("i", [self.hop_limit]))])
3938        except OSError as e:
3939            self.assertIsInstance(e.errno, int)
3940            nbytes = self.sendmsgToServer(
3941                [MSG],
3942                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3943                  array.array("i", [self.traffic_class])),
3944                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3945                  array.array("i", [self.hop_limit]))])
3946            self.assertEqual(nbytes, len(MSG))
3947
3948    # Tests for proper handling of truncated ancillary data
3949
3950    def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
3951        # Receive hop limit into ancbufsize bytes of ancillary data
3952        # space, which should be too small to contain the ancillary
3953        # data header (if ancbufsize is None, pass no second argument
3954        # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
3955        # (unless included in ignoreflags), and no ancillary data is
3956        # returned.
3957        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3958                                  socket.IPV6_RECVHOPLIMIT, 1)
3959        self.misc_event.set()
3960        args = () if ancbufsize is None else (ancbufsize,)
3961        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3962                                                   len(MSG), *args)
3963
3964        self.assertEqual(msg, MSG)
3965        self.checkRecvmsgAddress(addr, self.cli_addr)
3966        self.assertEqual(ancdata, [])
3967        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3968                        ignore=ignoreflags)
3969
3970    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3971    def testCmsgTruncNoBufSize(self):
3972        # Check that no ancillary data is received when no ancillary
3973        # buffer size is provided.
3974        self.checkHopLimitTruncatedHeader(ancbufsize=None,
3975                                          # BSD seems to set
3976                                          # MSG_CTRUNC only if an item
3977                                          # has been partially
3978                                          # received.
3979                                          ignoreflags=socket.MSG_CTRUNC)
3980
3981    @testCmsgTruncNoBufSize.client_skip
3982    def _testCmsgTruncNoBufSize(self):
3983        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3984        self.sendToServer(MSG)
3985
3986    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3987    def testSingleCmsgTrunc0(self):
3988        # Check that no ancillary data is received when ancillary
3989        # buffer size is zero.
3990        self.checkHopLimitTruncatedHeader(ancbufsize=0,
3991                                          ignoreflags=socket.MSG_CTRUNC)
3992
3993    @testSingleCmsgTrunc0.client_skip
3994    def _testSingleCmsgTrunc0(self):
3995        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3996        self.sendToServer(MSG)
3997
3998    # Check that no ancillary data is returned for various non-zero
3999    # (but still too small) buffer sizes.
4000
4001    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4002    def testSingleCmsgTrunc1(self):
4003        self.checkHopLimitTruncatedHeader(ancbufsize=1)
4004
4005    @testSingleCmsgTrunc1.client_skip
4006    def _testSingleCmsgTrunc1(self):
4007        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4008        self.sendToServer(MSG)
4009
4010    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4011    def testSingleCmsgTrunc2Int(self):
4012        self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)
4013
4014    @testSingleCmsgTrunc2Int.client_skip
4015    def _testSingleCmsgTrunc2Int(self):
4016        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4017        self.sendToServer(MSG)
4018
4019    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4020    def testSingleCmsgTruncLen0Minus1(self):
4021        self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)
4022
4023    @testSingleCmsgTruncLen0Minus1.client_skip
4024    def _testSingleCmsgTruncLen0Minus1(self):
4025        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4026        self.sendToServer(MSG)
4027
4028    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
4029    def testSingleCmsgTruncInData(self):
4030        # Test truncation of a control message inside its associated
4031        # data.  The message may be returned with its data truncated,
4032        # or not returned at all.
4033        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4034                                  socket.IPV6_RECVHOPLIMIT, 1)
4035        self.misc_event.set()
4036        msg, ancdata, flags, addr = self.doRecvmsg(
4037            self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)
4038
4039        self.assertEqual(msg, MSG)
4040        self.checkRecvmsgAddress(addr, self.cli_addr)
4041        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
4042
4043        self.assertLessEqual(len(ancdata), 1)
4044        if ancdata:
4045            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
4046            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4047            self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
4048            self.assertLess(len(cmsg_data), SIZEOF_INT)
4049
4050    @testSingleCmsgTruncInData.client_skip
4051    def _testSingleCmsgTruncInData(self):
4052        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4053        self.sendToServer(MSG)
4054
4055    def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
4056        # Receive traffic class and hop limit into ancbufsize bytes of
4057        # ancillary data space, which should be large enough to
4058        # contain the first item, but too small to contain the header
4059        # of the second.  Check that data is MSG, MSG_CTRUNC is set
4060        # (unless included in ignoreflags), and only one ancillary
4061        # data item is returned.
4062        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4063                                  socket.IPV6_RECVHOPLIMIT, 1)
4064        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4065                                  socket.IPV6_RECVTCLASS, 1)
4066        self.misc_event.set()
4067        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
4068                                                   len(MSG), ancbufsize)
4069
4070        self.assertEqual(msg, MSG)
4071        self.checkRecvmsgAddress(addr, self.cli_addr)
4072        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
4073                        ignore=ignoreflags)
4074
4075        self.assertEqual(len(ancdata), 1)
4076        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
4077        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4078        self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
4079        self.assertEqual(len(cmsg_data), SIZEOF_INT)
4080        a = array.array("i")
4081        a.frombytes(cmsg_data)
4082        self.assertGreaterEqual(a[0], 0)
4083        self.assertLessEqual(a[0], 255)
4084
4085    # Try the above test with various buffer sizes.
4086
4087    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4088                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4089    def testSecondCmsgTrunc0(self):
4090        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
4091                                        ignoreflags=socket.MSG_CTRUNC)
4092
4093    @testSecondCmsgTrunc0.client_skip
4094    def _testSecondCmsgTrunc0(self):
4095        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4096        self.sendToServer(MSG)
4097
4098    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4099                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4100    def testSecondCmsgTrunc1(self):
4101        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)
4102
4103    @testSecondCmsgTrunc1.client_skip
4104    def _testSecondCmsgTrunc1(self):
4105        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4106        self.sendToServer(MSG)
4107
4108    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4109                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4110    def testSecondCmsgTrunc2Int(self):
4111        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
4112                                        2 * SIZEOF_INT)
4113
4114    @testSecondCmsgTrunc2Int.client_skip
4115    def _testSecondCmsgTrunc2Int(self):
4116        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4117        self.sendToServer(MSG)
4118
4119    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4120                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4121    def testSecondCmsgTruncLen0Minus1(self):
4122        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
4123                                        socket.CMSG_LEN(0) - 1)
4124
4125    @testSecondCmsgTruncLen0Minus1.client_skip
4126    def _testSecondCmsgTruncLen0Minus1(self):
4127        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4128        self.sendToServer(MSG)
4129
4130    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
4131                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
4132    def testSecomdCmsgTruncInData(self):
4133        # Test truncation of the second of two control messages inside
4134        # its associated data.
4135        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4136                                  socket.IPV6_RECVHOPLIMIT, 1)
4137        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
4138                                  socket.IPV6_RECVTCLASS, 1)
4139        self.misc_event.set()
4140        msg, ancdata, flags, addr = self.doRecvmsg(
4141            self.serv_sock, len(MSG),
4142            socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)
4143
4144        self.assertEqual(msg, MSG)
4145        self.checkRecvmsgAddress(addr, self.cli_addr)
4146        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
4147
4148        cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}
4149
4150        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
4151        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4152        cmsg_types.remove(cmsg_type)
4153        self.assertEqual(len(cmsg_data), SIZEOF_INT)
4154        a = array.array("i")
4155        a.frombytes(cmsg_data)
4156        self.assertGreaterEqual(a[0], 0)
4157        self.assertLessEqual(a[0], 255)
4158
4159        if ancdata:
4160            cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
4161            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
4162            cmsg_types.remove(cmsg_type)
4163            self.assertLess(len(cmsg_data), SIZEOF_INT)
4164
4165        self.assertEqual(ancdata, [])
4166
4167    @testSecomdCmsgTruncInData.client_skip
4168    def _testSecomdCmsgTruncInData(self):
4169        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
4170        self.sendToServer(MSG)
4171
4172
4173# Derive concrete test classes for different socket types.
4174
4175class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase,
4176                             SendrecvmsgConnectionlessBase,
4177                             ThreadedSocketTestMixin, UDPTestBase):
4178    pass
4179
4180@requireAttrs(socket.socket, "sendmsg")
4181class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase):
4182    pass
4183
4184@requireAttrs(socket.socket, "recvmsg")
4185class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase):
4186    pass
4187
4188@requireAttrs(socket.socket, "recvmsg_into")
4189class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase):
4190    pass
4191
4192
4193class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
4194                              SendrecvmsgConnectionlessBase,
4195                              ThreadedSocketTestMixin, UDP6TestBase):
4196
4197    def checkRecvmsgAddress(self, addr1, addr2):
4198        # Called to compare the received address with the address of
4199        # the peer, ignoring scope ID
4200        self.assertEqual(addr1[:-1], addr2[:-1])
4201
4202@requireAttrs(socket.socket, "sendmsg")
4203@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4204@requireSocket("AF_INET6", "SOCK_DGRAM")
4205class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
4206    pass
4207
4208@requireAttrs(socket.socket, "recvmsg")
4209@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4210@requireSocket("AF_INET6", "SOCK_DGRAM")
4211class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
4212    pass
4213
4214@requireAttrs(socket.socket, "recvmsg_into")
4215@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4216@requireSocket("AF_INET6", "SOCK_DGRAM")
4217class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
4218    pass
4219
4220@requireAttrs(socket.socket, "recvmsg")
4221@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4222@requireAttrs(socket, "IPPROTO_IPV6")
4223@requireSocket("AF_INET6", "SOCK_DGRAM")
4224class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
4225                                      SendrecvmsgUDP6TestBase):
4226    pass
4227
4228@requireAttrs(socket.socket, "recvmsg_into")
4229@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4230@requireAttrs(socket, "IPPROTO_IPV6")
4231@requireSocket("AF_INET6", "SOCK_DGRAM")
4232class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
4233                                          RFC3542AncillaryTest,
4234                                          SendrecvmsgUDP6TestBase):
4235    pass
4236
4237
4238@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4239          'UDPLITE sockets required for this test.')
4240class SendrecvmsgUDPLITETestBase(SendrecvmsgDgramFlagsBase,
4241                             SendrecvmsgConnectionlessBase,
4242                             ThreadedSocketTestMixin, UDPLITETestBase):
4243    pass
4244
4245@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4246          'UDPLITE sockets required for this test.')
4247@requireAttrs(socket.socket, "sendmsg")
4248class SendmsgUDPLITETest(SendmsgConnectionlessTests, SendrecvmsgUDPLITETestBase):
4249    pass
4250
4251@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4252          'UDPLITE sockets required for this test.')
4253@requireAttrs(socket.socket, "recvmsg")
4254class RecvmsgUDPLITETest(RecvmsgTests, SendrecvmsgUDPLITETestBase):
4255    pass
4256
4257@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4258          'UDPLITE sockets required for this test.')
4259@requireAttrs(socket.socket, "recvmsg_into")
4260class RecvmsgIntoUDPLITETest(RecvmsgIntoTests, SendrecvmsgUDPLITETestBase):
4261    pass
4262
4263
4264@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4265          'UDPLITE sockets required for this test.')
4266class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase,
4267                              SendrecvmsgConnectionlessBase,
4268                              ThreadedSocketTestMixin, UDPLITE6TestBase):
4269
4270    def checkRecvmsgAddress(self, addr1, addr2):
4271        # Called to compare the received address with the address of
4272        # the peer, ignoring scope ID
4273        self.assertEqual(addr1[:-1], addr2[:-1])
4274
4275@requireAttrs(socket.socket, "sendmsg")
4276@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4277@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4278          'UDPLITE sockets required for this test.')
4279@requireSocket("AF_INET6", "SOCK_DGRAM")
4280class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBase):
4281    pass
4282
4283@requireAttrs(socket.socket, "recvmsg")
4284@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4285@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4286          'UDPLITE sockets required for this test.')
4287@requireSocket("AF_INET6", "SOCK_DGRAM")
4288class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase):
4289    pass
4290
4291@requireAttrs(socket.socket, "recvmsg_into")
4292@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4293@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4294          'UDPLITE sockets required for this test.')
4295@requireSocket("AF_INET6", "SOCK_DGRAM")
4296class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase):
4297    pass
4298
4299@requireAttrs(socket.socket, "recvmsg")
4300@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4301@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4302          'UDPLITE sockets required for this test.')
4303@requireAttrs(socket, "IPPROTO_IPV6")
4304@requireSocket("AF_INET6", "SOCK_DGRAM")
4305class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest,
4306                                      SendrecvmsgUDPLITE6TestBase):
4307    pass
4308
4309@requireAttrs(socket.socket, "recvmsg_into")
4310@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
4311@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
4312          'UDPLITE sockets required for this test.')
4313@requireAttrs(socket, "IPPROTO_IPV6")
4314@requireSocket("AF_INET6", "SOCK_DGRAM")
4315class RecvmsgIntoRFC3542AncillaryUDPLITE6Test(RecvmsgIntoMixin,
4316                                          RFC3542AncillaryTest,
4317                                          SendrecvmsgUDPLITE6TestBase):
4318    pass
4319
4320
4321class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase,
4322                             ConnectedStreamTestMixin, TCPTestBase):
4323    pass
4324
4325@requireAttrs(socket.socket, "sendmsg")
4326class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase):
4327    pass
4328
4329@requireAttrs(socket.socket, "recvmsg")
4330class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests,
4331                     SendrecvmsgTCPTestBase):
4332    pass
4333
4334@requireAttrs(socket.socket, "recvmsg_into")
4335class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4336                         SendrecvmsgTCPTestBase):
4337    pass
4338
4339
4340class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase,
4341                                    SendrecvmsgConnectedBase,
4342                                    ConnectedStreamTestMixin, SCTPStreamBase):
4343    pass
4344
4345@requireAttrs(socket.socket, "sendmsg")
4346@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4347@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4348class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase):
4349    pass
4350
4351@requireAttrs(socket.socket, "recvmsg")
4352@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4353@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4354class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
4355                            SendrecvmsgSCTPStreamTestBase):
4356
4357    def testRecvmsgEOF(self):
4358        try:
4359            super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF()
4360        except OSError as e:
4361            if e.errno != errno.ENOTCONN:
4362                raise
4363            self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
4364
4365@requireAttrs(socket.socket, "recvmsg_into")
4366@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
4367@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
4368class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4369                                SendrecvmsgSCTPStreamTestBase):
4370
4371    def testRecvmsgEOF(self):
4372        try:
4373            super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF()
4374        except OSError as e:
4375            if e.errno != errno.ENOTCONN:
4376                raise
4377            self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
4378
4379
4380class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase,
4381                                    ConnectedStreamTestMixin, UnixStreamBase):
4382    pass
4383
4384@requireAttrs(socket.socket, "sendmsg")
4385@requireAttrs(socket, "AF_UNIX")
4386class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase):
4387    pass
4388
4389@requireAttrs(socket.socket, "recvmsg")
4390@requireAttrs(socket, "AF_UNIX")
4391class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
4392                            SendrecvmsgUnixStreamTestBase):
4393    pass
4394
4395@requireAttrs(socket.socket, "recvmsg_into")
4396@requireAttrs(socket, "AF_UNIX")
4397class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
4398                                SendrecvmsgUnixStreamTestBase):
4399    pass
4400
4401@requireAttrs(socket.socket, "sendmsg", "recvmsg")
4402@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
4403class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase):
4404    pass
4405
4406@requireAttrs(socket.socket, "sendmsg", "recvmsg_into")
4407@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
4408class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest,
4409                                     SendrecvmsgUnixStreamTestBase):
4410    pass
4411
4412
4413# Test interrupting the interruptible send/receive methods with a
4414# signal when a timeout is set.  These tests avoid having multiple
4415# threads alive during the test so that the OS cannot deliver the
4416# signal to the wrong one.
4417
4418class InterruptedTimeoutBase(unittest.TestCase):
4419    # Base class for interrupted send/receive tests.  Installs an
4420    # empty handler for SIGALRM and removes it on teardown, along with
4421    # any scheduled alarms.
4422
4423    def setUp(self):
4424        super().setUp()
4425        orig_alrm_handler = signal.signal(signal.SIGALRM,
4426                                          lambda signum, frame: 1 / 0)
4427        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
4428
4429    # Timeout for socket operations
4430    timeout = support.LOOPBACK_TIMEOUT
4431
4432    # Provide setAlarm() method to schedule delivery of SIGALRM after
4433    # given number of seconds, or cancel it if zero, and an
4434    # appropriate time value to use.  Use setitimer() if available.
4435    if hasattr(signal, "setitimer"):
4436        alarm_time = 0.05
4437
4438        def setAlarm(self, seconds):
4439            signal.setitimer(signal.ITIMER_REAL, seconds)
4440    else:
4441        # Old systems may deliver the alarm up to one second early
4442        alarm_time = 2
4443
4444        def setAlarm(self, seconds):
4445            signal.alarm(seconds)
4446
4447
4448# Require siginterrupt() in order to ensure that system calls are
4449# interrupted by default.
4450@requireAttrs(signal, "siginterrupt")
4451@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
4452                     "Don't have signal.alarm or signal.setitimer")
4453class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
4454    # Test interrupting the recv*() methods with signals when a
4455    # timeout is set.
4456
4457    def setUp(self):
4458        super().setUp()
4459        self.serv.settimeout(self.timeout)
4460
4461    def checkInterruptedRecv(self, func, *args, **kwargs):
4462        # Check that func(*args, **kwargs) raises
4463        # errno of EINTR when interrupted by a signal.
4464        try:
4465            self.setAlarm(self.alarm_time)
4466            with self.assertRaises(ZeroDivisionError) as cm:
4467                func(*args, **kwargs)
4468        finally:
4469            self.setAlarm(0)
4470
4471    def testInterruptedRecvTimeout(self):
4472        self.checkInterruptedRecv(self.serv.recv, 1024)
4473
4474    def testInterruptedRecvIntoTimeout(self):
4475        self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))
4476
4477    def testInterruptedRecvfromTimeout(self):
4478        self.checkInterruptedRecv(self.serv.recvfrom, 1024)
4479
4480    def testInterruptedRecvfromIntoTimeout(self):
4481        self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))
4482
4483    @requireAttrs(socket.socket, "recvmsg")
4484    def testInterruptedRecvmsgTimeout(self):
4485        self.checkInterruptedRecv(self.serv.recvmsg, 1024)
4486
4487    @requireAttrs(socket.socket, "recvmsg_into")
4488    def testInterruptedRecvmsgIntoTimeout(self):
4489        self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])
4490
4491
4492# Require siginterrupt() in order to ensure that system calls are
4493# interrupted by default.
4494@requireAttrs(signal, "siginterrupt")
4495@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
4496                     "Don't have signal.alarm or signal.setitimer")
4497class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
4498                                 ThreadSafeCleanupTestCase,
4499                                 SocketListeningTestMixin, TCPTestBase):
4500    # Test interrupting the interruptible send*() methods with signals
4501    # when a timeout is set.
4502
4503    def setUp(self):
4504        super().setUp()
4505        self.serv_conn = self.newSocket()
4506        self.addCleanup(self.serv_conn.close)
4507        # Use a thread to complete the connection, but wait for it to
4508        # terminate before running the test, so that there is only one
4509        # thread to accept the signal.
4510        cli_thread = threading.Thread(target=self.doConnect)
4511        cli_thread.start()
4512        self.cli_conn, addr = self.serv.accept()
4513        self.addCleanup(self.cli_conn.close)
4514        cli_thread.join()
4515        self.serv_conn.settimeout(self.timeout)
4516
4517    def doConnect(self):
4518        self.serv_conn.connect(self.serv_addr)
4519
4520    def checkInterruptedSend(self, func, *args, **kwargs):
4521        # Check that func(*args, **kwargs), run in a loop, raises
4522        # OSError with an errno of EINTR when interrupted by a
4523        # signal.
4524        try:
4525            with self.assertRaises(ZeroDivisionError) as cm:
4526                while True:
4527                    self.setAlarm(self.alarm_time)
4528                    func(*args, **kwargs)
4529        finally:
4530            self.setAlarm(0)
4531
4532    # Issue #12958: The following tests have problems on OS X prior to 10.7
4533    @support.requires_mac_ver(10, 7)
4534    def testInterruptedSendTimeout(self):
4535        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)
4536
4537    @support.requires_mac_ver(10, 7)
4538    def testInterruptedSendtoTimeout(self):
4539        # Passing an actual address here as Python's wrapper for
4540        # sendto() doesn't allow passing a zero-length one; POSIX
4541        # requires that the address is ignored since the socket is
4542        # connection-mode, however.
4543        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
4544                                  self.serv_addr)
4545
4546    @support.requires_mac_ver(10, 7)
4547    @requireAttrs(socket.socket, "sendmsg")
4548    def testInterruptedSendmsgTimeout(self):
4549        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
4550
4551
4552class TCPCloserTest(ThreadedTCPSocketTest):
4553
4554    def testClose(self):
4555        conn, addr = self.serv.accept()
4556        conn.close()
4557
4558        sd = self.cli
4559        read, write, err = select.select([sd], [], [], 1.0)
4560        self.assertEqual(read, [sd])
4561        self.assertEqual(sd.recv(1), b'')
4562
4563        # Calling close() many times should be safe.
4564        conn.close()
4565        conn.close()
4566
4567    def _testClose(self):
4568        self.cli.connect((HOST, self.port))
4569        time.sleep(1.0)
4570
4571
4572class BasicSocketPairTest(SocketPairTest):
4573
4574    def __init__(self, methodName='runTest'):
4575        SocketPairTest.__init__(self, methodName=methodName)
4576
4577    def _check_defaults(self, sock):
4578        self.assertIsInstance(sock, socket.socket)
4579        if hasattr(socket, 'AF_UNIX'):
4580            self.assertEqual(sock.family, socket.AF_UNIX)
4581        else:
4582            self.assertEqual(sock.family, socket.AF_INET)
4583        self.assertEqual(sock.type, socket.SOCK_STREAM)
4584        self.assertEqual(sock.proto, 0)
4585
4586    def _testDefaults(self):
4587        self._check_defaults(self.cli)
4588
4589    def testDefaults(self):
4590        self._check_defaults(self.serv)
4591
4592    def testRecv(self):
4593        msg = self.serv.recv(1024)
4594        self.assertEqual(msg, MSG)
4595
4596    def _testRecv(self):
4597        self.cli.send(MSG)
4598
4599    def testSend(self):
4600        self.serv.send(MSG)
4601
4602    def _testSend(self):
4603        msg = self.cli.recv(1024)
4604        self.assertEqual(msg, MSG)
4605
4606
4607class NonBlockingTCPTests(ThreadedTCPSocketTest):
4608
4609    def __init__(self, methodName='runTest'):
4610        self.event = threading.Event()
4611        ThreadedTCPSocketTest.__init__(self, methodName=methodName)
4612
4613    def assert_sock_timeout(self, sock, timeout):
4614        self.assertEqual(self.serv.gettimeout(), timeout)
4615
4616        blocking = (timeout != 0.0)
4617        self.assertEqual(sock.getblocking(), blocking)
4618
4619        if fcntl is not None:
4620            # When a Python socket has a non-zero timeout, it's switched
4621            # internally to a non-blocking mode. Later, sock.sendall(),
4622            # sock.recv(), and other socket operations use a select() call and
4623            # handle EWOULDBLOCK/EGAIN on all socket operations. That's how
4624            # timeouts are enforced.
4625            fd_blocking = (timeout is None)
4626
4627            flag = fcntl.fcntl(sock, fcntl.F_GETFL, os.O_NONBLOCK)
4628            self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking)
4629
4630    def testSetBlocking(self):
4631        # Test setblocking() and settimeout() methods
4632        self.serv.setblocking(True)
4633        self.assert_sock_timeout(self.serv, None)
4634
4635        self.serv.setblocking(False)
4636        self.assert_sock_timeout(self.serv, 0.0)
4637
4638        self.serv.settimeout(None)
4639        self.assert_sock_timeout(self.serv, None)
4640
4641        self.serv.settimeout(0)
4642        self.assert_sock_timeout(self.serv, 0)
4643
4644        self.serv.settimeout(10)
4645        self.assert_sock_timeout(self.serv, 10)
4646
4647        self.serv.settimeout(0)
4648        self.assert_sock_timeout(self.serv, 0)
4649
4650    def _testSetBlocking(self):
4651        pass
4652
4653    @support.cpython_only
4654    def testSetBlocking_overflow(self):
4655        # Issue 15989
4656        import _testcapi
4657        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
4658            self.skipTest('needs UINT_MAX < ULONG_MAX')
4659
4660        self.serv.setblocking(False)
4661        self.assertEqual(self.serv.gettimeout(), 0.0)
4662
4663        self.serv.setblocking(_testcapi.UINT_MAX + 1)
4664        self.assertIsNone(self.serv.gettimeout())
4665
4666    _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)
4667
4668    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
4669                         'test needs socket.SOCK_NONBLOCK')
4670    @support.requires_linux_version(2, 6, 28)
4671    def testInitNonBlocking(self):
4672        # create a socket with SOCK_NONBLOCK
4673        self.serv.close()
4674        self.serv = socket.socket(socket.AF_INET,
4675                                  socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
4676        self.assert_sock_timeout(self.serv, 0)
4677
4678    def _testInitNonBlocking(self):
4679        pass
4680
4681    def testInheritFlagsBlocking(self):
4682        # bpo-7995: accept() on a listening socket with a timeout and the
4683        # default timeout is None, the resulting socket must be blocking.
4684        with socket_setdefaulttimeout(None):
4685            self.serv.settimeout(10)
4686            conn, addr = self.serv.accept()
4687            self.addCleanup(conn.close)
4688            self.assertIsNone(conn.gettimeout())
4689
4690    def _testInheritFlagsBlocking(self):
4691        self.cli.connect((HOST, self.port))
4692
4693    def testInheritFlagsTimeout(self):
4694        # bpo-7995: accept() on a listening socket with a timeout and the
4695        # default timeout is None, the resulting socket must inherit
4696        # the default timeout.
4697        default_timeout = 20.0
4698        with socket_setdefaulttimeout(default_timeout):
4699            self.serv.settimeout(10)
4700            conn, addr = self.serv.accept()
4701            self.addCleanup(conn.close)
4702            self.assertEqual(conn.gettimeout(), default_timeout)
4703
4704    def _testInheritFlagsTimeout(self):
4705        self.cli.connect((HOST, self.port))
4706
4707    def testAccept(self):
4708        # Testing non-blocking accept
4709        self.serv.setblocking(False)
4710
4711        # connect() didn't start: non-blocking accept() fails
4712        start_time = time.monotonic()
4713        with self.assertRaises(BlockingIOError):
4714            conn, addr = self.serv.accept()
4715        dt = time.monotonic() - start_time
4716        self.assertLess(dt, 1.0)
4717
4718        self.event.set()
4719
4720        read, write, err = select.select([self.serv], [], [], support.LONG_TIMEOUT)
4721        if self.serv not in read:
4722            self.fail("Error trying to do accept after select.")
4723
4724        # connect() completed: non-blocking accept() doesn't block
4725        conn, addr = self.serv.accept()
4726        self.addCleanup(conn.close)
4727        self.assertIsNone(conn.gettimeout())
4728
4729    def _testAccept(self):
4730        # don't connect before event is set to check
4731        # that non-blocking accept() raises BlockingIOError
4732        self.event.wait()
4733
4734        self.cli.connect((HOST, self.port))
4735
4736    def testRecv(self):
4737        # Testing non-blocking recv
4738        conn, addr = self.serv.accept()
4739        self.addCleanup(conn.close)
4740        conn.setblocking(False)
4741
4742        # the server didn't send data yet: non-blocking recv() fails
4743        with self.assertRaises(BlockingIOError):
4744            msg = conn.recv(len(MSG))
4745
4746        self.event.set()
4747
4748        read, write, err = select.select([conn], [], [], support.LONG_TIMEOUT)
4749        if conn not in read:
4750            self.fail("Error during select call to non-blocking socket.")
4751
4752        # the server sent data yet: non-blocking recv() doesn't block
4753        msg = conn.recv(len(MSG))
4754        self.assertEqual(msg, MSG)
4755
4756    def _testRecv(self):
4757        self.cli.connect((HOST, self.port))
4758
4759        # don't send anything before event is set to check
4760        # that non-blocking recv() raises BlockingIOError
4761        self.event.wait()
4762
4763        # send data: recv() will no longer block
4764        self.cli.sendall(MSG)
4765
4766
4767class FileObjectClassTestCase(SocketConnectedTest):
4768    """Unit tests for the object returned by socket.makefile()
4769
4770    self.read_file is the io object returned by makefile() on
4771    the client connection.  You can read from this file to
4772    get output from the server.
4773
4774    self.write_file is the io object returned by makefile() on the
4775    server connection.  You can write to this file to send output
4776    to the client.
4777    """
4778
4779    bufsize = -1 # Use default buffer size
4780    encoding = 'utf-8'
4781    errors = 'strict'
4782    newline = None
4783
4784    read_mode = 'rb'
4785    read_msg = MSG
4786    write_mode = 'wb'
4787    write_msg = MSG
4788
4789    def __init__(self, methodName='runTest'):
4790        SocketConnectedTest.__init__(self, methodName=methodName)
4791
4792    def setUp(self):
4793        self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
4794            threading.Event() for i in range(4)]
4795        SocketConnectedTest.setUp(self)
4796        self.read_file = self.cli_conn.makefile(
4797            self.read_mode, self.bufsize,
4798            encoding = self.encoding,
4799            errors = self.errors,
4800            newline = self.newline)
4801
4802    def tearDown(self):
4803        self.serv_finished.set()
4804        self.read_file.close()
4805        self.assertTrue(self.read_file.closed)
4806        self.read_file = None
4807        SocketConnectedTest.tearDown(self)
4808
4809    def clientSetUp(self):
4810        SocketConnectedTest.clientSetUp(self)
4811        self.write_file = self.serv_conn.makefile(
4812            self.write_mode, self.bufsize,
4813            encoding = self.encoding,
4814            errors = self.errors,
4815            newline = self.newline)
4816
4817    def clientTearDown(self):
4818        self.cli_finished.set()
4819        self.write_file.close()
4820        self.assertTrue(self.write_file.closed)
4821        self.write_file = None
4822        SocketConnectedTest.clientTearDown(self)
4823
4824    def testReadAfterTimeout(self):
4825        # Issue #7322: A file object must disallow further reads
4826        # after a timeout has occurred.
4827        self.cli_conn.settimeout(1)
4828        self.read_file.read(3)
4829        # First read raises a timeout
4830        self.assertRaises(socket.timeout, self.read_file.read, 1)
4831        # Second read is disallowed
4832        with self.assertRaises(OSError) as ctx:
4833            self.read_file.read(1)
4834        self.assertIn("cannot read from timed out object", str(ctx.exception))
4835
4836    def _testReadAfterTimeout(self):
4837        self.write_file.write(self.write_msg[0:3])
4838        self.write_file.flush()
4839        self.serv_finished.wait()
4840
4841    def testSmallRead(self):
4842        # Performing small file read test
4843        first_seg = self.read_file.read(len(self.read_msg)-3)
4844        second_seg = self.read_file.read(3)
4845        msg = first_seg + second_seg
4846        self.assertEqual(msg, self.read_msg)
4847
4848    def _testSmallRead(self):
4849        self.write_file.write(self.write_msg)
4850        self.write_file.flush()
4851
4852    def testFullRead(self):
4853        # read until EOF
4854        msg = self.read_file.read()
4855        self.assertEqual(msg, self.read_msg)
4856
4857    def _testFullRead(self):
4858        self.write_file.write(self.write_msg)
4859        self.write_file.close()
4860
4861    def testUnbufferedRead(self):
4862        # Performing unbuffered file read test
4863        buf = type(self.read_msg)()
4864        while 1:
4865            char = self.read_file.read(1)
4866            if not char:
4867                break
4868            buf += char
4869        self.assertEqual(buf, self.read_msg)
4870
4871    def _testUnbufferedRead(self):
4872        self.write_file.write(self.write_msg)
4873        self.write_file.flush()
4874
4875    def testReadline(self):
4876        # Performing file readline test
4877        line = self.read_file.readline()
4878        self.assertEqual(line, self.read_msg)
4879
4880    def _testReadline(self):
4881        self.write_file.write(self.write_msg)
4882        self.write_file.flush()
4883
4884    def testCloseAfterMakefile(self):
4885        # The file returned by makefile should keep the socket open.
4886        self.cli_conn.close()
4887        # read until EOF
4888        msg = self.read_file.read()
4889        self.assertEqual(msg, self.read_msg)
4890
4891    def _testCloseAfterMakefile(self):
4892        self.write_file.write(self.write_msg)
4893        self.write_file.flush()
4894
4895    def testMakefileAfterMakefileClose(self):
4896        self.read_file.close()
4897        msg = self.cli_conn.recv(len(MSG))
4898        if isinstance(self.read_msg, str):
4899            msg = msg.decode()
4900        self.assertEqual(msg, self.read_msg)
4901
4902    def _testMakefileAfterMakefileClose(self):
4903        self.write_file.write(self.write_msg)
4904        self.write_file.flush()
4905
4906    def testClosedAttr(self):
4907        self.assertTrue(not self.read_file.closed)
4908
4909    def _testClosedAttr(self):
4910        self.assertTrue(not self.write_file.closed)
4911
4912    def testAttributes(self):
4913        self.assertEqual(self.read_file.mode, self.read_mode)
4914        self.assertEqual(self.read_file.name, self.cli_conn.fileno())
4915
4916    def _testAttributes(self):
4917        self.assertEqual(self.write_file.mode, self.write_mode)
4918        self.assertEqual(self.write_file.name, self.serv_conn.fileno())
4919
4920    def testRealClose(self):
4921        self.read_file.close()
4922        self.assertRaises(ValueError, self.read_file.fileno)
4923        self.cli_conn.close()
4924        self.assertRaises(OSError, self.cli_conn.getsockname)
4925
4926    def _testRealClose(self):
4927        pass
4928
4929
4930class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
4931
4932    """Repeat the tests from FileObjectClassTestCase with bufsize==0.
4933
4934    In this case (and in this case only), it should be possible to
4935    create a file object, read a line from it, create another file
4936    object, read another line from it, without loss of data in the
4937    first file object's buffer.  Note that http.client relies on this
4938    when reading multiple requests from the same socket."""
4939
4940    bufsize = 0 # Use unbuffered mode
4941
4942    def testUnbufferedReadline(self):
4943        # Read a line, create a new file object, read another line with it
4944        line = self.read_file.readline() # first line
4945        self.assertEqual(line, b"A. " + self.write_msg) # first line
4946        self.read_file = self.cli_conn.makefile('rb', 0)
4947        line = self.read_file.readline() # second line
4948        self.assertEqual(line, b"B. " + self.write_msg) # second line
4949
4950    def _testUnbufferedReadline(self):
4951        self.write_file.write(b"A. " + self.write_msg)
4952        self.write_file.write(b"B. " + self.write_msg)
4953        self.write_file.flush()
4954
4955    def testMakefileClose(self):
4956        # The file returned by makefile should keep the socket open...
4957        self.cli_conn.close()
4958        msg = self.cli_conn.recv(1024)
4959        self.assertEqual(msg, self.read_msg)
4960        # ...until the file is itself closed
4961        self.read_file.close()
4962        self.assertRaises(OSError, self.cli_conn.recv, 1024)
4963
4964    def _testMakefileClose(self):
4965        self.write_file.write(self.write_msg)
4966        self.write_file.flush()
4967
4968    def testMakefileCloseSocketDestroy(self):
4969        refcount_before = sys.getrefcount(self.cli_conn)
4970        self.read_file.close()
4971        refcount_after = sys.getrefcount(self.cli_conn)
4972        self.assertEqual(refcount_before - 1, refcount_after)
4973
4974    def _testMakefileCloseSocketDestroy(self):
4975        pass
4976
4977    # Non-blocking ops
4978    # NOTE: to set `read_file` as non-blocking, we must call
4979    # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp).
4980
4981    def testSmallReadNonBlocking(self):
4982        self.cli_conn.setblocking(False)
4983        self.assertEqual(self.read_file.readinto(bytearray(10)), None)
4984        self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None)
4985        self.evt1.set()
4986        self.evt2.wait(1.0)
4987        first_seg = self.read_file.read(len(self.read_msg) - 3)
4988        if first_seg is None:
4989            # Data not arrived (can happen under Windows), wait a bit
4990            time.sleep(0.5)
4991            first_seg = self.read_file.read(len(self.read_msg) - 3)
4992        buf = bytearray(10)
4993        n = self.read_file.readinto(buf)
4994        self.assertEqual(n, 3)
4995        msg = first_seg + buf[:n]
4996        self.assertEqual(msg, self.read_msg)
4997        self.assertEqual(self.read_file.readinto(bytearray(16)), None)
4998        self.assertEqual(self.read_file.read(1), None)
4999
5000    def _testSmallReadNonBlocking(self):
5001        self.evt1.wait(1.0)
5002        self.write_file.write(self.write_msg)
5003        self.write_file.flush()
5004        self.evt2.set()
5005        # Avoid closing the socket before the server test has finished,
5006        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
5007        self.serv_finished.wait(5.0)
5008
5009    def testWriteNonBlocking(self):
5010        self.cli_finished.wait(5.0)
5011        # The client thread can't skip directly - the SkipTest exception
5012        # would appear as a failure.
5013        if self.serv_skipped:
5014            self.skipTest(self.serv_skipped)
5015
5016    def _testWriteNonBlocking(self):
5017        self.serv_skipped = None
5018        self.serv_conn.setblocking(False)
5019        # Try to saturate the socket buffer pipe with repeated large writes.
5020        BIG = b"x" * support.SOCK_MAX_SIZE
5021        LIMIT = 10
5022        # The first write() succeeds since a chunk of data can be buffered
5023        n = self.write_file.write(BIG)
5024        self.assertGreater(n, 0)
5025        for i in range(LIMIT):
5026            n = self.write_file.write(BIG)
5027            if n is None:
5028                # Succeeded
5029                break
5030            self.assertGreater(n, 0)
5031        else:
5032            # Let us know that this test didn't manage to establish
5033            # the expected conditions. This is not a failure in itself but,
5034            # if it happens repeatedly, the test should be fixed.
5035            self.serv_skipped = "failed to saturate the socket buffer"
5036
5037
5038class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
5039
5040    bufsize = 1 # Default-buffered for reading; line-buffered for writing
5041
5042
5043class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
5044
5045    bufsize = 2 # Exercise the buffering code
5046
5047
5048class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
5049    """Tests for socket.makefile() in text mode (rather than binary)"""
5050
5051    read_mode = 'r'
5052    read_msg = MSG.decode('utf-8')
5053    write_mode = 'wb'
5054    write_msg = MSG
5055    newline = ''
5056
5057
5058class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
5059    """Tests for socket.makefile() in text mode (rather than binary)"""
5060
5061    read_mode = 'rb'
5062    read_msg = MSG
5063    write_mode = 'w'
5064    write_msg = MSG.decode('utf-8')
5065    newline = ''
5066
5067
5068class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
5069    """Tests for socket.makefile() in text mode (rather than binary)"""
5070
5071    read_mode = 'r'
5072    read_msg = MSG.decode('utf-8')
5073    write_mode = 'w'
5074    write_msg = MSG.decode('utf-8')
5075    newline = ''
5076
5077
5078class NetworkConnectionTest(object):
5079    """Prove network connection."""
5080
5081    def clientSetUp(self):
5082        # We're inherited below by BasicTCPTest2, which also inherits
5083        # BasicTCPTest, which defines self.port referenced below.
5084        self.cli = socket.create_connection((HOST, self.port))
5085        self.serv_conn = self.cli
5086
5087class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
5088    """Tests that NetworkConnection does not break existing TCP functionality.
5089    """
5090
5091class NetworkConnectionNoServer(unittest.TestCase):
5092
5093    class MockSocket(socket.socket):
5094        def connect(self, *args):
5095            raise socket.timeout('timed out')
5096
5097    @contextlib.contextmanager
5098    def mocked_socket_module(self):
5099        """Return a socket which times out on connect"""
5100        old_socket = socket.socket
5101        socket.socket = self.MockSocket
5102        try:
5103            yield
5104        finally:
5105            socket.socket = old_socket
5106
5107    def test_connect(self):
5108        port = socket_helper.find_unused_port()
5109        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
5110        self.addCleanup(cli.close)
5111        with self.assertRaises(OSError) as cm:
5112            cli.connect((HOST, port))
5113        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
5114
5115    def test_create_connection(self):
5116        # Issue #9792: errors raised by create_connection() should have
5117        # a proper errno attribute.
5118        port = socket_helper.find_unused_port()
5119        with self.assertRaises(OSError) as cm:
5120            socket.create_connection((HOST, port))
5121
5122        # Issue #16257: create_connection() calls getaddrinfo() against
5123        # 'localhost'.  This may result in an IPV6 addr being returned
5124        # as well as an IPV4 one:
5125        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
5126        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
5127        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
5128        #
5129        # create_connection() enumerates through all the addresses returned
5130        # and if it doesn't successfully bind to any of them, it propagates
5131        # the last exception it encountered.
5132        #
5133        # On Solaris, ENETUNREACH is returned in this circumstance instead
5134        # of ECONNREFUSED.  So, if that errno exists, add it to our list of
5135        # expected errnos.
5136        expected_errnos = socket_helper.get_socket_conn_refused_errs()
5137        self.assertIn(cm.exception.errno, expected_errnos)
5138
5139    def test_create_connection_timeout(self):
5140        # Issue #9792: create_connection() should not recast timeout errors
5141        # as generic socket errors.
5142        with self.mocked_socket_module():
5143            try:
5144                socket.create_connection((HOST, 1234))
5145            except socket.timeout:
5146                pass
5147            except OSError as exc:
5148                if socket_helper.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT:
5149                    raise
5150            else:
5151                self.fail('socket.timeout not raised')
5152
5153
5154class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
5155
5156    def __init__(self, methodName='runTest'):
5157        SocketTCPTest.__init__(self, methodName=methodName)
5158        ThreadableTest.__init__(self)
5159
5160    def clientSetUp(self):
5161        self.source_port = socket_helper.find_unused_port()
5162
5163    def clientTearDown(self):
5164        self.cli.close()
5165        self.cli = None
5166        ThreadableTest.clientTearDown(self)
5167
5168    def _justAccept(self):
5169        conn, addr = self.serv.accept()
5170        conn.close()
5171
5172    testFamily = _justAccept
5173    def _testFamily(self):
5174        self.cli = socket.create_connection((HOST, self.port),
5175                            timeout=support.LOOPBACK_TIMEOUT)
5176        self.addCleanup(self.cli.close)
5177        self.assertEqual(self.cli.family, 2)
5178
5179    testSourceAddress = _justAccept
5180    def _testSourceAddress(self):
5181        self.cli = socket.create_connection((HOST, self.port),
5182                            timeout=support.LOOPBACK_TIMEOUT,
5183                            source_address=('', self.source_port))
5184        self.addCleanup(self.cli.close)
5185        self.assertEqual(self.cli.getsockname()[1], self.source_port)
5186        # The port number being used is sufficient to show that the bind()
5187        # call happened.
5188
5189    testTimeoutDefault = _justAccept
5190    def _testTimeoutDefault(self):
5191        # passing no explicit timeout uses socket's global default
5192        self.assertTrue(socket.getdefaulttimeout() is None)
5193        socket.setdefaulttimeout(42)
5194        try:
5195            self.cli = socket.create_connection((HOST, self.port))
5196            self.addCleanup(self.cli.close)
5197        finally:
5198            socket.setdefaulttimeout(None)
5199        self.assertEqual(self.cli.gettimeout(), 42)
5200
5201    testTimeoutNone = _justAccept
5202    def _testTimeoutNone(self):
5203        # None timeout means the same as sock.settimeout(None)
5204        self.assertTrue(socket.getdefaulttimeout() is None)
5205        socket.setdefaulttimeout(30)
5206        try:
5207            self.cli = socket.create_connection((HOST, self.port), timeout=None)
5208            self.addCleanup(self.cli.close)
5209        finally:
5210            socket.setdefaulttimeout(None)
5211        self.assertEqual(self.cli.gettimeout(), None)
5212
5213    testTimeoutValueNamed = _justAccept
5214    def _testTimeoutValueNamed(self):
5215        self.cli = socket.create_connection((HOST, self.port), timeout=30)
5216        self.assertEqual(self.cli.gettimeout(), 30)
5217
5218    testTimeoutValueNonamed = _justAccept
5219    def _testTimeoutValueNonamed(self):
5220        self.cli = socket.create_connection((HOST, self.port), 30)
5221        self.addCleanup(self.cli.close)
5222        self.assertEqual(self.cli.gettimeout(), 30)
5223
5224
5225class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
5226
5227    def __init__(self, methodName='runTest'):
5228        SocketTCPTest.__init__(self, methodName=methodName)
5229        ThreadableTest.__init__(self)
5230
5231    def clientSetUp(self):
5232        pass
5233
5234    def clientTearDown(self):
5235        self.cli.close()
5236        self.cli = None
5237        ThreadableTest.clientTearDown(self)
5238
5239    def testInsideTimeout(self):
5240        conn, addr = self.serv.accept()
5241        self.addCleanup(conn.close)
5242        time.sleep(3)
5243        conn.send(b"done!")
5244    testOutsideTimeout = testInsideTimeout
5245
5246    def _testInsideTimeout(self):
5247        self.cli = sock = socket.create_connection((HOST, self.port))
5248        data = sock.recv(5)
5249        self.assertEqual(data, b"done!")
5250
5251    def _testOutsideTimeout(self):
5252        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
5253        self.assertRaises(socket.timeout, lambda: sock.recv(5))
5254
5255
5256class TCPTimeoutTest(SocketTCPTest):
5257
5258    def testTCPTimeout(self):
5259        def raise_timeout(*args, **kwargs):
5260            self.serv.settimeout(1.0)
5261            self.serv.accept()
5262        self.assertRaises(socket.timeout, raise_timeout,
5263                              "Error generating a timeout exception (TCP)")
5264
5265    def testTimeoutZero(self):
5266        ok = False
5267        try:
5268            self.serv.settimeout(0.0)
5269            foo = self.serv.accept()
5270        except socket.timeout:
5271            self.fail("caught timeout instead of error (TCP)")
5272        except OSError:
5273            ok = True
5274        except:
5275            self.fail("caught unexpected exception (TCP)")
5276        if not ok:
5277            self.fail("accept() returned success when we did not expect it")
5278
5279    @unittest.skipUnless(hasattr(signal, 'alarm'),
5280                         'test needs signal.alarm()')
5281    def testInterruptedTimeout(self):
5282        # XXX I don't know how to do this test on MSWindows or any other
5283        # platform that doesn't support signal.alarm() or os.kill(), though
5284        # the bug should have existed on all platforms.
5285        self.serv.settimeout(5.0)   # must be longer than alarm
5286        class Alarm(Exception):
5287            pass
5288        def alarm_handler(signal, frame):
5289            raise Alarm
5290        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
5291        try:
5292            try:
5293                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
5294                foo = self.serv.accept()
5295            except socket.timeout:
5296                self.fail("caught timeout instead of Alarm")
5297            except Alarm:
5298                pass
5299            except:
5300                self.fail("caught other exception instead of Alarm:"
5301                          " %s(%s):\n%s" %
5302                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
5303            else:
5304                self.fail("nothing caught")
5305            finally:
5306                signal.alarm(0)         # shut off alarm
5307        except Alarm:
5308            self.fail("got Alarm in wrong place")
5309        finally:
5310            # no alarm can be pending.  Safe to restore old handler.
5311            signal.signal(signal.SIGALRM, old_alarm)
5312
5313class UDPTimeoutTest(SocketUDPTest):
5314
5315    def testUDPTimeout(self):
5316        def raise_timeout(*args, **kwargs):
5317            self.serv.settimeout(1.0)
5318            self.serv.recv(1024)
5319        self.assertRaises(socket.timeout, raise_timeout,
5320                              "Error generating a timeout exception (UDP)")
5321
5322    def testTimeoutZero(self):
5323        ok = False
5324        try:
5325            self.serv.settimeout(0.0)
5326            foo = self.serv.recv(1024)
5327        except socket.timeout:
5328            self.fail("caught timeout instead of error (UDP)")
5329        except OSError:
5330            ok = True
5331        except:
5332            self.fail("caught unexpected exception (UDP)")
5333        if not ok:
5334            self.fail("recv() returned success when we did not expect it")
5335
5336@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
5337          'UDPLITE sockets required for this test.')
5338class UDPLITETimeoutTest(SocketUDPLITETest):
5339
5340    def testUDPLITETimeout(self):
5341        def raise_timeout(*args, **kwargs):
5342            self.serv.settimeout(1.0)
5343            self.serv.recv(1024)
5344        self.assertRaises(socket.timeout, raise_timeout,
5345                              "Error generating a timeout exception (UDPLITE)")
5346
5347    def testTimeoutZero(self):
5348        ok = False
5349        try:
5350            self.serv.settimeout(0.0)
5351            foo = self.serv.recv(1024)
5352        except socket.timeout:
5353            self.fail("caught timeout instead of error (UDPLITE)")
5354        except OSError:
5355            ok = True
5356        except:
5357            self.fail("caught unexpected exception (UDPLITE)")
5358        if not ok:
5359            self.fail("recv() returned success when we did not expect it")
5360
5361class TestExceptions(unittest.TestCase):
5362
5363    def testExceptionTree(self):
5364        self.assertTrue(issubclass(OSError, Exception))
5365        self.assertTrue(issubclass(socket.herror, OSError))
5366        self.assertTrue(issubclass(socket.gaierror, OSError))
5367        self.assertTrue(issubclass(socket.timeout, OSError))
5368
5369    def test_setblocking_invalidfd(self):
5370        # Regression test for issue #28471
5371
5372        sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
5373        sock = socket.socket(
5374            socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno())
5375        sock0.close()
5376        self.addCleanup(sock.detach)
5377
5378        with self.assertRaises(OSError):
5379            sock.setblocking(False)
5380
5381
5382@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
5383class TestLinuxAbstractNamespace(unittest.TestCase):
5384
5385    UNIX_PATH_MAX = 108
5386
5387    def testLinuxAbstractNamespace(self):
5388        address = b"\x00python-test-hello\x00\xff"
5389        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1:
5390            s1.bind(address)
5391            s1.listen()
5392            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2:
5393                s2.connect(s1.getsockname())
5394                with s1.accept()[0] as s3:
5395                    self.assertEqual(s1.getsockname(), address)
5396                    self.assertEqual(s2.getpeername(), address)
5397
5398    def testMaxName(self):
5399        address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
5400        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5401            s.bind(address)
5402            self.assertEqual(s.getsockname(), address)
5403
5404    def testNameOverflow(self):
5405        address = "\x00" + "h" * self.UNIX_PATH_MAX
5406        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5407            self.assertRaises(OSError, s.bind, address)
5408
5409    def testStrName(self):
5410        # Check that an abstract name can be passed as a string.
5411        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
5412        try:
5413            s.bind("\x00python\x00test\x00")
5414            self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
5415        finally:
5416            s.close()
5417
5418    def testBytearrayName(self):
5419        # Check that an abstract name can be passed as a bytearray.
5420        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
5421            s.bind(bytearray(b"\x00python\x00test\x00"))
5422            self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
5423
5424@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
5425class TestUnixDomain(unittest.TestCase):
5426
5427    def setUp(self):
5428        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
5429
5430    def tearDown(self):
5431        self.sock.close()
5432
5433    def encoded(self, path):
5434        # Return the given path encoded in the file system encoding,
5435        # or skip the test if this is not possible.
5436        try:
5437            return os.fsencode(path)
5438        except UnicodeEncodeError:
5439            self.skipTest(
5440                "Pathname {0!a} cannot be represented in file "
5441                "system encoding {1!r}".format(
5442                    path, sys.getfilesystemencoding()))
5443
5444    def bind(self, sock, path):
5445        # Bind the socket
5446        try:
5447            socket_helper.bind_unix_socket(sock, path)
5448        except OSError as e:
5449            if str(e) == "AF_UNIX path too long":
5450                self.skipTest(
5451                    "Pathname {0!a} is too long to serve as an AF_UNIX path"
5452                    .format(path))
5453            else:
5454                raise
5455
5456    def testUnbound(self):
5457        # Issue #30205 (note getsockname() can return None on OS X)
5458        self.assertIn(self.sock.getsockname(), ('', None))
5459
5460    def testStrAddr(self):
5461        # Test binding to and retrieving a normal string pathname.
5462        path = os.path.abspath(support.TESTFN)
5463        self.bind(self.sock, path)
5464        self.addCleanup(support.unlink, path)
5465        self.assertEqual(self.sock.getsockname(), path)
5466
5467    def testBytesAddr(self):
5468        # Test binding to a bytes pathname.
5469        path = os.path.abspath(support.TESTFN)
5470        self.bind(self.sock, self.encoded(path))
5471        self.addCleanup(support.unlink, path)
5472        self.assertEqual(self.sock.getsockname(), path)
5473
5474    def testSurrogateescapeBind(self):
5475        # Test binding to a valid non-ASCII pathname, with the
5476        # non-ASCII bytes supplied using surrogateescape encoding.
5477        path = os.path.abspath(support.TESTFN_UNICODE)
5478        b = self.encoded(path)
5479        self.bind(self.sock, b.decode("ascii", "surrogateescape"))
5480        self.addCleanup(support.unlink, path)
5481        self.assertEqual(self.sock.getsockname(), path)
5482
5483    def testUnencodableAddr(self):
5484        # Test binding to a pathname that cannot be encoded in the
5485        # file system encoding.
5486        if support.TESTFN_UNENCODABLE is None:
5487            self.skipTest("No unencodable filename available")
5488        path = os.path.abspath(support.TESTFN_UNENCODABLE)
5489        self.bind(self.sock, path)
5490        self.addCleanup(support.unlink, path)
5491        self.assertEqual(self.sock.getsockname(), path)
5492
5493
5494class BufferIOTest(SocketConnectedTest):
5495    """
5496    Test the buffer versions of socket.recv() and socket.send().
5497    """
5498    def __init__(self, methodName='runTest'):
5499        SocketConnectedTest.__init__(self, methodName=methodName)
5500
5501    def testRecvIntoArray(self):
5502        buf = array.array("B", [0] * len(MSG))
5503        nbytes = self.cli_conn.recv_into(buf)
5504        self.assertEqual(nbytes, len(MSG))
5505        buf = buf.tobytes()
5506        msg = buf[:len(MSG)]
5507        self.assertEqual(msg, MSG)
5508
5509    def _testRecvIntoArray(self):
5510        buf = bytes(MSG)
5511        self.serv_conn.send(buf)
5512
5513    def testRecvIntoBytearray(self):
5514        buf = bytearray(1024)
5515        nbytes = self.cli_conn.recv_into(buf)
5516        self.assertEqual(nbytes, len(MSG))
5517        msg = buf[:len(MSG)]
5518        self.assertEqual(msg, MSG)
5519
5520    _testRecvIntoBytearray = _testRecvIntoArray
5521
5522    def testRecvIntoMemoryview(self):
5523        buf = bytearray(1024)
5524        nbytes = self.cli_conn.recv_into(memoryview(buf))
5525        self.assertEqual(nbytes, len(MSG))
5526        msg = buf[:len(MSG)]
5527        self.assertEqual(msg, MSG)
5528
5529    _testRecvIntoMemoryview = _testRecvIntoArray
5530
5531    def testRecvFromIntoArray(self):
5532        buf = array.array("B", [0] * len(MSG))
5533        nbytes, addr = self.cli_conn.recvfrom_into(buf)
5534        self.assertEqual(nbytes, len(MSG))
5535        buf = buf.tobytes()
5536        msg = buf[:len(MSG)]
5537        self.assertEqual(msg, MSG)
5538
5539    def _testRecvFromIntoArray(self):
5540        buf = bytes(MSG)
5541        self.serv_conn.send(buf)
5542
5543    def testRecvFromIntoBytearray(self):
5544        buf = bytearray(1024)
5545        nbytes, addr = self.cli_conn.recvfrom_into(buf)
5546        self.assertEqual(nbytes, len(MSG))
5547        msg = buf[:len(MSG)]
5548        self.assertEqual(msg, MSG)
5549
5550    _testRecvFromIntoBytearray = _testRecvFromIntoArray
5551
5552    def testRecvFromIntoMemoryview(self):
5553        buf = bytearray(1024)
5554        nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
5555        self.assertEqual(nbytes, len(MSG))
5556        msg = buf[:len(MSG)]
5557        self.assertEqual(msg, MSG)
5558
5559    _testRecvFromIntoMemoryview = _testRecvFromIntoArray
5560
5561    def testRecvFromIntoSmallBuffer(self):
5562        # See issue #20246.
5563        buf = bytearray(8)
5564        self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)
5565
5566    def _testRecvFromIntoSmallBuffer(self):
5567        self.serv_conn.send(MSG)
5568
5569    def testRecvFromIntoEmptyBuffer(self):
5570        buf = bytearray()
5571        self.cli_conn.recvfrom_into(buf)
5572        self.cli_conn.recvfrom_into(buf, 0)
5573
5574    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
5575
5576
5577TIPC_STYPE = 2000
5578TIPC_LOWER = 200
5579TIPC_UPPER = 210
5580
5581def isTipcAvailable():
5582    """Check if the TIPC module is loaded
5583
5584    The TIPC module is not loaded automatically on Ubuntu and probably
5585    other Linux distros.
5586    """
5587    if not hasattr(socket, "AF_TIPC"):
5588        return False
5589    try:
5590        f = open("/proc/modules")
5591    except (FileNotFoundError, IsADirectoryError, PermissionError):
5592        # It's ok if the file does not exist, is a directory or if we
5593        # have not the permission to read it.
5594        return False
5595    with f:
5596        for line in f:
5597            if line.startswith("tipc "):
5598                return True
5599    return False
5600
5601@unittest.skipUnless(isTipcAvailable(),
5602                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
5603class TIPCTest(unittest.TestCase):
5604    def testRDM(self):
5605        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
5606        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
5607        self.addCleanup(srv.close)
5608        self.addCleanup(cli.close)
5609
5610        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5611        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
5612                TIPC_LOWER, TIPC_UPPER)
5613        srv.bind(srvaddr)
5614
5615        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
5616                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
5617        cli.sendto(MSG, sendaddr)
5618
5619        msg, recvaddr = srv.recvfrom(1024)
5620
5621        self.assertEqual(cli.getsockname(), recvaddr)
5622        self.assertEqual(msg, MSG)
5623
5624
5625@unittest.skipUnless(isTipcAvailable(),
5626                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
5627class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
5628    def __init__(self, methodName = 'runTest'):
5629        unittest.TestCase.__init__(self, methodName = methodName)
5630        ThreadableTest.__init__(self)
5631
5632    def setUp(self):
5633        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
5634        self.addCleanup(self.srv.close)
5635        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5636        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
5637                TIPC_LOWER, TIPC_UPPER)
5638        self.srv.bind(srvaddr)
5639        self.srv.listen()
5640        self.serverExplicitReady()
5641        self.conn, self.connaddr = self.srv.accept()
5642        self.addCleanup(self.conn.close)
5643
5644    def clientSetUp(self):
5645        # There is a hittable race between serverExplicitReady() and the
5646        # accept() call; sleep a little while to avoid it, otherwise
5647        # we could get an exception
5648        time.sleep(0.1)
5649        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
5650        self.addCleanup(self.cli.close)
5651        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
5652                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
5653        self.cli.connect(addr)
5654        self.cliaddr = self.cli.getsockname()
5655
5656    def testStream(self):
5657        msg = self.conn.recv(1024)
5658        self.assertEqual(msg, MSG)
5659        self.assertEqual(self.cliaddr, self.connaddr)
5660
5661    def _testStream(self):
5662        self.cli.send(MSG)
5663        self.cli.close()
5664
5665
5666class ContextManagersTest(ThreadedTCPSocketTest):
5667
5668    def _testSocketClass(self):
5669        # base test
5670        with socket.socket() as sock:
5671            self.assertFalse(sock._closed)
5672        self.assertTrue(sock._closed)
5673        # close inside with block
5674        with socket.socket() as sock:
5675            sock.close()
5676        self.assertTrue(sock._closed)
5677        # exception inside with block
5678        with socket.socket() as sock:
5679            self.assertRaises(OSError, sock.sendall, b'foo')
5680        self.assertTrue(sock._closed)
5681
5682    def testCreateConnectionBase(self):
5683        conn, addr = self.serv.accept()
5684        self.addCleanup(conn.close)
5685        data = conn.recv(1024)
5686        conn.sendall(data)
5687
5688    def _testCreateConnectionBase(self):
5689        address = self.serv.getsockname()
5690        with socket.create_connection(address) as sock:
5691            self.assertFalse(sock._closed)
5692            sock.sendall(b'foo')
5693            self.assertEqual(sock.recv(1024), b'foo')
5694        self.assertTrue(sock._closed)
5695
5696    def testCreateConnectionClose(self):
5697        conn, addr = self.serv.accept()
5698        self.addCleanup(conn.close)
5699        data = conn.recv(1024)
5700        conn.sendall(data)
5701
5702    def _testCreateConnectionClose(self):
5703        address = self.serv.getsockname()
5704        with socket.create_connection(address) as sock:
5705            sock.close()
5706        self.assertTrue(sock._closed)
5707        self.assertRaises(OSError, sock.sendall, b'foo')
5708
5709
5710class InheritanceTest(unittest.TestCase):
5711    @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
5712                         "SOCK_CLOEXEC not defined")
5713    @support.requires_linux_version(2, 6, 28)
5714    def test_SOCK_CLOEXEC(self):
5715        with socket.socket(socket.AF_INET,
5716                           socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s:
5717            self.assertEqual(s.type, socket.SOCK_STREAM)
5718            self.assertFalse(s.get_inheritable())
5719
5720    def test_default_inheritable(self):
5721        sock = socket.socket()
5722        with sock:
5723            self.assertEqual(sock.get_inheritable(), False)
5724
5725    def test_dup(self):
5726        sock = socket.socket()
5727        with sock:
5728            newsock = sock.dup()
5729            sock.close()
5730            with newsock:
5731                self.assertEqual(newsock.get_inheritable(), False)
5732
5733    def test_set_inheritable(self):
5734        sock = socket.socket()
5735        with sock:
5736            sock.set_inheritable(True)
5737            self.assertEqual(sock.get_inheritable(), True)
5738
5739            sock.set_inheritable(False)
5740            self.assertEqual(sock.get_inheritable(), False)
5741
5742    @unittest.skipIf(fcntl is None, "need fcntl")
5743    def test_get_inheritable_cloexec(self):
5744        sock = socket.socket()
5745        with sock:
5746            fd = sock.fileno()
5747            self.assertEqual(sock.get_inheritable(), False)
5748
5749            # clear FD_CLOEXEC flag
5750            flags = fcntl.fcntl(fd, fcntl.F_GETFD)
5751            flags &= ~fcntl.FD_CLOEXEC
5752            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
5753
5754            self.assertEqual(sock.get_inheritable(), True)
5755
5756    @unittest.skipIf(fcntl is None, "need fcntl")
5757    def test_set_inheritable_cloexec(self):
5758        sock = socket.socket()
5759        with sock:
5760            fd = sock.fileno()
5761            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
5762                             fcntl.FD_CLOEXEC)
5763
5764            sock.set_inheritable(True)
5765            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
5766                             0)
5767
5768
5769    def test_socketpair(self):
5770        s1, s2 = socket.socketpair()
5771        self.addCleanup(s1.close)
5772        self.addCleanup(s2.close)
5773        self.assertEqual(s1.get_inheritable(), False)
5774        self.assertEqual(s2.get_inheritable(), False)
5775
5776
5777@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
5778                     "SOCK_NONBLOCK not defined")
5779class NonblockConstantTest(unittest.TestCase):
5780    def checkNonblock(self, s, nonblock=True, timeout=0.0):
5781        if nonblock:
5782            self.assertEqual(s.type, socket.SOCK_STREAM)
5783            self.assertEqual(s.gettimeout(), timeout)
5784            self.assertTrue(
5785                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
5786            if timeout == 0:
5787                # timeout == 0: means that getblocking() must be False.
5788                self.assertFalse(s.getblocking())
5789            else:
5790                # If timeout > 0, the socket will be in a "blocking" mode
5791                # from the standpoint of the Python API.  For Python socket
5792                # object, "blocking" means that operations like 'sock.recv()'
5793                # will block.  Internally, file descriptors for
5794                # "blocking" Python sockets *with timeouts* are in a
5795                # *non-blocking* mode, and 'sock.recv()' uses 'select()'
5796                # and handles EWOULDBLOCK/EAGAIN to enforce the timeout.
5797                self.assertTrue(s.getblocking())
5798        else:
5799            self.assertEqual(s.type, socket.SOCK_STREAM)
5800            self.assertEqual(s.gettimeout(), None)
5801            self.assertFalse(
5802                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
5803            self.assertTrue(s.getblocking())
5804
5805    @support.requires_linux_version(2, 6, 28)
5806    def test_SOCK_NONBLOCK(self):
5807        # a lot of it seems silly and redundant, but I wanted to test that
5808        # changing back and forth worked ok
5809        with socket.socket(socket.AF_INET,
5810                           socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
5811            self.checkNonblock(s)
5812            s.setblocking(True)
5813            self.checkNonblock(s, nonblock=False)
5814            s.setblocking(False)
5815            self.checkNonblock(s)
5816            s.settimeout(None)
5817            self.checkNonblock(s, nonblock=False)
5818            s.settimeout(2.0)
5819            self.checkNonblock(s, timeout=2.0)
5820            s.setblocking(True)
5821            self.checkNonblock(s, nonblock=False)
5822        # defaulttimeout
5823        t = socket.getdefaulttimeout()
5824        socket.setdefaulttimeout(0.0)
5825        with socket.socket() as s:
5826            self.checkNonblock(s)
5827        socket.setdefaulttimeout(None)
5828        with socket.socket() as s:
5829            self.checkNonblock(s, False)
5830        socket.setdefaulttimeout(2.0)
5831        with socket.socket() as s:
5832            self.checkNonblock(s, timeout=2.0)
5833        socket.setdefaulttimeout(None)
5834        with socket.socket() as s:
5835            self.checkNonblock(s, False)
5836        socket.setdefaulttimeout(t)
5837
5838
5839@unittest.skipUnless(os.name == "nt", "Windows specific")
5840@unittest.skipUnless(multiprocessing, "need multiprocessing")
5841class TestSocketSharing(SocketTCPTest):
5842    # This must be classmethod and not staticmethod or multiprocessing
5843    # won't be able to bootstrap it.
5844    @classmethod
5845    def remoteProcessServer(cls, q):
5846        # Recreate socket from shared data
5847        sdata = q.get()
5848        message = q.get()
5849
5850        s = socket.fromshare(sdata)
5851        s2, c = s.accept()
5852
5853        # Send the message
5854        s2.sendall(message)
5855        s2.close()
5856        s.close()
5857
5858    def testShare(self):
5859        # Transfer the listening server socket to another process
5860        # and service it from there.
5861
5862        # Create process:
5863        q = multiprocessing.Queue()
5864        p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
5865        p.start()
5866
5867        # Get the shared socket data
5868        data = self.serv.share(p.pid)
5869
5870        # Pass the shared socket to the other process
5871        addr = self.serv.getsockname()
5872        self.serv.close()
5873        q.put(data)
5874
5875        # The data that the server will send us
5876        message = b"slapmahfro"
5877        q.put(message)
5878
5879        # Connect
5880        s = socket.create_connection(addr)
5881        #  listen for the data
5882        m = []
5883        while True:
5884            data = s.recv(100)
5885            if not data:
5886                break
5887            m.append(data)
5888        s.close()
5889        received = b"".join(m)
5890        self.assertEqual(received, message)
5891        p.join()
5892
5893    def testShareLength(self):
5894        data = self.serv.share(os.getpid())
5895        self.assertRaises(ValueError, socket.fromshare, data[:-1])
5896        self.assertRaises(ValueError, socket.fromshare, data+b"foo")
5897
5898    def compareSockets(self, org, other):
5899        # socket sharing is expected to work only for blocking socket
5900        # since the internal python timeout value isn't transferred.
5901        self.assertEqual(org.gettimeout(), None)
5902        self.assertEqual(org.gettimeout(), other.gettimeout())
5903
5904        self.assertEqual(org.family, other.family)
5905        self.assertEqual(org.type, other.type)
5906        # If the user specified "0" for proto, then
5907        # internally windows will have picked the correct value.
5908        # Python introspection on the socket however will still return
5909        # 0.  For the shared socket, the python value is recreated
5910        # from the actual value, so it may not compare correctly.
5911        if org.proto != 0:
5912            self.assertEqual(org.proto, other.proto)
5913
5914    def testShareLocal(self):
5915        data = self.serv.share(os.getpid())
5916        s = socket.fromshare(data)
5917        try:
5918            self.compareSockets(self.serv, s)
5919        finally:
5920            s.close()
5921
5922    def testTypes(self):
5923        families = [socket.AF_INET, socket.AF_INET6]
5924        types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
5925        for f in families:
5926            for t in types:
5927                try:
5928                    source = socket.socket(f, t)
5929                except OSError:
5930                    continue # This combination is not supported
5931                try:
5932                    data = source.share(os.getpid())
5933                    shared = socket.fromshare(data)
5934                    try:
5935                        self.compareSockets(source, shared)
5936                    finally:
5937                        shared.close()
5938                finally:
5939                    source.close()
5940
5941
5942class SendfileUsingSendTest(ThreadedTCPSocketTest):
5943    """
5944    Test the send() implementation of socket.sendfile().
5945    """
5946
5947    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
5948    BUFSIZE = 8192
5949    FILEDATA = b""
5950    TIMEOUT = support.LOOPBACK_TIMEOUT
5951
5952    @classmethod
5953    def setUpClass(cls):
5954        def chunks(total, step):
5955            assert total >= step
5956            while total > step:
5957                yield step
5958                total -= step
5959            if total:
5960                yield total
5961
5962        chunk = b"".join([random.choice(string.ascii_letters).encode()
5963                          for i in range(cls.BUFSIZE)])
5964        with open(support.TESTFN, 'wb') as f:
5965            for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
5966                f.write(chunk)
5967        with open(support.TESTFN, 'rb') as f:
5968            cls.FILEDATA = f.read()
5969            assert len(cls.FILEDATA) == cls.FILESIZE
5970
5971    @classmethod
5972    def tearDownClass(cls):
5973        support.unlink(support.TESTFN)
5974
5975    def accept_conn(self):
5976        self.serv.settimeout(support.LONG_TIMEOUT)
5977        conn, addr = self.serv.accept()
5978        conn.settimeout(self.TIMEOUT)
5979        self.addCleanup(conn.close)
5980        return conn
5981
5982    def recv_data(self, conn):
5983        received = []
5984        while True:
5985            chunk = conn.recv(self.BUFSIZE)
5986            if not chunk:
5987                break
5988            received.append(chunk)
5989        return b''.join(received)
5990
5991    def meth_from_sock(self, sock):
5992        # Depending on the mixin class being run return either send()
5993        # or sendfile() method implementation.
5994        return getattr(sock, "_sendfile_use_send")
5995
5996    # regular file
5997
5998    def _testRegularFile(self):
5999        address = self.serv.getsockname()
6000        file = open(support.TESTFN, 'rb')
6001        with socket.create_connection(address) as sock, file as file:
6002            meth = self.meth_from_sock(sock)
6003            sent = meth(file)
6004            self.assertEqual(sent, self.FILESIZE)
6005            self.assertEqual(file.tell(), self.FILESIZE)
6006
6007    def testRegularFile(self):
6008        conn = self.accept_conn()
6009        data = self.recv_data(conn)
6010        self.assertEqual(len(data), self.FILESIZE)
6011        self.assertEqual(data, self.FILEDATA)
6012
6013    # non regular file
6014
6015    def _testNonRegularFile(self):
6016        address = self.serv.getsockname()
6017        file = io.BytesIO(self.FILEDATA)
6018        with socket.create_connection(address) as sock, file as file:
6019            sent = sock.sendfile(file)
6020            self.assertEqual(sent, self.FILESIZE)
6021            self.assertEqual(file.tell(), self.FILESIZE)
6022            self.assertRaises(socket._GiveupOnSendfile,
6023                              sock._sendfile_use_sendfile, file)
6024
6025    def testNonRegularFile(self):
6026        conn = self.accept_conn()
6027        data = self.recv_data(conn)
6028        self.assertEqual(len(data), self.FILESIZE)
6029        self.assertEqual(data, self.FILEDATA)
6030
6031    # empty file
6032
6033    def _testEmptyFileSend(self):
6034        address = self.serv.getsockname()
6035        filename = support.TESTFN + "2"
6036        with open(filename, 'wb'):
6037            self.addCleanup(support.unlink, filename)
6038        file = open(filename, 'rb')
6039        with socket.create_connection(address) as sock, file as file:
6040            meth = self.meth_from_sock(sock)
6041            sent = meth(file)
6042            self.assertEqual(sent, 0)
6043            self.assertEqual(file.tell(), 0)
6044
6045    def testEmptyFileSend(self):
6046        conn = self.accept_conn()
6047        data = self.recv_data(conn)
6048        self.assertEqual(data, b"")
6049
6050    # offset
6051
6052    def _testOffset(self):
6053        address = self.serv.getsockname()
6054        file = open(support.TESTFN, 'rb')
6055        with socket.create_connection(address) as sock, file as file:
6056            meth = self.meth_from_sock(sock)
6057            sent = meth(file, offset=5000)
6058            self.assertEqual(sent, self.FILESIZE - 5000)
6059            self.assertEqual(file.tell(), self.FILESIZE)
6060
6061    def testOffset(self):
6062        conn = self.accept_conn()
6063        data = self.recv_data(conn)
6064        self.assertEqual(len(data), self.FILESIZE - 5000)
6065        self.assertEqual(data, self.FILEDATA[5000:])
6066
6067    # count
6068
6069    def _testCount(self):
6070        address = self.serv.getsockname()
6071        file = open(support.TESTFN, 'rb')
6072        sock = socket.create_connection(address,
6073                                        timeout=support.LOOPBACK_TIMEOUT)
6074        with sock, file:
6075            count = 5000007
6076            meth = self.meth_from_sock(sock)
6077            sent = meth(file, count=count)
6078            self.assertEqual(sent, count)
6079            self.assertEqual(file.tell(), count)
6080
6081    def testCount(self):
6082        count = 5000007
6083        conn = self.accept_conn()
6084        data = self.recv_data(conn)
6085        self.assertEqual(len(data), count)
6086        self.assertEqual(data, self.FILEDATA[:count])
6087
6088    # count small
6089
6090    def _testCountSmall(self):
6091        address = self.serv.getsockname()
6092        file = open(support.TESTFN, 'rb')
6093        sock = socket.create_connection(address,
6094                                        timeout=support.LOOPBACK_TIMEOUT)
6095        with sock, file:
6096            count = 1
6097            meth = self.meth_from_sock(sock)
6098            sent = meth(file, count=count)
6099            self.assertEqual(sent, count)
6100            self.assertEqual(file.tell(), count)
6101
6102    def testCountSmall(self):
6103        count = 1
6104        conn = self.accept_conn()
6105        data = self.recv_data(conn)
6106        self.assertEqual(len(data), count)
6107        self.assertEqual(data, self.FILEDATA[:count])
6108
6109    # count + offset
6110
6111    def _testCountWithOffset(self):
6112        address = self.serv.getsockname()
6113        file = open(support.TESTFN, 'rb')
6114        with socket.create_connection(address, timeout=2) as sock, file as file:
6115            count = 100007
6116            meth = self.meth_from_sock(sock)
6117            sent = meth(file, offset=2007, count=count)
6118            self.assertEqual(sent, count)
6119            self.assertEqual(file.tell(), count + 2007)
6120
6121    def testCountWithOffset(self):
6122        count = 100007
6123        conn = self.accept_conn()
6124        data = self.recv_data(conn)
6125        self.assertEqual(len(data), count)
6126        self.assertEqual(data, self.FILEDATA[2007:count+2007])
6127
6128    # non blocking sockets are not supposed to work
6129
6130    def _testNonBlocking(self):
6131        address = self.serv.getsockname()
6132        file = open(support.TESTFN, 'rb')
6133        with socket.create_connection(address) as sock, file as file:
6134            sock.setblocking(False)
6135            meth = self.meth_from_sock(sock)
6136            self.assertRaises(ValueError, meth, file)
6137            self.assertRaises(ValueError, sock.sendfile, file)
6138
6139    def testNonBlocking(self):
6140        conn = self.accept_conn()
6141        if conn.recv(8192):
6142            self.fail('was not supposed to receive any data')
6143
6144    # timeout (non-triggered)
6145
6146    def _testWithTimeout(self):
6147        address = self.serv.getsockname()
6148        file = open(support.TESTFN, 'rb')
6149        sock = socket.create_connection(address,
6150                                        timeout=support.LOOPBACK_TIMEOUT)
6151        with sock, file:
6152            meth = self.meth_from_sock(sock)
6153            sent = meth(file)
6154            self.assertEqual(sent, self.FILESIZE)
6155
6156    def testWithTimeout(self):
6157        conn = self.accept_conn()
6158        data = self.recv_data(conn)
6159        self.assertEqual(len(data), self.FILESIZE)
6160        self.assertEqual(data, self.FILEDATA)
6161
6162    # timeout (triggered)
6163
6164    def _testWithTimeoutTriggeredSend(self):
6165        address = self.serv.getsockname()
6166        with open(support.TESTFN, 'rb') as file:
6167            with socket.create_connection(address) as sock:
6168                sock.settimeout(0.01)
6169                meth = self.meth_from_sock(sock)
6170                self.assertRaises(socket.timeout, meth, file)
6171
6172    def testWithTimeoutTriggeredSend(self):
6173        conn = self.accept_conn()
6174        conn.recv(88192)
6175
6176    # errors
6177
6178    def _test_errors(self):
6179        pass
6180
6181    def test_errors(self):
6182        with open(support.TESTFN, 'rb') as file:
6183            with socket.socket(type=socket.SOCK_DGRAM) as s:
6184                meth = self.meth_from_sock(s)
6185                self.assertRaisesRegex(
6186                    ValueError, "SOCK_STREAM", meth, file)
6187        with open(support.TESTFN, 'rt') as file:
6188            with socket.socket() as s:
6189                meth = self.meth_from_sock(s)
6190                self.assertRaisesRegex(
6191                    ValueError, "binary mode", meth, file)
6192        with open(support.TESTFN, 'rb') as file:
6193            with socket.socket() as s:
6194                meth = self.meth_from_sock(s)
6195                self.assertRaisesRegex(TypeError, "positive integer",
6196                                       meth, file, count='2')
6197                self.assertRaisesRegex(TypeError, "positive integer",
6198                                       meth, file, count=0.1)
6199                self.assertRaisesRegex(ValueError, "positive integer",
6200                                       meth, file, count=0)
6201                self.assertRaisesRegex(ValueError, "positive integer",
6202                                       meth, file, count=-1)
6203
6204
6205@unittest.skipUnless(hasattr(os, "sendfile"),
6206                     'os.sendfile() required for this test.')
6207class SendfileUsingSendfileTest(SendfileUsingSendTest):
6208    """
6209    Test the sendfile() implementation of socket.sendfile().
6210    """
6211    def meth_from_sock(self, sock):
6212        return getattr(sock, "_sendfile_use_sendfile")
6213
6214
6215@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
6216class LinuxKernelCryptoAPI(unittest.TestCase):
6217    # tests for AF_ALG
6218    def create_alg(self, typ, name):
6219        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6220        try:
6221            sock.bind((typ, name))
6222        except FileNotFoundError as e:
6223            # type / algorithm is not available
6224            sock.close()
6225            raise unittest.SkipTest(str(e), typ, name)
6226        else:
6227            return sock
6228
6229    # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY,
6230    # at least on ppc64le architecture
6231    @support.requires_linux_version(4, 5)
6232    def test_sha256(self):
6233        expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
6234                                 "177a9cb410ff61f20015ad")
6235        with self.create_alg('hash', 'sha256') as algo:
6236            op, _ = algo.accept()
6237            with op:
6238                op.sendall(b"abc")
6239                self.assertEqual(op.recv(512), expected)
6240
6241            op, _ = algo.accept()
6242            with op:
6243                op.send(b'a', socket.MSG_MORE)
6244                op.send(b'b', socket.MSG_MORE)
6245                op.send(b'c', socket.MSG_MORE)
6246                op.send(b'')
6247                self.assertEqual(op.recv(512), expected)
6248
6249    def test_hmac_sha1(self):
6250        expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
6251        with self.create_alg('hash', 'hmac(sha1)') as algo:
6252            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
6253            op, _ = algo.accept()
6254            with op:
6255                op.sendall(b"what do ya want for nothing?")
6256                self.assertEqual(op.recv(512), expected)
6257
6258    # Although it should work with 3.19 and newer the test blocks on
6259    # Ubuntu 15.10 with Kernel 4.2.0-19.
6260    @support.requires_linux_version(4, 3)
6261    def test_aes_cbc(self):
6262        key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
6263        iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
6264        msg = b"Single block msg"
6265        ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
6266        msglen = len(msg)
6267        with self.create_alg('skcipher', 'cbc(aes)') as algo:
6268            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
6269            op, _ = algo.accept()
6270            with op:
6271                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
6272                                 flags=socket.MSG_MORE)
6273                op.sendall(msg)
6274                self.assertEqual(op.recv(msglen), ciphertext)
6275
6276            op, _ = algo.accept()
6277            with op:
6278                op.sendmsg_afalg([ciphertext],
6279                                 op=socket.ALG_OP_DECRYPT, iv=iv)
6280                self.assertEqual(op.recv(msglen), msg)
6281
6282            # long message
6283            multiplier = 1024
6284            longmsg = [msg] * multiplier
6285            op, _ = algo.accept()
6286            with op:
6287                op.sendmsg_afalg(longmsg,
6288                                 op=socket.ALG_OP_ENCRYPT, iv=iv)
6289                enc = op.recv(msglen * multiplier)
6290            self.assertEqual(len(enc), msglen * multiplier)
6291            self.assertEqual(enc[:msglen], ciphertext)
6292
6293            op, _ = algo.accept()
6294            with op:
6295                op.sendmsg_afalg([enc],
6296                                 op=socket.ALG_OP_DECRYPT, iv=iv)
6297                dec = op.recv(msglen * multiplier)
6298            self.assertEqual(len(dec), msglen * multiplier)
6299            self.assertEqual(dec, msg * multiplier)
6300
6301    @support.requires_linux_version(4, 9)  # see issue29324
6302    def test_aead_aes_gcm(self):
6303        key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
6304        iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
6305        plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
6306        assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
6307        expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
6308        expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')
6309
6310        taglen = len(expected_tag)
6311        assoclen = len(assoc)
6312
6313        with self.create_alg('aead', 'gcm(aes)') as algo:
6314            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
6315            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
6316                            None, taglen)
6317
6318            # send assoc, plain and tag buffer in separate steps
6319            op, _ = algo.accept()
6320            with op:
6321                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
6322                                 assoclen=assoclen, flags=socket.MSG_MORE)
6323                op.sendall(assoc, socket.MSG_MORE)
6324                op.sendall(plain)
6325                res = op.recv(assoclen + len(plain) + taglen)
6326                self.assertEqual(expected_ct, res[assoclen:-taglen])
6327                self.assertEqual(expected_tag, res[-taglen:])
6328
6329            # now with msg
6330            op, _ = algo.accept()
6331            with op:
6332                msg = assoc + plain
6333                op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
6334                                 assoclen=assoclen)
6335                res = op.recv(assoclen + len(plain) + taglen)
6336                self.assertEqual(expected_ct, res[assoclen:-taglen])
6337                self.assertEqual(expected_tag, res[-taglen:])
6338
6339            # create anc data manually
6340            pack_uint32 = struct.Struct('I').pack
6341            op, _ = algo.accept()
6342            with op:
6343                msg = assoc + plain
6344                op.sendmsg(
6345                    [msg],
6346                    ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
6347                     [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
6348                     [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
6349                    )
6350                )
6351                res = op.recv(len(msg) + taglen)
6352                self.assertEqual(expected_ct, res[assoclen:-taglen])
6353                self.assertEqual(expected_tag, res[-taglen:])
6354
6355            # decrypt and verify
6356            op, _ = algo.accept()
6357            with op:
6358                msg = assoc + expected_ct + expected_tag
6359                op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
6360                                 assoclen=assoclen)
6361                res = op.recv(len(msg) - taglen)
6362                self.assertEqual(plain, res[assoclen:])
6363
6364    @support.requires_linux_version(4, 3)  # see test_aes_cbc
6365    def test_drbg_pr_sha256(self):
6366        # deterministic random bit generator, prediction resistance, sha256
6367        with self.create_alg('rng', 'drbg_pr_sha256') as algo:
6368            extra_seed = os.urandom(32)
6369            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
6370            op, _ = algo.accept()
6371            with op:
6372                rn = op.recv(32)
6373                self.assertEqual(len(rn), 32)
6374
6375    def test_sendmsg_afalg_args(self):
6376        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6377        with sock:
6378            with self.assertRaises(TypeError):
6379                sock.sendmsg_afalg()
6380
6381            with self.assertRaises(TypeError):
6382                sock.sendmsg_afalg(op=None)
6383
6384            with self.assertRaises(TypeError):
6385                sock.sendmsg_afalg(1)
6386
6387            with self.assertRaises(TypeError):
6388                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)
6389
6390            with self.assertRaises(TypeError):
6391                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
6392
6393    def test_length_restriction(self):
6394        # bpo-35050, off-by-one error in length check
6395        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
6396        self.addCleanup(sock.close)
6397
6398        # salg_type[14]
6399        with self.assertRaises(FileNotFoundError):
6400            sock.bind(("t" * 13, "name"))
6401        with self.assertRaisesRegex(ValueError, "type too long"):
6402            sock.bind(("t" * 14, "name"))
6403
6404        # salg_name[64]
6405        with self.assertRaises(FileNotFoundError):
6406            sock.bind(("type", "n" * 63))
6407        with self.assertRaisesRegex(ValueError, "name too long"):
6408            sock.bind(("type", "n" * 64))
6409
6410
6411@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
6412class TestMSWindowsTCPFlags(unittest.TestCase):
6413    knownTCPFlags = {
6414                       # available since long time ago
6415                       'TCP_MAXSEG',
6416                       'TCP_NODELAY',
6417                       # available starting with Windows 10 1607
6418                       'TCP_FASTOPEN',
6419                       # available starting with Windows 10 1703
6420                       'TCP_KEEPCNT',
6421                       # available starting with Windows 10 1709
6422                       'TCP_KEEPIDLE',
6423                       'TCP_KEEPINTVL'
6424                       }
6425
6426    def test_new_tcp_flags(self):
6427        provided = [s for s in dir(socket) if s.startswith('TCP')]
6428        unknown = [s for s in provided if s not in self.knownTCPFlags]
6429
6430        self.assertEqual([], unknown,
6431            "New TCP flags were discovered. See bpo-32394 for more information")
6432
6433
6434class CreateServerTest(unittest.TestCase):
6435
6436    def test_address(self):
6437        port = socket_helper.find_unused_port()
6438        with socket.create_server(("127.0.0.1", port)) as sock:
6439            self.assertEqual(sock.getsockname()[0], "127.0.0.1")
6440            self.assertEqual(sock.getsockname()[1], port)
6441        if socket_helper.IPV6_ENABLED:
6442            with socket.create_server(("::1", port),
6443                                      family=socket.AF_INET6) as sock:
6444                self.assertEqual(sock.getsockname()[0], "::1")
6445                self.assertEqual(sock.getsockname()[1], port)
6446
6447    def test_family_and_type(self):
6448        with socket.create_server(("127.0.0.1", 0)) as sock:
6449            self.assertEqual(sock.family, socket.AF_INET)
6450            self.assertEqual(sock.type, socket.SOCK_STREAM)
6451        if socket_helper.IPV6_ENABLED:
6452            with socket.create_server(("::1", 0), family=socket.AF_INET6) as s:
6453                self.assertEqual(s.family, socket.AF_INET6)
6454                self.assertEqual(sock.type, socket.SOCK_STREAM)
6455
6456    def test_reuse_port(self):
6457        if not hasattr(socket, "SO_REUSEPORT"):
6458            with self.assertRaises(ValueError):
6459                socket.create_server(("localhost", 0), reuse_port=True)
6460        else:
6461            with socket.create_server(("localhost", 0)) as sock:
6462                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
6463                self.assertEqual(opt, 0)
6464            with socket.create_server(("localhost", 0), reuse_port=True) as sock:
6465                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
6466                self.assertNotEqual(opt, 0)
6467
6468    @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
6469                     not hasattr(_socket, 'IPV6_V6ONLY'),
6470                     "IPV6_V6ONLY option not supported")
6471    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6472    def test_ipv6_only_default(self):
6473        with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock:
6474            assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY)
6475
6476    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6477                     "dualstack_ipv6 not supported")
6478    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6479    def test_dualstack_ipv6_family(self):
6480        with socket.create_server(("::1", 0), family=socket.AF_INET6,
6481                                  dualstack_ipv6=True) as sock:
6482            self.assertEqual(sock.family, socket.AF_INET6)
6483
6484
6485class CreateServerFunctionalTest(unittest.TestCase):
6486    timeout = support.LOOPBACK_TIMEOUT
6487
6488    def setUp(self):
6489        self.thread = None
6490
6491    def tearDown(self):
6492        if self.thread is not None:
6493            self.thread.join(self.timeout)
6494
6495    def echo_server(self, sock):
6496        def run(sock):
6497            with sock:
6498                conn, _ = sock.accept()
6499                with conn:
6500                    event.wait(self.timeout)
6501                    msg = conn.recv(1024)
6502                    if not msg:
6503                        return
6504                    conn.sendall(msg)
6505
6506        event = threading.Event()
6507        sock.settimeout(self.timeout)
6508        self.thread = threading.Thread(target=run, args=(sock, ))
6509        self.thread.start()
6510        event.set()
6511
6512    def echo_client(self, addr, family):
6513        with socket.socket(family=family) as sock:
6514            sock.settimeout(self.timeout)
6515            sock.connect(addr)
6516            sock.sendall(b'foo')
6517            self.assertEqual(sock.recv(1024), b'foo')
6518
6519    def test_tcp4(self):
6520        port = socket_helper.find_unused_port()
6521        with socket.create_server(("", port)) as sock:
6522            self.echo_server(sock)
6523            self.echo_client(("127.0.0.1", port), socket.AF_INET)
6524
6525    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6526    def test_tcp6(self):
6527        port = socket_helper.find_unused_port()
6528        with socket.create_server(("", port),
6529                                  family=socket.AF_INET6) as sock:
6530            self.echo_server(sock)
6531            self.echo_client(("::1", port), socket.AF_INET6)
6532
6533    # --- dual stack tests
6534
6535    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6536                     "dualstack_ipv6 not supported")
6537    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6538    def test_dual_stack_client_v4(self):
6539        port = socket_helper.find_unused_port()
6540        with socket.create_server(("", port), family=socket.AF_INET6,
6541                                  dualstack_ipv6=True) as sock:
6542            self.echo_server(sock)
6543            self.echo_client(("127.0.0.1", port), socket.AF_INET)
6544
6545    @unittest.skipIf(not socket.has_dualstack_ipv6(),
6546                     "dualstack_ipv6 not supported")
6547    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
6548    def test_dual_stack_client_v6(self):
6549        port = socket_helper.find_unused_port()
6550        with socket.create_server(("", port), family=socket.AF_INET6,
6551                                  dualstack_ipv6=True) as sock:
6552            self.echo_server(sock)
6553            self.echo_client(("::1", port), socket.AF_INET6)
6554
6555@requireAttrs(socket, "send_fds")
6556@requireAttrs(socket, "recv_fds")
6557@requireAttrs(socket, "AF_UNIX")
6558class SendRecvFdsTests(unittest.TestCase):
6559    def testSendAndRecvFds(self):
6560        def close_pipes(pipes):
6561            for fd1, fd2 in pipes:
6562                os.close(fd1)
6563                os.close(fd2)
6564
6565        def close_fds(fds):
6566            for fd in fds:
6567                os.close(fd)
6568
6569        # send 10 file descriptors
6570        pipes = [os.pipe() for _ in range(10)]
6571        self.addCleanup(close_pipes, pipes)
6572        fds = [rfd for rfd, wfd in pipes]
6573
6574        # use a UNIX socket pair to exchange file descriptors locally
6575        sock1, sock2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
6576        with sock1, sock2:
6577            socket.send_fds(sock1, [MSG], fds)
6578            # request more data and file descriptors than expected
6579            msg, fds2, flags, addr = socket.recv_fds(sock2, len(MSG) * 2, len(fds) * 2)
6580            self.addCleanup(close_fds, fds2)
6581
6582        self.assertEqual(msg, MSG)
6583        self.assertEqual(len(fds2), len(fds))
6584        self.assertEqual(flags, 0)
6585        # don't test addr
6586
6587        # test that file descriptors are connected
6588        for index, fds in enumerate(pipes):
6589            rfd, wfd = fds
6590            os.write(wfd, str(index).encode())
6591
6592        for index, rfd in enumerate(fds2):
6593            data = os.read(rfd, 100)
6594            self.assertEqual(data,  str(index).encode())
6595
6596
6597def test_main():
6598    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
6599             TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
6600             UDPTimeoutTest, CreateServerTest, CreateServerFunctionalTest,
6601             SendRecvFdsTests]
6602
6603    tests.extend([
6604        NonBlockingTCPTests,
6605        FileObjectClassTestCase,
6606        UnbufferedFileObjectClassTestCase,
6607        LineBufferedFileObjectClassTestCase,
6608        SmallBufferedFileObjectClassTestCase,
6609        UnicodeReadFileObjectClassTestCase,
6610        UnicodeWriteFileObjectClassTestCase,
6611        UnicodeReadWriteFileObjectClassTestCase,
6612        NetworkConnectionNoServer,
6613        NetworkConnectionAttributesTest,
6614        NetworkConnectionBehaviourTest,
6615        ContextManagersTest,
6616        InheritanceTest,
6617        NonblockConstantTest
6618    ])
6619    tests.append(BasicSocketPairTest)
6620    tests.append(TestUnixDomain)
6621    tests.append(TestLinuxAbstractNamespace)
6622    tests.extend([TIPCTest, TIPCThreadableTest])
6623    tests.extend([BasicCANTest, CANTest])
6624    tests.extend([BasicRDSTest, RDSTest])
6625    tests.append(LinuxKernelCryptoAPI)
6626    tests.append(BasicQIPCRTRTest)
6627    tests.extend([
6628        BasicVSOCKTest,
6629        ThreadedVSOCKSocketStreamTest,
6630    ])
6631    tests.append(BasicBluetoothTest)
6632    tests.extend([
6633        CmsgMacroTests,
6634        SendmsgUDPTest,
6635        RecvmsgUDPTest,
6636        RecvmsgIntoUDPTest,
6637        SendmsgUDP6Test,
6638        RecvmsgUDP6Test,
6639        RecvmsgRFC3542AncillaryUDP6Test,
6640        RecvmsgIntoRFC3542AncillaryUDP6Test,
6641        RecvmsgIntoUDP6Test,
6642        SendmsgUDPLITETest,
6643        RecvmsgUDPLITETest,
6644        RecvmsgIntoUDPLITETest,
6645        SendmsgUDPLITE6Test,
6646        RecvmsgUDPLITE6Test,
6647        RecvmsgRFC3542AncillaryUDPLITE6Test,
6648        RecvmsgIntoRFC3542AncillaryUDPLITE6Test,
6649        RecvmsgIntoUDPLITE6Test,
6650        SendmsgTCPTest,
6651        RecvmsgTCPTest,
6652        RecvmsgIntoTCPTest,
6653        SendmsgSCTPStreamTest,
6654        RecvmsgSCTPStreamTest,
6655        RecvmsgIntoSCTPStreamTest,
6656        SendmsgUnixStreamTest,
6657        RecvmsgUnixStreamTest,
6658        RecvmsgIntoUnixStreamTest,
6659        RecvmsgSCMRightsStreamTest,
6660        RecvmsgIntoSCMRightsStreamTest,
6661        # These are slow when setitimer() is not available
6662        InterruptedRecvTimeoutTest,
6663        InterruptedSendTimeoutTest,
6664        TestSocketSharing,
6665        SendfileUsingSendTest,
6666        SendfileUsingSendfileTest,
6667    ])
6668    tests.append(TestMSWindowsTCPFlags)
6669
6670    thread_info = support.threading_setup()
6671    support.run_unittest(*tests)
6672    support.threading_cleanup(*thread_info)
6673
6674
6675if __name__ == "__main__":
6676    test_main()
6677