• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3
4import errno
5import io
6import itertools
7import socket
8import select
9import tempfile
10import time
11import traceback
12import queue
13import sys
14import os
15import array
16import contextlib
17from weakref import proxy
18import signal
19import math
20import pickle
21import struct
22import random
23import string
24try:
25    import multiprocessing
26except ImportError:
27    multiprocessing = False
28try:
29    import fcntl
30except ImportError:
31    fcntl = None
32
# Host address used by the fixtures below when binding and connecting.
HOST = support.HOST
# Payload for the tests: contains a non-ASCII character (U+1234) and a
# trailing CR LF, encoded to UTF-8 bytes.
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return
35
36try:
37    import _thread as thread
38    import threading
39except ImportError:
40    thread = None
41    threading = None
42try:
43    import _socket
44except ImportError:
45    _socket = None
46
47
48def _have_socket_can():
49    """Check whether CAN sockets are supported on this host."""
50    try:
51        s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
52    except (AttributeError, OSError):
53        return False
54    else:
55        s.close()
56    return True
57
58def _have_socket_rds():
59    """Check whether RDS sockets are supported on this host."""
60    try:
61        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
62    except (AttributeError, OSError):
63        return False
64    else:
65        s.close()
66    return True
67
68def _have_socket_alg():
69    """Check whether AF_ALG sockets are supported on this host."""
70    try:
71        s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
72    except (AttributeError, OSError):
73        return False
74    else:
75        s.close()
76    return True
77
# Each flag is computed once, when the module is imported, by trying to
# create (and immediately close) a socket of the corresponding family.
HAVE_SOCKET_CAN = _have_socket_can()

HAVE_SOCKET_RDS = _have_socket_rds()

HAVE_SOCKET_ALG = _have_socket_alg()

# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize
86
class SocketTCPTest(unittest.TestCase):
    """Fixture providing a listening IPv4 TCP server socket.

    ``self.serv`` is the server socket and ``self.port`` is the value
    returned by ``support.bind_port()`` for it.
    """

    def setUp(self):
        """Create the server socket, bind it and start listening."""
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = server
        self.port = support.bind_port(server)
        server.listen()

    def tearDown(self):
        """Close the server socket and drop the reference to it."""
        server = self.serv
        server.close()
        self.serv = None
97
class SocketUDPTest(unittest.TestCase):
    """Fixture providing a bound IPv4 UDP server socket.

    ``self.serv`` is the server socket and ``self.port`` is the value
    returned by ``support.bind_port()`` for it.
    """

    def setUp(self):
        """Create the datagram server socket and bind it."""
        server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.serv = server
        self.port = support.bind_port(server)

    def tearDown(self):
        """Close the server socket and drop the reference to it."""
        server = self.serv
        server.close()
        self.serv = None
107
class ThreadSafeCleanupTestCase(unittest.TestCase):
    """unittest.TestCase variant whose cleanup bookkeeping is lock-protected.

    When the threading module is available, addCleanup() and
    doCleanups() run while holding a per-instance recursive lock.
    Without threading the class adds nothing to unittest.TestCase.
    """

    if threading:
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Recursive lock guarding addCleanup() and doCleanups().
            self._cleanup_lock = threading.RLock()

        def addCleanup(self, *args, **kwargs):
            """Register a cleanup while holding the cleanup lock."""
            lock = self._cleanup_lock
            with lock:
                registered = super().addCleanup(*args, **kwargs)
            return registered

        def doCleanups(self, *args, **kwargs):
            """Run the registered cleanups while holding the cleanup lock."""
            lock = self._cleanup_lock
            with lock:
                outcome = super().doCleanups(*args, **kwargs)
            return outcome
127
class SocketCANTest(unittest.TestCase):

    """To be able to run this test, a `vcan0` CAN interface can be created with
    the following commands:
    # modprobe vcan
    # ip link add dev vcan0 type vcan
    # ifconfig vcan0 up
    """
    interface = 'vcan0'
    bufsize = 128

    # The CAN frame structure is defined in <linux/can.h>:
    #
    # struct can_frame {
    #     canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
    #     __u8    can_dlc; /* data length code: 0 .. 8 */
    #     __u8    data[8] __attribute__((aligned(8)));
    # };
    can_frame_fmt = "=IB3x8s"
    can_frame_size = struct.calcsize(can_frame_fmt)

    # The Broadcast Management Command frame structure is defined
    # in <linux/can/bcm.h>:
    #
    # struct bcm_msg_head {
    #     __u32 opcode;
    #     __u32 flags;
    #     __u32 count;
    #     struct timeval ival1, ival2;
    #     canid_t can_id;
    #     __u32 nframes;
    #     struct can_frame frames[0];
    # }
    #
    # `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
    # `struct can_frame` definition). Must use native not standard types for packing.
    bcm_cmd_msg_fmt = "@3I4l2I"
    # Pad up to the next multiple of 8.  (-size) % 8 is the number of
    # bytes missing to reach that multiple; plain size % 8 is only
    # correct when the remainder happens to be 0 or 4.
    bcm_cmd_msg_fmt += "x" * ((-struct.calcsize(bcm_cmd_msg_fmt)) % 8)

    def setUp(self):
        """Create a raw CAN socket bound to `interface`, or skip the test.

        The socket is stored in self.s and closed through a registered
        cleanup.  If binding raises OSError the test is skipped.
        """
        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        self.addCleanup(self.s.close)
        try:
            self.s.bind((self.interface,))
        except OSError:
            self.skipTest('network interface `%s` does not exist' %
                           self.interface)
177
178
class SocketRDSTest(unittest.TestCase):

    """To be able to run this test, the `rds` kernel module must be loaded:
    # modprobe rds
    """
    bufsize = 8192

    def setUp(self):
        """Create and bind the RDS server socket, or skip the test.

        The socket is stored in self.serv and closed through a
        registered cleanup; self.port receives the bound port.  If
        binding raises OSError the test is skipped.
        """
        rds_sock = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.serv = rds_sock
        self.addCleanup(rds_sock.close)
        try:
            bound_port = support.bind_port(rds_sock)
        except OSError:
            self.skipTest('unable to bind RDS socket')
        else:
            self.port = bound_port
193
194
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function: remember the fixture methods
        # currently bound on the instance, then shadow them with the
        # thread-aware wrappers.
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, run the real setUp(), wait for the client.

        The client half is the attribute named '_' + <test method name>.
        server_ready is always set, even when the real setUp() raises, so
        the client thread is never left waiting; in that case
        server_crashed tells it to skip its test.
        """
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = queue.Queue(1)
        self.server_crashed = False

        # Do some munging to start the client test: the last dotted
        # component of the test id is the test method name.
        methodname = self.id().rpartition('.')[2]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        try:
            self.__setUp()
        except BaseException:
            self.server_crashed = True
            raise
        finally:
            self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown(), wait for the client, re-raise its error."""
        self.__tearDown()
        self.done.wait()

        if self.queue.qsize():
            exc = self.queue.get()
            raise exc

    def clientRun(self, test_func):
        """Body of the client thread.

        Waits for the server, runs clientSetUp(), then test_func(), and
        always finishes through clientTearDown().  Any exception raised
        by clientSetUp() or by the client test is put on self.queue so
        that _tearDown() can re-raise it in the main thread.
        """
        self.server_ready.wait()
        try:
            self.clientSetUp()
        except BaseException as e:
            self.queue.put(e)
            self.clientTearDown()
            return
        finally:
            self.client_ready.set()
        if self.server_crashed:
            self.clientTearDown()
            return
        try:
            # The callable check lives inside the guarded region: raised
            # outside it, the TypeError would end this thread without
            # setting self.done and _tearDown() would wait forever.
            if not callable(test_func):
                raise TypeError("test_func must be a callable function")
            test_func()
        except BaseException as e:
            self.queue.put(e)
        finally:
            self.clientTearDown()

    def clientSetUp(self):
        """Client-side fixture setup; must be provided by the subclass."""
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        """Signal that the client is finished and end the client thread."""
        self.done.set()
        thread.exit()
307
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture combined with a client thread.

    The client thread owns ``self.cli``, an unconnected IPv4 stream
    socket created in clientSetUp().
    """

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, test case first.
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client's stream socket."""
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        """Close the client socket, then finish the client thread."""
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
321
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture combined with a client thread.

    The client thread owns ``self.cli``, an IPv4 datagram socket
    created in clientSetUp().
    """

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, test case first.
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client's datagram socket."""
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        """Close the client socket, then finish the client thread."""
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
335
class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
    """CAN server fixture combined with a client thread.

    The client thread owns ``self.cli``, a raw CAN socket that
    clientSetUp() tries to bind to the same interface as the server.
    """

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, test case first.
        SocketCANTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client's CAN socket and try to bind it."""
        client = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        self.cli = client
        try:
            client.bind((self.interface,))
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        """Close the client socket, then finish the client thread."""
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
355
class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
    """RDS server fixture combined with a client thread.

    The client thread owns ``self.cli``; when binding succeeds its
    address is stored in ``self.cli_addr``.
    """

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, test case first.
        SocketRDSTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client's RDS socket and try to bind it."""
        client = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.cli = client
        try:
            # RDS sockets must be bound explicitly to send or receive data
            client.bind((HOST, 0))
            self.cli_addr = client.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        """Close the client socket, then finish the client thread."""
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
377
class SocketConnectedTest(ThreadedTCPSocketTest):
    """TCP client/server fixture with an established connection.

    On the server side self.cli_conn is the socket returned by
    accept(); on the client side self.serv_conn is the client socket
    after it has connected to the server.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        """Set up the listening server, then accept the client connection."""
        ThreadedTCPSocketTest.setUp(self)
        # Release the client thread before blocking in accept(): the
        # call below only returns once the client has connected.
        self.serverExplicitReady()
        self.cli_conn, _peer_addr = self.serv.accept()

    def tearDown(self):
        """Close the accepted connection, then the listening socket."""
        accepted = self.cli_conn
        accepted.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        """Create the client socket and connect it to the server."""
        ThreadedTCPSocketTest.clientSetUp(self)
        client = self.cli
        client.connect((HOST, self.port))
        self.serv_conn = client

    def clientTearDown(self):
        """Close the client's connection, then finish the client thread."""
        connection = self.serv_conn
        connection.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
410
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Client/server fixture built on socket.socketpair().

    self.serv is the server end and self.cli the client end of the pair.
    """

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, test case first.
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        """Create both ends of the socket pair."""
        server_end, client_end = socket.socketpair()
        self.serv = server_end
        self.cli = client_end

    def tearDown(self):
        """Close the server end and drop the reference to it."""
        server_end = self.serv
        server_end.close()
        self.serv = None

    def clientSetUp(self):
        """Nothing to do: the client end is created by setUp()."""
        pass

    def clientTearDown(self):
        """Close the client end, then finish the client thread."""
        client_end = self.cli
        client_end.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
431
432
433# The following classes are used by the sendmsg()/recvmsg() tests.
434# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
435# gives a drop-in replacement for SocketConnectedTest, but different
436# address families can be used, and the attributes serv_addr and
437# cli_addr will be set to the addresses of the endpoints.
438
class SocketTestBase(unittest.TestCase):
    """Common base for the socket fixtures.

    Subclasses supply newSocket(), returning a new socket, and
    bindSock(sock), binding it to an unused address.

    setUp() stores the server socket in self.serv and its bound
    address in self.serv_addr.
    """

    def setUp(self):
        """Create the server socket and bind it."""
        self.serv = self.newSocket()
        self.bindServer()

    def bindServer(self):
        """Bind server socket and set self.serv_addr to its address."""
        server = self.serv
        self.bindSock(server)
        self.serv_addr = server.getsockname()

    def tearDown(self):
        """Close the server socket and drop the reference to it."""
        server = self.serv
        server.close()
        self.serv = None
460
461
class SocketListeningTestMixin(SocketTestBase):
    """Mixin that puts the bound server socket into listening mode."""

    def setUp(self):
        """Run the inherited setup, then call listen() on self.serv."""
        super().setUp()
        server = self.serv
        server.listen()
468
469
class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
                              ThreadableTest):
    """Mixin adding a client socket so client/server tests can be written.

    The client socket is self.cli and its bound address self.cli_addr.
    Usage follows the conventions described on ThreadableTest.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # ThreadableTest is initialised explicitly in addition to the
        # super() call above.
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client socket and bind it."""
        self.cli = self.newClientSocket()
        self.bindClient()

    def newClientSocket(self):
        """Return a new socket for use as client."""
        client = self.newSocket()
        return client

    def bindClient(self):
        """Bind client socket and set self.cli_addr to its address."""
        client = self.cli
        self.bindSock(client)
        self.cli_addr = client.getsockname()

    def clientTearDown(self):
        """Close the client socket, then finish the client thread."""
        client = self.cli
        client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
499
500
class ConnectedStreamTestMixin(SocketListeningTestMixin,
                               ThreadedSocketTestMixin):
    """Mixin for stream tests that need an established connection.

    The server's socket for the accepted connection is self.cli_conn;
    the client's connected socket is self.serv_conn.  (Modelled on
    SocketConnectedTest.)
    """

    def setUp(self):
        """Run the inherited setup, then accept the client connection."""
        super().setUp()
        # Release the client thread before blocking in accept(): the
        # call below only returns once the client has connected.
        self.serverExplicitReady()
        self.cli_conn, _peer_addr = self.serv.accept()

    def tearDown(self):
        """Close the accepted connection, then run the inherited teardown."""
        accepted = self.cli_conn
        accepted.close()
        self.cli_conn = None
        super().tearDown()

    def clientSetUp(self):
        """Create and bind the client socket, then connect to the server."""
        super().clientSetUp()
        client = self.cli
        client.connect(self.serv_addr)
        self.serv_conn = client

    def clientTearDown(self):
        """Close the client's connection if there is one, then finish up."""
        try:
            self.serv_conn.close()
        except AttributeError:
            # Tolerated: an AttributeError here skips the reset below.
            pass
        else:
            self.serv_conn = None
        super().clientTearDown()
535
536
class UnixSocketTestBase(SocketTestBase):
    """Base class for Unix-domain socket tests."""

    # This class is used for file descriptor passing tests, so we
    # create the sockets in a private directory so that other users
    # can't send anything that might be problematic for a privileged
    # user running the tests.

    def setUp(self):
        """Create the private directory, then run the inherited setup."""
        private_dir = tempfile.mkdtemp()
        self.dir_path = private_dir
        self.addCleanup(os.rmdir, private_dir)
        super().setUp()

    def bindSock(self, sock):
        """Bind sock to a fresh path inside the private directory.

        The path is unlinked again through a registered cleanup.
        """
        sock_path = tempfile.mktemp(dir=self.dir_path)
        support.bind_unix_socket(sock, sock_path)
        self.addCleanup(support.unlink, sock_path)
554
class UnixStreamBase(UnixSocketTestBase):
    """Unix-domain test base whose sockets are SOCK_STREAM."""

    def newSocket(self):
        """Return a new, unbound AF_UNIX stream socket."""
        family, kind = socket.AF_UNIX, socket.SOCK_STREAM
        return socket.socket(family, kind)
560
561
class InetTestBase(SocketTestBase):
    """Base class for IPv4 socket tests."""

    # Host passed to support.bind_port(); subclasses may override it.
    host = HOST

    def setUp(self):
        """Run the inherited setup, then record the server's port."""
        super().setUp()
        # The port is the second element of the bound address.
        _host, self.port = self.serv_addr[0], self.serv_addr[1]

    def bindSock(self, sock):
        """Bind sock to a port on self.host via support.bind_port()."""
        target_host = self.host
        support.bind_port(sock, host=target_host)
573
class TCPTestBase(InetTestBase):
    """IPv4 test base whose sockets are TCP (SOCK_STREAM)."""

    def newSocket(self):
        """Return a new, unbound AF_INET stream socket."""
        family, kind = socket.AF_INET, socket.SOCK_STREAM
        return socket.socket(family, kind)
579
class UDPTestBase(InetTestBase):
    """IPv4 test base whose sockets are UDP (SOCK_DGRAM)."""

    def newSocket(self):
        """Return a new, unbound AF_INET datagram socket."""
        family, kind = socket.AF_INET, socket.SOCK_DGRAM
        return socket.socket(family, kind)
585
class SCTPStreamBase(InetTestBase):
    """IPv4 test base for SCTP in one-to-one (SOCK_STREAM) mode."""

    def newSocket(self):
        """Return a new, unbound AF_INET stream socket using IPPROTO_SCTP."""
        family, kind = socket.AF_INET, socket.SOCK_STREAM
        return socket.socket(family, kind, socket.IPPROTO_SCTP)
592
593
class Inet6TestBase(InetTestBase):
    """IPv6 counterpart of InetTestBase; only the bind host differs."""

    # Host handed to support.bind_port() by the inherited bindSock().
    host = support.HOSTv6
598
class UDP6TestBase(Inet6TestBase):
    """IPv6 test base whose sockets are UDP (SOCK_DGRAM)."""

    def newSocket(self):
        """Return a new, unbound AF_INET6 datagram socket."""
        family, kind = socket.AF_INET6, socket.SOCK_DGRAM
        return socket.socket(family, kind)
604
605
606# Test-skipping decorators for use with ThreadableTest.
607
def skipWithClientIf(condition, reason):
    """Return a decorator that skips its target when condition is true.

    For a decorated object that is not a class, the result also gets a
    "client_skip" attribute.  It is itself a decorator: when the test is
    skipped it replaces the function it is given with one that does
    nothing, otherwise it returns that function unchanged.  Decorating
    the client half of a ThreadableTest test with it keeps the client
    from running when the server half is skipped.
    """
    def _do_nothing(*args, **kwargs):
        pass

    def _skipping(obj):
        skipped = unittest.skip(reason)(obj)
        if not isinstance(obj, type):
            skipped.client_skip = lambda f: _do_nothing
        return skipped

    def _keeping(obj):
        # Classes are left alone, and an existing client_skip (set by an
        # earlier decorator) is not overwritten.
        if isinstance(obj, type) or hasattr(obj, "client_skip"):
            return obj
        obj.client_skip = lambda f: f
        return obj

    if condition:
        return _skipping
    return _keeping
629
630
def requireAttrs(obj, *attributes):
    """Return a decorator skipping the test if obj lacks any attribute.

    The skip reason lists the missing attribute names.  The client_skip
    attribute is set the same way skipWithClientIf() sets it.
    """
    missing = []
    for name in attributes:
        if not hasattr(obj, name):
            missing.append(name)
    reason = "don't have " + ", ".join(missing)
    # A non-empty list of missing names is the (truthy) skip condition.
    return skipWithClientIf(missing, reason)
639
640
def requireSocket(*args):
    """Return a decorator skipping the test if socket(*args) cannot be made.

    A string argument names an attribute of the socket module whose
    value is used in its place; if any such attribute does not exist
    the test is skipped.  The client_skip attribute is set the same way
    skipWithClientIf() sets it.
    """
    unavailable = [name for name in args
                   if isinstance(name, str) and not hasattr(socket, name)]
    err = None
    if unavailable:
        err = "don't have " + ", ".join(unavailable)
    else:
        resolved = []
        for arg in args:
            if isinstance(arg, str):
                arg = getattr(socket, arg)
            resolved.append(arg)
        try:
            probe = socket.socket(*resolved)
        except OSError as exc:
            # XXX: check errno?
            err = str(exc)
        else:
            probe.close()
    described = ", ".join(str(o) for o in args)
    return skipWithClientIf(
        err is not None,
        "can't create socket({0}): {1}".format(described, err))
667
668
669#######################################################################
670## Begin Tests
671
672class GeneralModuleTests(unittest.TestCase):
673
674    def test_SocketType_is_socketobject(self):
675        import _socket
676        self.assertTrue(socket.SocketType is _socket.socket)
677        s = socket.socket()
678        self.assertIsInstance(s, socket.SocketType)
679        s.close()
680
681    def test_repr(self):
682        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
683        with s:
684            self.assertIn('fd=%i' % s.fileno(), repr(s))
685            self.assertIn('family=%s' % socket.AF_INET, repr(s))
686            self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
687            self.assertIn('proto=0', repr(s))
688            self.assertNotIn('raddr', repr(s))
689            s.bind(('127.0.0.1', 0))
690            self.assertIn('laddr', repr(s))
691            self.assertIn(str(s.getsockname()), repr(s))
692        self.assertIn('[closed]', repr(s))
693        self.assertNotIn('laddr', repr(s))
694
695    @unittest.skipUnless(_socket is not None, 'need _socket module')
696    def test_csocket_repr(self):
697        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
698        try:
699            expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
700                        % (s.fileno(), s.family, s.type, s.proto))
701            self.assertEqual(repr(s), expected)
702        finally:
703            s.close()
704        expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
705                    % (s.family, s.type, s.proto))
706        self.assertEqual(repr(s), expected)
707
708    def test_weakref(self):
709        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
710        p = proxy(s)
711        self.assertEqual(p.fileno(), s.fileno())
712        s.close()
713        s = None
714        try:
715            p.fileno()
716        except ReferenceError:
717            pass
718        else:
719            self.fail('Socket proxy still exists')
720
721    def testSocketError(self):
722        # Testing socket module exceptions
723        msg = "Error raising socket exception (%s)."
724        with self.assertRaises(OSError, msg=msg % 'OSError'):
725            raise OSError
726        with self.assertRaises(OSError, msg=msg % 'socket.herror'):
727            raise socket.herror
728        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
729            raise socket.gaierror
730
731    def testSendtoErrors(self):
732        # Testing that sendto doesn't mask failures. See #10169.
733        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
734        self.addCleanup(s.close)
735        s.bind(('', 0))
736        sockname = s.getsockname()
737        # 2 args
738        with self.assertRaises(TypeError) as cm:
739            s.sendto('\u2620', sockname)
740        self.assertEqual(str(cm.exception),
741                         "a bytes-like object is required, not 'str'")
742        with self.assertRaises(TypeError) as cm:
743            s.sendto(5j, sockname)
744        self.assertEqual(str(cm.exception),
745                         "a bytes-like object is required, not 'complex'")
746        with self.assertRaises(TypeError) as cm:
747            s.sendto(b'foo', None)
748        self.assertIn('not NoneType',str(cm.exception))
749        # 3 args
750        with self.assertRaises(TypeError) as cm:
751            s.sendto('\u2620', 0, sockname)
752        self.assertEqual(str(cm.exception),
753                         "a bytes-like object is required, not 'str'")
754        with self.assertRaises(TypeError) as cm:
755            s.sendto(5j, 0, sockname)
756        self.assertEqual(str(cm.exception),
757                         "a bytes-like object is required, not 'complex'")
758        with self.assertRaises(TypeError) as cm:
759            s.sendto(b'foo', 0, None)
760        self.assertIn('not NoneType', str(cm.exception))
761        with self.assertRaises(TypeError) as cm:
762            s.sendto(b'foo', 'bar', sockname)
763        self.assertIn('an integer is required', str(cm.exception))
764        with self.assertRaises(TypeError) as cm:
765            s.sendto(b'foo', None, None)
766        self.assertIn('an integer is required', str(cm.exception))
767        # wrong number of args
768        with self.assertRaises(TypeError) as cm:
769            s.sendto(b'foo')
770        self.assertIn('(1 given)', str(cm.exception))
771        with self.assertRaises(TypeError) as cm:
772            s.sendto(b'foo', 0, sockname, 4)
773        self.assertIn('(4 given)', str(cm.exception))
774
775    def testCrucialConstants(self):
776        # Testing for mission critical constants
777        socket.AF_INET
778        socket.SOCK_STREAM
779        socket.SOCK_DGRAM
780        socket.SOCK_RAW
781        socket.SOCK_RDM
782        socket.SOCK_SEQPACKET
783        socket.SOL_SOCKET
784        socket.SO_REUSEADDR
785
786    def testHostnameRes(self):
787        # Testing hostname resolution mechanisms
788        hostname = socket.gethostname()
789        try:
790            ip = socket.gethostbyname(hostname)
791        except OSError:
792            # Probably name lookup wasn't set up right; skip this test
793            self.skipTest('name lookup failure')
794        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
795        try:
796            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
797        except OSError:
798            # Probably a similar problem as above; skip this test
799            self.skipTest('name lookup failure')
800        all_host_names = [hostname, hname] + aliases
801        fqhn = socket.getfqdn(ip)
802        if not fqhn in all_host_names:
803            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
804
805    def test_host_resolution(self):
806        for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
807                     '1:1:1:1:1:1:1:1:1']:
808            self.assertRaises(OSError, socket.gethostbyname, addr)
809            self.assertRaises(OSError, socket.gethostbyaddr, addr)
810
811        for addr in [support.HOST, '10.0.0.1', '255.255.255.255']:
812            self.assertEqual(socket.gethostbyname(addr), addr)
813
814        # we don't test support.HOSTv6 because there's a chance it doesn't have
815        # a matching name entry (e.g. 'ip6-localhost')
816        for host in [support.HOST]:
817            self.assertIn(host, socket.gethostbyaddr(host)[2])
818
819    @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
820    @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
821    def test_sethostname(self):
822        oldhn = socket.gethostname()
823        try:
824            socket.sethostname('new')
825        except OSError as e:
826            if e.errno == errno.EPERM:
827                self.skipTest("test should be run as root")
828            else:
829                raise
830        try:
831            # running test as root!
832            self.assertEqual(socket.gethostname(), 'new')
833            # Should work with bytes objects too
834            socket.sethostname(b'bar')
835            self.assertEqual(socket.gethostname(), 'bar')
836        finally:
837            socket.sethostname(oldhn)
838
839    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
840                         'socket.if_nameindex() not available.')
841    def testInterfaceNameIndex(self):
842        interfaces = socket.if_nameindex()
843        for index, name in interfaces:
844            self.assertIsInstance(index, int)
845            self.assertIsInstance(name, str)
846            # interface indices are non-zero integers
847            self.assertGreater(index, 0)
848            _index = socket.if_nametoindex(name)
849            self.assertIsInstance(_index, int)
850            self.assertEqual(index, _index)
851            _name = socket.if_indextoname(index)
852            self.assertIsInstance(_name, str)
853            self.assertEqual(name, _name)
854
855    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
856                         'socket.if_nameindex() not available.')
857    def testInvalidInterfaceNameIndex(self):
858        # test nonexistent interface index/name
859        self.assertRaises(OSError, socket.if_indextoname, 0)
860        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
861        # test with invalid values
862        self.assertRaises(TypeError, socket.if_nametoindex, 0)
863        self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
864
865    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
866                         'test needs sys.getrefcount()')
867    def testRefCountGetNameInfo(self):
868        # Testing reference count for getnameinfo
869        try:
870            # On some versions, this loses a reference
871            orig = sys.getrefcount(__name__)
872            socket.getnameinfo(__name__,0)
873        except TypeError:
874            if sys.getrefcount(__name__) != orig:
875                self.fail("socket.getnameinfo loses a reference")
876
877    def testInterpreterCrash(self):
878        # Making sure getnameinfo doesn't crash the interpreter
879        try:
880            # On some versions, this crashes the interpreter.
881            socket.getnameinfo(('x', 0, 0, 0), 0)
882        except OSError:
883            pass
884
885    def testNtoH(self):
886        # This just checks that htons etc. are their own inverse,
887        # when looking at the lower 16 or 32 bits.
888        sizes = {socket.htonl: 32, socket.ntohl: 32,
889                 socket.htons: 16, socket.ntohs: 16}
890        for func, size in sizes.items():
891            mask = (1<<size) - 1
892            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
893                self.assertEqual(i & mask, func(func(i&mask)) & mask)
894
895            swapped = func(mask)
896            self.assertEqual(swapped & mask, mask)
897            self.assertRaises(OverflowError, func, 1<<34)
898
899    def testNtoHErrors(self):
900        good_values = [ 1, 2, 3, 1, 2, 3 ]
901        bad_values = [ -1, -2, -3, -1, -2, -3 ]
902        for k in good_values:
903            socket.ntohl(k)
904            socket.ntohs(k)
905            socket.htonl(k)
906            socket.htons(k)
907        for k in bad_values:
908            self.assertRaises(OverflowError, socket.ntohl, k)
909            self.assertRaises(OverflowError, socket.ntohs, k)
910            self.assertRaises(OverflowError, socket.htonl, k)
911            self.assertRaises(OverflowError, socket.htons, k)
912
913    def testGetServBy(self):
914        eq = self.assertEqual
915        # Find one service that exists, then check all the related interfaces.
916        # I've ordered this by protocols that have both a tcp and udp
917        # protocol, at least for modern Linuxes.
918        if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
919            or sys.platform in ('linux', 'darwin')):
920            # avoid the 'echo' service on this platform, as there is an
921            # assumption breaking non-standard port/protocol entry
922            services = ('daytime', 'qotd', 'domain')
923        else:
924            services = ('echo', 'daytime', 'domain')
925        for service in services:
926            try:
927                port = socket.getservbyname(service, 'tcp')
928                break
929            except OSError:
930                pass
931        else:
932            raise OSError
933        # Try same call with optional protocol omitted
934        port2 = socket.getservbyname(service)
935        eq(port, port2)
936        # Try udp, but don't barf if it doesn't exist
937        try:
938            udpport = socket.getservbyname(service, 'udp')
939        except OSError:
940            udpport = None
941        else:
942            eq(udpport, port)
943        # Now make sure the lookup by port returns the same service name
944        eq(socket.getservbyport(port2), service)
945        eq(socket.getservbyport(port, 'tcp'), service)
946        if udpport is not None:
947            eq(socket.getservbyport(udpport, 'udp'), service)
948        # Make sure getservbyport does not accept out of range ports.
949        self.assertRaises(OverflowError, socket.getservbyport, -1)
950        self.assertRaises(OverflowError, socket.getservbyport, 65536)
951
952    def testDefaultTimeout(self):
953        # Testing default timeout
954        # The default timeout should initially be None
955        self.assertEqual(socket.getdefaulttimeout(), None)
956        s = socket.socket()
957        self.assertEqual(s.gettimeout(), None)
958        s.close()
959
960        # Set the default timeout to 10, and see if it propagates
961        socket.setdefaulttimeout(10)
962        self.assertEqual(socket.getdefaulttimeout(), 10)
963        s = socket.socket()
964        self.assertEqual(s.gettimeout(), 10)
965        s.close()
966
967        # Reset the default timeout to None, and see if it propagates
968        socket.setdefaulttimeout(None)
969        self.assertEqual(socket.getdefaulttimeout(), None)
970        s = socket.socket()
971        self.assertEqual(s.gettimeout(), None)
972        s.close()
973
974        # Check that setting it to an invalid value raises ValueError
975        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
976
977        # Check that setting it to an invalid type raises TypeError
978        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
979
980    @unittest.skipUnless(hasattr(socket, 'inet_aton'),
981                         'test needs socket.inet_aton()')
982    def testIPv4_inet_aton_fourbytes(self):
983        # Test that issue1008086 and issue767150 are fixed.
984        # It must return 4 bytes.
985        self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
986        self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
987
988    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
989                         'test needs socket.inet_pton()')
990    def testIPv4toString(self):
991        from socket import inet_aton as f, inet_pton, AF_INET
992        g = lambda a: inet_pton(AF_INET, a)
993
994        assertInvalid = lambda func,a: self.assertRaises(
995            (OSError, ValueError), func, a
996        )
997
998        self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
999        self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
1000        self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
1001        self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
1002        self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
1003        assertInvalid(f, '0.0.0.')
1004        assertInvalid(f, '300.0.0.0')
1005        assertInvalid(f, 'a.0.0.0')
1006        assertInvalid(f, '1.2.3.4.5')
1007        assertInvalid(f, '::1')
1008
1009        self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
1010        self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
1011        self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
1012        self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
1013        assertInvalid(g, '0.0.0.')
1014        assertInvalid(g, '300.0.0.0')
1015        assertInvalid(g, 'a.0.0.0')
1016        assertInvalid(g, '1.2.3.4.5')
1017        assertInvalid(g, '::1')
1018
1019    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1020                         'test needs socket.inet_pton()')
1021    def testIPv6toString(self):
1022        try:
1023            from socket import inet_pton, AF_INET6, has_ipv6
1024            if not has_ipv6:
1025                self.skipTest('IPv6 not available')
1026        except ImportError:
1027            self.skipTest('could not import needed symbols from socket')
1028
1029        if sys.platform == "win32":
1030            try:
1031                inet_pton(AF_INET6, '::')
1032            except OSError as e:
1033                if e.winerror == 10022:
1034                    self.skipTest('IPv6 might not be supported')
1035
1036        f = lambda a: inet_pton(AF_INET6, a)
1037        assertInvalid = lambda a: self.assertRaises(
1038            (OSError, ValueError), f, a
1039        )
1040
1041        self.assertEqual(b'\x00' * 16, f('::'))
1042        self.assertEqual(b'\x00' * 16, f('0::0'))
1043        self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
1044        self.assertEqual(
1045            b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
1046            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
1047        )
1048        self.assertEqual(
1049            b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
1050            f('ad42:abc::127:0:254:2')
1051        )
1052        self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
1053        assertInvalid('0x20::')
1054        assertInvalid(':::')
1055        assertInvalid('::0::')
1056        assertInvalid('1::abc::')
1057        assertInvalid('1::abc::def')
1058        assertInvalid('1:2:3:4:5:6:')
1059        assertInvalid('1:2:3:4:5:6')
1060        assertInvalid('1:2:3:4:5:6:7:8:')
1061        assertInvalid('1:2:3:4:5:6:7:8:0')
1062
1063        self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
1064            f('::254.42.23.64')
1065        )
1066        self.assertEqual(
1067            b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
1068            f('42::a29b:254.42.23.64')
1069        )
1070        self.assertEqual(
1071            b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
1072            f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
1073        )
1074        assertInvalid('255.254.253.252')
1075        assertInvalid('1::260.2.3.0')
1076        assertInvalid('1::0.be.e.0')
1077        assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
1078        assertInvalid('::1.2.3.4:0')
1079        assertInvalid('0.100.200.0:3:4:5:6:7:8')
1080
1081    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1082                         'test needs socket.inet_ntop()')
1083    def testStringToIPv4(self):
1084        from socket import inet_ntoa as f, inet_ntop, AF_INET
1085        g = lambda a: inet_ntop(AF_INET, a)
1086        assertInvalid = lambda func,a: self.assertRaises(
1087            (OSError, ValueError), func, a
1088        )
1089
1090        self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
1091        self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
1092        self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
1093        self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
1094        assertInvalid(f, b'\x00' * 3)
1095        assertInvalid(f, b'\x00' * 5)
1096        assertInvalid(f, b'\x00' * 16)
1097        self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))
1098
1099        self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
1100        self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
1101        self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
1102        assertInvalid(g, b'\x00' * 3)
1103        assertInvalid(g, b'\x00' * 5)
1104        assertInvalid(g, b'\x00' * 16)
1105        self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))
1106
1107    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1108                         'test needs socket.inet_ntop()')
1109    def testStringToIPv6(self):
1110        try:
1111            from socket import inet_ntop, AF_INET6, has_ipv6
1112            if not has_ipv6:
1113                self.skipTest('IPv6 not available')
1114        except ImportError:
1115            self.skipTest('could not import needed symbols from socket')
1116
1117        if sys.platform == "win32":
1118            try:
1119                inet_ntop(AF_INET6, b'\x00' * 16)
1120            except OSError as e:
1121                if e.winerror == 10022:
1122                    self.skipTest('IPv6 might not be supported')
1123
1124        f = lambda a: inet_ntop(AF_INET6, a)
1125        assertInvalid = lambda a: self.assertRaises(
1126            (OSError, ValueError), f, a
1127        )
1128
1129        self.assertEqual('::', f(b'\x00' * 16))
1130        self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
1131        self.assertEqual(
1132            'aef:b01:506:1001:ffff:9997:55:170',
1133            f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
1134        )
1135        self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))
1136
1137        assertInvalid(b'\x12' * 15)
1138        assertInvalid(b'\x12' * 17)
1139        assertInvalid(b'\x12' * 4)
1140
1141    # XXX The following don't test module-level functionality...
1142
1143    def testSockName(self):
1144        # Testing getsockname()
1145        port = support.find_unused_port()
1146        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1147        self.addCleanup(sock.close)
1148        sock.bind(("0.0.0.0", port))
1149        name = sock.getsockname()
1150        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
1151        # it reasonable to get the host's addr in addition to 0.0.0.0.
1152        # At least for eCos.  This is required for the S/390 to pass.
1153        try:
1154            my_ip_addr = socket.gethostbyname(socket.gethostname())
1155        except OSError:
1156            # Probably name lookup wasn't set up right; skip this test
1157            self.skipTest('name lookup failure')
1158        self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
1159        self.assertEqual(name[1], port)
1160
1161    def testGetSockOpt(self):
1162        # Testing getsockopt()
1163        # We know a socket should start without reuse==0
1164        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1165        self.addCleanup(sock.close)
1166        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1167        self.assertFalse(reuse != 0, "initial mode is reuse")
1168
1169    def testSetSockOpt(self):
1170        # Testing setsockopt()
1171        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1172        self.addCleanup(sock.close)
1173        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1174        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1175        self.assertFalse(reuse == 0, "failed to set reuse mode")
1176
1177    def testSendAfterClose(self):
1178        # testing send() after close() with timeout
1179        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1180        sock.settimeout(1)
1181        sock.close()
1182        self.assertRaises(OSError, sock.send, b"spam")
1183
1184    def testCloseException(self):
1185        sock = socket.socket()
1186        socket.socket(fileno=sock.fileno()).close()
1187        try:
1188            sock.close()
1189        except OSError as err:
1190            # Winsock apparently raises ENOTSOCK
1191            self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
1192        else:
1193            self.fail("close() should raise EBADF/ENOTSOCK")
1194
1195    def testNewAttributes(self):
1196        # testing .family, .type and .protocol
1197
1198        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1199        self.assertEqual(sock.family, socket.AF_INET)
1200        if hasattr(socket, 'SOCK_CLOEXEC'):
1201            self.assertIn(sock.type,
1202                          (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
1203                           socket.SOCK_STREAM))
1204        else:
1205            self.assertEqual(sock.type, socket.SOCK_STREAM)
1206        self.assertEqual(sock.proto, 0)
1207        sock.close()
1208
1209    def test_getsockaddrarg(self):
1210        sock = socket.socket()
1211        self.addCleanup(sock.close)
1212        port = support.find_unused_port()
1213        big_port = port + 65536
1214        neg_port = port - 65536
1215        self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
1216        self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
1217        # Since find_unused_port() is inherently subject to race conditions, we
1218        # call it a couple times if necessary.
1219        for i in itertools.count():
1220            port = support.find_unused_port()
1221            try:
1222                sock.bind((HOST, port))
1223            except OSError as e:
1224                if e.errno != errno.EADDRINUSE or i == 5:
1225                    raise
1226            else:
1227                break
1228
1229    @unittest.skipUnless(os.name == "nt", "Windows specific")
1230    def test_sock_ioctl(self):
1231        self.assertTrue(hasattr(socket.socket, 'ioctl'))
1232        self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
1233        self.assertTrue(hasattr(socket, 'RCVALL_ON'))
1234        self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
1235        self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
1236        s = socket.socket()
1237        self.addCleanup(s.close)
1238        self.assertRaises(ValueError, s.ioctl, -1, None)
1239        s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
1240
1241    @unittest.skipUnless(os.name == "nt", "Windows specific")
1242    @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
1243                         'Loopback fast path support required for this test')
1244    def test_sio_loopback_fast_path(self):
1245        s = socket.socket()
1246        self.addCleanup(s.close)
1247        try:
1248            s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
1249        except OSError as exc:
1250            WSAEOPNOTSUPP = 10045
1251            if exc.winerror == WSAEOPNOTSUPP:
1252                self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
1253                              "doesn't implemented in this Windows version")
1254            raise
1255        self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
1256
1257    def testGetaddrinfo(self):
1258        try:
1259            socket.getaddrinfo('localhost', 80)
1260        except socket.gaierror as err:
1261            if err.errno == socket.EAI_SERVICE:
1262                # see http://bugs.python.org/issue1282647
1263                self.skipTest("buggy libc version")
1264            raise
1265        # len of every sequence is supposed to be == 5
1266        for info in socket.getaddrinfo(HOST, None):
1267            self.assertEqual(len(info), 5)
1268        # host can be a domain name, a string representation of an
1269        # IPv4/v6 address or None
1270        socket.getaddrinfo('localhost', 80)
1271        socket.getaddrinfo('127.0.0.1', 80)
1272        socket.getaddrinfo(None, 80)
1273        if support.IPV6_ENABLED:
1274            socket.getaddrinfo('::1', 80)
1275        # port can be a string service name such as "http", a numeric
1276        # port number or None
1277        socket.getaddrinfo(HOST, "http")
1278        socket.getaddrinfo(HOST, 80)
1279        socket.getaddrinfo(HOST, None)
1280        # test family and socktype filters
1281        infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
1282        for family, type, _, _, _ in infos:
1283            self.assertEqual(family, socket.AF_INET)
1284            self.assertEqual(str(family), 'AddressFamily.AF_INET')
1285            self.assertEqual(type, socket.SOCK_STREAM)
1286            self.assertEqual(str(type), 'SocketKind.SOCK_STREAM')
1287        infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1288        for _, socktype, _, _, _ in infos:
1289            self.assertEqual(socktype, socket.SOCK_STREAM)
1290        # test proto and flags arguments
1291        socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1292        socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1293        # a server willing to support both IPv4 and IPv6 will
1294        # usually do this
1295        socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1296                           socket.AI_PASSIVE)
1297        # test keyword arguments
1298        a = socket.getaddrinfo(HOST, None)
1299        b = socket.getaddrinfo(host=HOST, port=None)
1300        self.assertEqual(a, b)
1301        a = socket.getaddrinfo(HOST, None, socket.AF_INET)
1302        b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
1303        self.assertEqual(a, b)
1304        a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1305        b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
1306        self.assertEqual(a, b)
1307        a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1308        b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
1309        self.assertEqual(a, b)
1310        a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1311        b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
1312        self.assertEqual(a, b)
1313        a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1314                               socket.AI_PASSIVE)
1315        b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
1316                               type=socket.SOCK_STREAM, proto=0,
1317                               flags=socket.AI_PASSIVE)
1318        self.assertEqual(a, b)
1319        # Issue #6697.
1320        self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
1321
1322        # Issue 17269: test workaround for OS X platform bug segfault
1323        if hasattr(socket, 'AI_NUMERICSERV'):
1324            try:
1325                # The arguments here are undefined and the call may succeed
1326                # or fail.  All we care here is that it doesn't segfault.
1327                socket.getaddrinfo("localhost", None, 0, 0, 0,
1328                                   socket.AI_NUMERICSERV)
1329            except socket.gaierror:
1330                pass
1331
1332    def test_getnameinfo(self):
1333        # only IP addresses are allowed
1334        self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
1335
1336    @unittest.skipUnless(support.is_resource_enabled('network'),
1337                         'network is not enabled')
1338    def test_idna(self):
1339        # Check for internet access before running test
1340        # (issue #12804, issue #25138).
1341        with support.transient_internet('python.org'):
1342            socket.gethostbyname('python.org')
1343
1344        # these should all be successful
1345        domain = 'испытание.pythontest.net'
1346        socket.gethostbyname(domain)
1347        socket.gethostbyname_ex(domain)
1348        socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
1349        # this may not work if the forward lookup choses the IPv6 address, as that doesn't
1350        # have a reverse entry yet
1351        # socket.gethostbyaddr('испытание.python.org')
1352
    def check_sendall_interrupted(self, with_timeout):
        # Shared body of the sendall()-interrupted-by-a-signal tests.
        #
        # with_timeout: when true, the sending socket gets a 1.5 second
        # timeout and a second phase checks that sendall() raises
        # socket.timeout when the signal handler itself does not raise.
        #
        # socketpair() is not strictly required, but it makes things easier.
        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
            self.skipTest("signal.alarm and socket.socketpair required for this test")
        # Our signal handlers clobber the C errno by calling a math function
        # with an invalid domain value.
        def ok_handler(*args):
            self.assertRaises(ValueError, math.acosh, 0)
        def raising_handler(*args):
            self.assertRaises(ValueError, math.acosh, 0)
            # Deliberately raise ZeroDivisionError out of the handler.
            1 // 0
        c, s = socket.socketpair()
        # Keep the previous SIGALRM handler so it can be restored afterwards.
        old_alarm = signal.signal(signal.SIGALRM, raising_handler)
        try:
            if with_timeout:
                # Just above the one second minimum for signal.alarm
                c.settimeout(1.5)
            # Phase 1: the exception raised by the handler must propagate
            # out of sendall().
            with self.assertRaises(ZeroDivisionError):
                signal.alarm(1)
                c.sendall(b"x" * support.SOCK_MAX_SIZE)
            if with_timeout:
                # Phase 2: with a handler that returns normally, sendall()
                # is expected to end with socket.timeout instead.
                signal.signal(signal.SIGALRM, ok_handler)
                signal.alarm(1)
                self.assertRaises(socket.timeout, c.sendall,
                                  b"x" * support.SOCK_MAX_SIZE)
        finally:
            # Cancel any pending alarm before restoring the old handler,
            # then release both ends of the socket pair.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_alarm)
            c.close()
            s.close()
1383
1384    def test_sendall_interrupted(self):
1385        self.check_sendall_interrupted(False)
1386
1387    def test_sendall_interrupted_with_timeout(self):
1388        self.check_sendall_interrupted(True)
1389
    def test_dealloc_warn(self):
        # Dropping the last reference to an unclosed socket must emit a
        # ResourceWarning whose message contains the socket's repr.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Capture the repr while the socket is still alive.
        r = repr(sock)
        with self.assertWarns(ResourceWarning) as cm:
            sock = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))
        # An open socket file object gets dereferenced after the socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        f = sock.makefile('rb')
        r = repr(sock)
        # Releasing the socket happens outside assertWarns; only releasing
        # the file object below is checked for a ResourceWarning.
        sock = None
        support.gc_collect()
        with self.assertWarns(ResourceWarning):
            f = None
            support.gc_collect()
1406
1407    def test_name_closed_socketio(self):
1408        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1409            fp = sock.makefile("rb")
1410            fp.close()
1411            self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
1412
1413    def test_unusable_closed_socketio(self):
1414        with socket.socket() as sock:
1415            fp = sock.makefile("rb", buffering=0)
1416            self.assertTrue(fp.readable())
1417            self.assertFalse(fp.writable())
1418            self.assertFalse(fp.seekable())
1419            fp.close()
1420            self.assertRaises(ValueError, fp.readable)
1421            self.assertRaises(ValueError, fp.writable)
1422            self.assertRaises(ValueError, fp.seekable)
1423
1424    def test_makefile_mode(self):
1425        for mode in 'r', 'rb', 'rw', 'w', 'wb':
1426            with self.subTest(mode=mode):
1427                with socket.socket() as sock:
1428                    with sock.makefile(mode) as fp:
1429                        self.assertEqual(fp.mode, mode)
1430
1431    def test_makefile_invalid_mode(self):
1432        for mode in 'rt', 'x', '+', 'a':
1433            with self.subTest(mode=mode):
1434                with socket.socket() as sock:
1435                    with self.assertRaisesRegex(ValueError, 'invalid mode'):
1436                        sock.makefile(mode)
1437
1438    def test_pickle(self):
1439        sock = socket.socket()
1440        with sock:
1441            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1442                self.assertRaises(TypeError, pickle.dumps, sock, protocol)
1443        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1444            family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
1445            self.assertEqual(family, socket.AF_INET)
1446            type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
1447            self.assertEqual(type, socket.SOCK_STREAM)
1448
1449    def test_listen_backlog(self):
1450        for backlog in 0, -1:
1451            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1452                srv.bind((HOST, 0))
1453                srv.listen(backlog)
1454
1455        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1456            srv.bind((HOST, 0))
1457            srv.listen()
1458
1459    @support.cpython_only
1460    def test_listen_backlog_overflow(self):
1461        # Issue 15989
1462        import _testcapi
1463        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1464        srv.bind((HOST, 0))
1465        self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
1466        srv.close()
1467
1468    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
1469    def test_flowinfo(self):
1470        self.assertRaises(OverflowError, socket.getnameinfo,
1471                          (support.HOSTv6, 0, 0xffffffff), 0)
1472        with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1473            self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10))
1474
1475    def test_str_for_enums(self):
1476        # Make sure that the AF_* and SOCK_* constants have enum-like string
1477        # reprs.
1478        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1479            self.assertEqual(str(s.family), 'AddressFamily.AF_INET')
1480            self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM')
1481
1482    @unittest.skipIf(os.name == 'nt', 'Will not work on Windows')
1483    def test_uknown_socket_family_repr(self):
1484        # Test that when created with a family that's not one of the known
1485        # AF_*/SOCK_* constants, socket.family just returns the number.
1486        #
1487        # To do this we fool socket.socket into believing it already has an
1488        # open fd because on this path it doesn't actually verify the family and
1489        # type and populates the socket object.
1490        #
1491        # On Windows this trick won't work, so the test is skipped.
1492        fd, path = tempfile.mkstemp()
1493        self.addCleanup(os.unlink, path)
1494        with socket.socket(family=42424, type=13331, fileno=fd) as s:
1495            self.assertEqual(s.family, 42424)
1496            self.assertEqual(s.type, 13331)
1497
1498    @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
1499    def test__sendfile_use_sendfile(self):
1500        class File:
1501            def __init__(self, fd):
1502                self.fd = fd
1503
1504            def fileno(self):
1505                return self.fd
1506        with socket.socket() as sock:
1507            fd = os.open(os.curdir, os.O_RDONLY)
1508            os.close(fd)
1509            with self.assertRaises(socket._GiveupOnSendfile):
1510                sock._sendfile_use_sendfile(File(fd))
1511            with self.assertRaises(OverflowError):
1512                sock._sendfile_use_sendfile(File(2**1000))
1513            with self.assertRaises(TypeError):
1514                sock._sendfile_use_sendfile(File(None))
1515
1516
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class BasicCANTest(unittest.TestCase):

    def testCrucialConstants(self):
        # Looking a constant up raises AttributeError when it is missing.
        for constant in ('AF_CAN', 'PF_CAN', 'CAN_RAW'):
            getattr(socket, constant)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testBCMConstants(self):
        socket.CAN_BCM

        # opcodes
        opcodes = (
            'CAN_BCM_TX_SETUP',     # create (cyclic) transmission task
            'CAN_BCM_TX_DELETE',    # remove (cyclic) transmission task
            'CAN_BCM_TX_READ',      # read properties of (cyclic) transmission task
            'CAN_BCM_TX_SEND',      # send one CAN frame
            'CAN_BCM_RX_SETUP',     # create RX content filter subscription
            'CAN_BCM_RX_DELETE',    # remove RX content filter subscription
            'CAN_BCM_RX_READ',      # read properties of RX content filter subscription
            'CAN_BCM_TX_STATUS',    # reply to TX_READ request
            'CAN_BCM_TX_EXPIRED',   # notification on performed transmissions (count=0)
            'CAN_BCM_RX_STATUS',    # reply to RX_READ request
            'CAN_BCM_RX_TIMEOUT',   # cyclic message is absent
            'CAN_BCM_RX_CHANGED',   # updated CAN frame (detected content change)
        )
        for opcode in opcodes:
            getattr(socket, opcode)

    def testCreateSocket(self):
        # Creating and closing a raw CAN socket must simply work.
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW):
            pass

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testCreateBCMSocket(self):
        # Same as above for a broadcast-manager socket.
        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM):
            pass

    def testBindAny(self):
        # Binding with an empty interface name must be accepted.
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            any_interface = ('', )
            s.bind(any_interface)

    def testTooLongInterfaceName(self):
        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
        oversized = ('x' * 1024,)
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            with self.assertRaisesRegex(OSError, 'interface name too long'):
                s.bind(oversized)

    @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
                         'socket.CAN_RAW_LOOPBACK required for this test.')
    def testLoopback(self):
        # Whatever loopback value is set must be read back unchanged.
        level, option = socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            for loopback in (0, 1):
                s.setsockopt(level, option, loopback)
                self.assertEqual(loopback, s.getsockopt(level, option))

    @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
                         'socket.CAN_RAW_FILTER required for this test.')
    def testFilter(self):
        # A filter is an (id, mask) pair packed as two unsigned ints.
        can_id, can_mask = 0x200, 0x700
        can_filter = struct.pack("=II", can_id, can_mask)
        level, option = socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            s.setsockopt(level, option, can_filter)
            self.assertEqual(can_filter, s.getsockopt(level, option, 8))
            # Other bytes-like objects are accepted as the option value.
            s.setsockopt(level, option, bytearray(can_filter))
1584
1585
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
@unittest.skipUnless(thread, 'Threading required for this test.')
class CANTest(ThreadedCANSocketTest):
    # Frame exchange tests for CAN sockets.  Every testX method has a
    # _testX partner: one of them sends frames and stores what it sent
    # on self, the other receives the frames and compares them.

    def __init__(self, methodName='runTest'):
        ThreadedCANSocketTest.__init__(self, methodName=methodName)

    @classmethod
    def build_can_frame(cls, can_id, data):
        """Pack can_id and data into a frame laid out as cls.can_frame_fmt.

        The length field is the size of data before it is padded with
        NUL bytes to 8 bytes.
        """
        can_dlc = len(data)
        data = data.ljust(8, b'\x00')
        return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)

    @classmethod
    def dissect_can_frame(cls, frame):
        """Unpack frame and return the tuple (can_id, can_dlc, data).

        The returned data is cut down to can_dlc bytes, which removes
        the padding added by build_can_frame().
        """
        can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
        return (can_id, can_dlc, data[:can_dlc])

    def testSendFrame(self):
        # The received frame must match the one sent, and the source
        # address must name our interface and the CAN address family.
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)
        self.assertEqual(addr[0], self.interface)
        self.assertEqual(addr[1], socket.AF_CAN)

    def _testSendFrame(self):
        self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
        self.cli.send(self.cf)

    def testSendMaxFrame(self):
        # A frame carrying the full 8 data bytes must arrive intact.
        cf, _ = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)

    def _testSendMaxFrame(self):
        self.cf = self.build_can_frame(0x00, b'\x07' * 8)
        self.cli.send(self.cf)

    def testSendMultiFrames(self):
        # Two frames sent back to back must arrive in sending order.
        cf, _ = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf1, cf)

        cf, _ = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf2, cf)

    def _testSendMultiFrames(self):
        self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
        self.cli.send(self.cf1)

        self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
        self.cli.send(self.cf2)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def _testBCM(self):
        # Receive the frame that testBCM() pushed through the broadcast
        # manager socket and check both the raw bytes and the fields.
        cf, _ = self.cli.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)
        can_id, can_dlc, data = self.dissect_can_frame(cf)
        self.assertEqual(self.can_id, can_id)
        self.assertEqual(self.data, data)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testBCM(self):
        # Send one frame with the CAN_BCM_TX_SEND opcode: the message
        # is a command header immediately followed by the frame.
        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
        self.addCleanup(bcm.close)
        bcm.connect((self.interface,))
        self.can_id = 0x123
        self.data = bytes([0xc0, 0xff, 0xee])
        self.cf = self.build_can_frame(self.can_id, self.data)
        opcode = socket.CAN_BCM_TX_SEND
        flags = 0
        count = 0
        ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
        bcm_can_id = 0x0222
        nframes = 1
        # Use a unittest assertion rather than a bare assert statement:
        # it still runs under "python -O" and reports both values.
        self.assertEqual(len(self.cf), 16)
        header = struct.pack(self.bcm_cmd_msg_fmt,
                    opcode,
                    flags,
                    count,
                    ival1_seconds,
                    ival1_usec,
                    ival2_seconds,
                    ival2_usec,
                    bcm_can_id,
                    nframes,
                    )
        header_plus_frame = header + self.cf
        bytes_sent = bcm.send(header_plus_frame)
        self.assertEqual(bytes_sent, len(header_plus_frame))
1677
1678
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):
    # RDS checks that work on a single socket, without a peer.

    def testCrucialConstants(self):
        # Looking a constant up raises AttributeError if it is missing.
        for constant in ("AF_RDS", "PF_RDS"):
            getattr(socket, constant)

    def testCreateSocket(self):
        # Opening an RDS socket must succeed; the with-block closes it.
        rds_sock = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        with rds_sock:
            pass

    def testSocketBufferSize(self):
        # Setting the receive and then the send buffer size must not raise.
        requested = 16384
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET,
                           0) as rds_sock:
            for option in (socket.SO_RCVBUF, socket.SO_SNDBUF):
                rds_sock.setsockopt(socket.SOL_SOCKET, option, requested)
1695
1696
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
@unittest.skipUnless(thread, 'Threading required for this test.')
class RDSTest(ThreadedRDSSocketTest):
    # Data transfer tests for RDS sockets.  Each testX method receives
    # on self.serv what its _testX partner sent through self.cli.

    def __init__(self, methodName='runTest'):
        ThreadedRDSSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        super().setUp()
        # Set by _testCongestion() once it has stopped sending.
        self.evt = threading.Event()

    def testSendAndRecv(self):
        # One datagram: check both the payload and the sender address.
        payload, sender = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, payload)
        self.assertEqual(self.cli_addr, sender)

    def _testSendAndRecv(self):
        self.data = b'spam'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)

    def testPeek(self):
        # A MSG_PEEK read must leave the datagram queued, so the plain
        # read that follows sees the same payload again.
        peeked, _ = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
        self.assertEqual(self.data, peeked)
        consumed, _ = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, consumed)

    def _testPeek(self):
        self.data = b'spam'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)

    @requireAttrs(socket.socket, 'recvmsg')
    def testSendAndRecvMsg(self):
        # Same exchange as above, but through sendmsg()/recvmsg().
        reply = self.serv.recvmsg(self.bufsize)
        payload, ancdata, msg_flags, sender = reply
        self.assertEqual(self.data, payload)

    @requireAttrs(socket.socket, 'sendmsg')
    def _testSendAndRecvMsg(self):
        self.data = b'hello ' * 10
        destination = (HOST, self.port)
        self.cli.sendmsg([self.data], (), 0, destination)

    def testSendAndRecvMulti(self):
        # Two datagrams must be received in the order they were sent.
        first, _ = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data1, first)

        second, _ = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data2, second)

    def _testSendAndRecvMulti(self):
        destination = (HOST, self.port)

        self.data1 = b'bacon'
        self.cli.sendto(self.data1, 0, destination)

        self.data2 = b'egg'
        self.cli.sendto(self.data2, 0, destination)

    def testSelect(self):
        # select() must report the socket readable within 3 seconds.
        readable, _, _ = select.select([self.serv], [], [], 3.0)
        self.assertIn(self.serv, readable)
        payload, _ = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, payload)

    def _testSelect(self):
        self.data = b'select'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)

    def testCongestion(self):
        # Nothing is read here; just block until the sender has finished.
        self.evt.wait()

    def _testCongestion(self):
        # Check what happens when the peer's buffer fills up.
        self.data = b'fill'
        self.cli.setblocking(False)
        # Best effort: ask for a small SO_RCVBUF on self.cli and carry
        # on regardless if the request is refused.
        with contextlib.suppress(OSError):
            self.cli.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16384)
        destination = (HOST, self.port)
        with self.assertRaises(OSError) as caught:
            try:
                # Keep sending until the non-blocking sendto() fails.
                while True:
                    self.cli.sendto(self.data, 0, destination)
            finally:
                # Release testCongestion() whatever happened above.
                self.evt.set()
        # The failure has to be ENOBUFS ...
        self.assertEqual(caught.exception.errno, errno.ENOBUFS)
        # ... and select() must flag the server socket, which is how the
        # congestion notification is delivered.
        readable, _, _ = select.select([self.serv], [], [], 3.0)
        self.assertIn(self.serv, readable)
1787
1788
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
    # Basic operations on a connected TCP socket pair.  Each testX
    # method reads from self.cli_conn what its _testX partner wrote to
    # self.serv_conn.

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # recv() with a buffer larger than the message.
        received = self.cli_conn.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # recv() in two pieces: all but the last 3 bytes, then the rest.
        head = self.cli_conn.recv(len(MSG) - 3)
        tail = self.cli_conn.recv(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # recvfrom() with a buffer larger than the message.
        received, _ = self.cli_conn.recvfrom(1024)
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # recvfrom() in two pieces, split like testOverFlowRecv.
        head, _ = self.cli_conn.recvfrom(len(MSG) - 3)
        tail, _ = self.cli_conn.recvfrom(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # sendall() of 2048 bytes: read until recv() returns b'' and
        # compare everything collected.
        chunks = iter(lambda: self.cli_conn.recv(1024), b'')
        self.assertEqual(b''.join(chunks), b'f' * 2048)

    def _testSendAll(self):
        payload = b'f' * 2048
        self.serv_conn.sendall(payload)

    def testFromFd(self):
        # A socket built by fromfd() from our descriptor must work.
        descriptor = self.cli_conn.fileno()
        clone = socket.fromfd(descriptor, socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(clone.close)
        self.assertIsInstance(clone, socket.socket)
        self.assertEqual(clone.recv(1024), MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testDup(self):
        # A socket returned by dup() must receive the data.
        duplicate = self.cli_conn.dup()
        self.addCleanup(duplicate.close)
        self.assertEqual(duplicate.recv(1024), MSG)

    def _testDup(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Data sent before shutdown() must still arrive.
        received = self.cli_conn.recv(1024)
        self.assertEqual(received, MSG)
        # Do not return before _testShutdown is done: on OS X the client
        # gets disconnected as soon as the server closes the connection,
        # and the client's shutdown call then fails (issue #4397).
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)

    testShutdown_overflow = support.cpython_only(testShutdown)

    @support.cpython_only
    def _testShutdown_overflow(self):
        import _testcapi
        self.serv_conn.send(MSG)
        # Issue 15989: values that do not fit in a C int must raise
        # OverflowError instead of being passed on.
        too_large = (_testcapi.INT_MAX + 1, 2 + (_testcapi.UINT_MAX + 1))
        for how in too_large:
            self.assertRaises(OverflowError, self.serv_conn.shutdown, how)
        self.serv_conn.shutdown(2)

    def testDetach(self):
        # detach() returns the descriptor and leaves the object closed.
        original_fd = self.cli_conn.fileno()
        detached_fd = self.cli_conn.detach()
        self.assertEqual(detached_fd, original_fd)
        # The socket object itself is now unusable ...
        self.assertTrue(self.cli_conn._closed)
        self.assertRaises(OSError, self.cli_conn.recv, 1024)
        self.cli_conn.close()
        # ... while the descriptor stays open and can back a new socket.
        revived = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                                fileno=detached_fd)
        self.addCleanup(revived.close)
        self.assertEqual(revived.recv(1024), MSG)

    def _testDetach(self):
        self.serv_conn.send(MSG)
1911
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
    # Basic datagram tests: self.cli sends to (HOST, self.port) and
    # self.serv receives.

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # sendto() on one side, recv() on the other.
        datagram = self.serv.recv(len(MSG))
        self.assertEqual(datagram, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # sendto() on one side, recvfrom() on the other.
        datagram, _ = self.serv.recvfrom(len(MSG))
        self.assertEqual(datagram, MSG)

    def _testRecvFrom(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFromNegative(self):
        # recvfrom() must refuse a negative length with ValueError.
        with self.assertRaises(ValueError):
            self.serv.recvfrom(-1)

    def _testRecvFromNegative(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)
1940
1941# Tests for the sendmsg()/recvmsg() interface.  Where possible, the
1942# same test code is used with different families and types of socket
1943# (e.g. stream, datagram), and tests using recvmsg() are repeated
1944# using recvmsg_into().
1945#
1946# The generic test classes such as SendmsgTests and
1947# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
1948# supplied with sockets cli_sock and serv_sock representing the
1949# client's and the server's end of the connection respectively, and
1950# attributes cli_addr and serv_addr holding their (numeric where
1951# appropriate) addresses.
1952#
1953# The final concrete test classes combine these with subclasses of
1954# SocketTestBase which set up client and server sockets of a specific
1955# type, and with subclasses of SendrecvmsgBase such as
1956# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these
1957# sockets to cli_sock and serv_sock and override the methods and
1958# attributes of SendrecvmsgBase to fill in destination addresses if
1959# needed when sending, check for specific flags in msg_flags, etc.
1960#
1961# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using
1962# recvmsg_into().
1963
1964# XXX: like the other datagram (UDP) tests in this module, the code
1965# here assumes that datagram delivery on the local machine will be
1966# reliable.
1967
class SendrecvmsgBase(ThreadSafeCleanupTestCase):
    # Foundation shared by every sendmsg()/recvmsg() test class.

    # How many seconds a test may wait before it counts as failed;
    # None would mean no timeout.  Only some tests apply this value.
    fail_timeout = 3.0

    def setUp(self):
        self.misc_event = threading.Event()
        super().setUp()

    def sendToServer(self, msg):
        # Deliver msg to the server through the client socket.
        return self.cli_sock.send(msg)

    # Fallback positional arguments for sendmsg() when it is reached
    # through sendmsgToServer() (a destination address, for example).
    sendmsg_to_server_defaults = ()

    def sendmsgToServer(self, *args):
        # Run sendmsg() on self.cli_sock.  Argument positions the caller
        # left out are filled from self.sendmsg_to_server_defaults, as
        # far as that tuple reaches.
        sendmsg = self.cli_sock.sendmsg
        fallback = self.sendmsg_to_server_defaults[len(args):]
        return sendmsg(*(args + fallback))

    def doRecvmsg(self, sock, bufsize, *args):
        # Run recvmsg() on sock and return what it returned.  Tests that
        # should work with both recvmsg() and recvmsg_into() go through
        # this method: RecvmsgIntoMixin replaces it with a version built
        # on recvmsg_into(), so one test body covers both calls.
        reply = sock.recvmsg(bufsize, *args)
        self.registerRecvmsgResult(reply)
        return reply

    def registerRecvmsgResult(self, result):
        # Hook that doRecvmsg() calls with the value returned by
        # recvmsg() or recvmsg_into().  Subclasses may override it, e.g.
        # to schedule cleanup that depends on received ancillary data.
        pass

    def checkRecvmsgAddress(self, addr1, addr2):
        # Compare the address that was received with the peer's address.
        self.assertEqual(addr1, addr2)

    # Flags that msg_flags normally does not contain
    msg_flags_common_unset = 0
    for name in ("MSG_CTRUNC", "MSG_OOB"):
        msg_flags_common_unset |= getattr(socket, name, 0)

    # Flags that msg_flags normally does contain
    msg_flags_common_set = 0

    # Flags marking receipt of a complete record (e.g. MSG_EOR for SCTP)
    msg_flags_eor_indicator = 0

    # Flags marking receipt of an incomplete record (e.g. MSG_TRUNC for
    # datagram sockets)
    msg_flags_non_eor_indicator = 0

    def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0):
        # Verify the msg_flags value returned by recvmsg[_into]().
        #
        # By default every bit of the msg_flags_common_set attribute
        # has to be set in "flags" and every bit of
        # msg_flags_common_unset has to be clear.
        #
        # "eor" says whether the flags should report a complete record
        # (or datagram).  None skips this part; otherwise:
        #
        #  * a true "eor" requires the bits of msg_flags_eor_indicator
        #    to be set and those of msg_flags_non_eor_indicator clear
        #
        #  * a false "eor" requires the bits of
        #    msg_flags_non_eor_indicator to be set and those of
        #    msg_flags_eor_indicator clear
        #
        # "checkset" and "checkunset" name bits that must be set or
        # clear respectively; for those bits they take precedence over
        # what the attributes ask for.
        #
        # Bits present in "ignore" are never checked, whatever the
        # other inputs say.
        #
        # Raises Exception when a bit that is not ignored would have to
        # be both set and clear.

        must_set = self.msg_flags_common_set
        must_clear = self.msg_flags_common_unset

        if eor is not None:
            if eor:
                must_set |= self.msg_flags_eor_indicator
                must_clear |= self.msg_flags_non_eor_indicator
            else:
                must_set |= self.msg_flags_non_eor_indicator
                must_clear |= self.msg_flags_eor_indicator

        # Explicit arguments win over the attribute-derived defaults,
        # then both sources are combined
        all_set = checkset | (must_set & ~checkunset)
        all_clear = checkunset | (must_clear & ~checkset)

        # A bit required both ways is a mistake in the test itself
        conflict = all_set & all_clear & ~ignore
        if conflict:
            raise Exception("contradictory set, unset requirements for flags "
                            "{0:#x}".format(conflict))

        # Only bits somebody asked about, and did not ignore, are compared
        relevant = (all_set | all_clear) & ~ignore
        self.assertEqual(flags & relevant, all_set & relevant)
2085
2086
class RecvmsgIntoMixin(SendrecvmsgBase):
    # Mixin whose doRecvmsg() is built on recvmsg_into() but returns
    # the data as bytes, in the same shape as a recvmsg() result.

    def doRecvmsg(self, sock, bufsize, *args):
        storage = bytearray(bufsize)
        reply = sock.recvmsg_into([storage], *args)
        self.registerRecvmsgResult(reply)
        # The first item is the byte count; it has to fit the buffer.
        nbytes = reply[0]
        self.assertGreaterEqual(nbytes, 0)
        self.assertLessEqual(nbytes, bufsize)
        # Swap the count for the bytes received, keep the other items.
        received = bytes(storage[:nbytes])
        return (received,) + reply[1:]
2097
2098
class SendrecvmsgDgramFlagsBase(SendrecvmsgBase):
    # Supplies the msg_flags expectations for datagram sockets.

    @property
    def msg_flags_non_eor_indicator(self):
        # Add MSG_TRUNC to whatever the parent class already reports.
        inherited = super().msg_flags_non_eor_indicator
        return inherited | socket.MSG_TRUNC
2105
2106
class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase):
    # Supplies the msg_flags expectations for SCTP sockets.

    @property
    def msg_flags_eor_indicator(self):
        # Add MSG_EOR to whatever the parent class already reports.
        inherited = super().msg_flags_eor_indicator
        return inherited | socket.MSG_EOR
2113
2114
class SendrecvmsgConnectionlessBase(SendrecvmsgBase):
    # Base class for connectionless-mode socket tests.  The sockets
    # must be provided as attributes cli and serv; they are exposed
    # here as cli_sock and serv_sock respectively.

    @property
    def cli_sock(self):
        return self.cli

    @property
    def serv_sock(self):
        return self.serv

    @property
    def sendmsg_to_server_defaults(self):
        # Empty buffers, empty ancillary data, no flags, and the
        # server's address as destination.
        return ([], [], 0, self.serv_addr)

    def sendToServer(self, msg):
        # Address the server explicitly with sendto().
        destination = self.serv_addr
        return self.cli_sock.sendto(msg, destination)
2134
2135
class SendrecvmsgConnectedBase(SendrecvmsgBase):
    # Base class for connected socket tests.  The sockets must be
    # provided as attributes serv_conn and cli_conn (the connections
    # *to* the server and *to* the client); serv_conn is exposed here
    # as cli_sock and cli_conn as serv_sock.

    @property
    def cli_sock(self):
        return self.serv_conn

    @property
    def serv_sock(self):
        return self.cli_conn

    def checkRecvmsgAddress(self, addr1, addr2):
        # Nothing to compare: the address reported for a connected
        # socket is currently "unspecified".
        pass
2154
2155
class SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
    # Base class that puts a timeout on the server's socket.

    def setUp(self):
        super().setUp()
        # Applied after the parent's setUp() has run.
        timeout = self.fail_timeout
        self.serv_sock.settimeout(timeout)
2162
2163
class SendmsgTests(SendrecvmsgServerTimeoutBase):
    # sendmsg() tests that work with any socket type and use neither
    # recvmsg() nor recvmsg_into().

    def testSendmsg(self):
        # A plain message sent with sendmsg() must arrive unchanged.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsg(self):
        sent = self.sendmsgToServer([MSG])
        self.assertEqual(sent, len(MSG))

    def testSendmsgDataGenerator(self):
        # The buffers may come from a generator instead of a sequence.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgDataGenerator(self):
        buffers = (o for o in [MSG])
        self.assertEqual(self.sendmsgToServer(buffers), len(MSG))

    def testSendmsgAncillaryGenerator(self):
        # The (empty) ancillary data may come from a generator.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgAncillaryGenerator(self):
        no_ancdata = (o for o in [])
        self.assertEqual(self.sendmsgToServer([MSG], no_ancdata), len(MSG))

    def testSendmsgArray(self):
        # The data may be an array rather than the usual bytes object.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgArray(self):
        as_array = array.array("B", MSG)
        self.assertEqual(self.sendmsgToServer([as_array]), len(MSG))

    def testSendmsgGather(self):
        # Several buffers must be sent as one message (gather write).
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgGather(self):
        pieces = [MSG[:3], MSG[3:]]
        self.assertEqual(self.sendmsgToServer(pieces), len(MSG))

    def testSendmsgBadArgs(self):
        # sendmsg() must reject invalid arguments.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    def _testSendmsgBadArgs(self):
        # No arguments at all.
        self.assertRaises(TypeError, self.cli_sock.sendmsg)
        # One invalid value at each argument position in turn.
        invalid_calls = (
            (b"not in an iterable",),
            (object(),),
            ([object()],),
            ([MSG, object()],),
            ([MSG], object()),
            ([MSG], [], object()),
            ([MSG], [], 0, object()),
        )
        for call_args in invalid_calls:
            self.assertRaises(TypeError, self.sendmsgToServer, *call_args)
        self.sendToServer(b"done")

    def testSendmsgBadCmsg(self):
        # Invalid ancillary data items must be rejected.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    def _testSendmsgBadCmsg(self):
        # Not a tuple, a bad value in each field, then a wrong length.
        invalid_ancdata = (
            [object()],
            [(object(), 0, b"data")],
            [(0, object(), b"data")],
            [(0, 0, object())],
            [(0, 0)],
            [(0, 0, b"data", 42)],
        )
        for ancdata in invalid_ancdata:
            self.assertRaises(TypeError, self.sendmsgToServer,
                              [MSG], ancdata)
        self.sendToServer(b"done")

    @requireAttrs(socket, "CMSG_SPACE")
    def testSendmsgBadMultiCmsg(self):
        # Invalid ancillary data items must also be rejected when the
        # list holds more than one item.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    @testSendmsgBadMultiCmsg.client_skip
    def _testSendmsgBadMultiCmsg(self):
        invalid_ancdata = (
            [0, 0, b""],
            [(0, 0, b""), object()],
        )
        for ancdata in invalid_ancdata:
            self.assertRaises(TypeError, self.sendmsgToServer,
                              [MSG], ancdata)
        self.sendToServer(b"done")

    def testSendmsgExcessCmsgReject(self):
        # When the number of ancillary items that can be sent is
        # limited, sendmsg() must reject any excess items.
        self.assertEqual(self.serv_sock.recv(1000), b"done")

    def _testSendmsgExcessCmsgReject(self):
        if not hasattr(socket, "CMSG_SPACE"):
            # Only a single item can be sent, so two are one too many
            with self.assertRaises(OSError) as caught:
                self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
            self.assertIsNone(caught.exception.errno)
        self.sendToServer(b"done")

    def testSendmsgAfterClose(self):
        # sendmsg() must fail on a closed socket; the check itself is
        # done by the sending side.
        pass

    def _testSendmsgAfterClose(self):
        self.cli_sock.close()
        with self.assertRaises(OSError):
            self.sendmsgToServer([MSG])
2281
2282
class SendmsgStreamTests(SendmsgTests):
    # sendmsg() tests that need a stream socket and use neither
    # recvmsg() nor recvmsg_into().

    def testSendmsgExplicitNoneAddr(self):
        # None must be accepted as the peer address.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgExplicitNoneAddr(self):
        sent = self.sendmsgToServer([MSG], [], 0, None)
        self.assertEqual(sent, len(MSG))

    def testSendmsgTimeout(self):
        # A socket timeout must take effect for sendmsg().
        first_block = self.serv_sock.recv(512)
        self.assertEqual(first_block, b"a"*512)
        finished = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(finished)

    def _testSendmsgTimeout(self):
        block = b"a"*512
        try:
            self.cli_sock.settimeout(0.03)
            # Keep sending until sendmsg() gives up with a timeout.
            with self.assertRaises(socket.timeout):
                while True:
                    self.sendmsgToServer([block])
        finally:
            # Let testSendmsgTimeout() finish whatever happened.
            self.misc_event.set()

    # XXX: would be nice to have more tests for sendmsg flags argument.

    # MSG_DONTWAIT works for sending on Linux, but in general only for
    # receiving.  Other platforms could be added if they turn out to
    # support it as well.
    @skipWithClientIf(sys.platform not in {"linux"},
                      "MSG_DONTWAIT not known to work on this platform when "
                      "sending")
    def testSendmsgDontWait(self):
        # MSG_DONTWAIT in flags must make sendmsg() non-blocking.
        first_block = self.serv_sock.recv(512)
        self.assertEqual(first_block, b"a"*512)
        finished = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(finished)

    @testSendmsgDontWait.client_skip
    def _testSendmsgDontWait(self):
        block = b"a"*512
        would_block = (errno.EAGAIN, errno.EWOULDBLOCK)
        try:
            # Keep sending until sendmsg() fails instead of blocking.
            with self.assertRaises(OSError) as caught:
                while True:
                    self.sendmsgToServer([block], [], socket.MSG_DONTWAIT)
            self.assertIn(caught.exception.errno, would_block)
        finally:
            self.misc_event.set()
2331
2332
class SendmsgConnectionlessTests(SendmsgTests):
    # sendmsg() tests that need a connectionless-mode socket (such as
    # a datagram socket) and use neither recvmsg() nor recvmsg_into().

    def testSendmsgNoDestAddr(self):
        # sendmsg() on an unconnected socket must fail when it gets no
        # destination address; the check is done by the sending side.
        pass

    def _testSendmsgNoDestAddr(self):
        # Address left out entirely, then given explicitly as None.
        with self.assertRaises(OSError):
            self.cli_sock.sendmsg([MSG])
        with self.assertRaises(OSError):
            self.cli_sock.sendmsg([MSG], [], 0, None)
2348
2349
class RecvmsgGenericTests(SendrecvmsgBase):
    # Receive-side tests that work with recvmsg() as well as with its
    # recvmsg_into() emulation, on any socket type.  Every testX
    # method below has a companion _testX method; in most cases the
    # companion sends the data that testX receives.

    def testRecvmsg(self):
        # Basic case: receive MSG with recvmsg[_into]() into a buffer
        # of exactly len(MSG) bytes.
        bufsize = len(MSG)
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      bufsize)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsg(self):
        self.sendToServer(MSG)

    def testRecvmsgExplicitDefaults(self):
        # Same as testRecvmsg, but the ancillary buffer size and the
        # flags are spelled out as 0 instead of being omitted.
        bufsize = len(MSG)
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      bufsize, 0, 0)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgExplicitDefaults(self):
        self.sendToServer(MSG)

    def testRecvmsgShorter(self):
        # The buffer is larger than the message being received.
        bufsize = len(MSG) + 42
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      bufsize)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgShorter(self):
        self.sendToServer(MSG)

    # Issue #13001: FreeBSD before version 8 does not reliably set
    # MSG_TRUNC when a truncated datagram is received.
    @support.requires_freebsd_version(8)
    def testRecvmsgTrunc(self):
        # The buffer is three bytes too short: only the leading part
        # of MSG is returned and the flags must not report a complete
        # record.
        bufsize = len(MSG) - 3
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      bufsize)
        self.assertEqual(data, MSG[:-3])
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=False)

    @support.requires_freebsd_version(8)
    def _testRecvmsgTrunc(self):
        self.sendToServer(MSG)

    def testRecvmsgShortAncillaryBuf(self):
        # A one-byte ancillary buffer cannot hold any ancillary data.
        bufsize = len(MSG)
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      bufsize, 1)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgShortAncillaryBuf(self):
        self.sendToServer(MSG)

    def testRecvmsgLongAncillaryBuf(self):
        # A generously sized (10240-byte) ancillary buffer.
        bufsize = len(MSG)
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      bufsize, 10240)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgLongAncillaryBuf(self):
        self.sendToServer(MSG)

    def testRecvmsgAfterClose(self):
        # recvmsg[_into]() on an already-closed socket raises OSError.
        self.serv_sock.close()
        with self.assertRaises(OSError):
            self.doRecvmsg(self.serv_sock, 1024)

    def _testRecvmsgAfterClose(self):
        pass

    def testRecvmsgTimeout(self):
        # With a short timeout set and nothing sent, the receive must
        # raise socket.timeout.  misc_event is set afterwards no
        # matter what, because _testRecvmsgTimeout waits on it.
        try:
            self.serv_sock.settimeout(0.03)
            with self.assertRaises(socket.timeout):
                self.doRecvmsg(self.serv_sock, len(MSG))
        finally:
            self.misc_event.set()

    def _testRecvmsgTimeout(self):
        signalled = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(signalled)

    @requireAttrs(socket, "MSG_PEEK")
    def testRecvmsgPeek(self):
        # MSG_PEEK must allow pending data to be inspected, more than
        # once, without removing it from the socket.
        peek = socket.MSG_PEEK

        # Peek at everything except the last three bytes.
        data, anc, msg_flags, sender = self.doRecvmsg(
            self.serv_sock, len(MSG) - 3, 0, peek)
        self.assertEqual(data, MSG[:-3])
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        # MSG_TRUNC is ignored here so that the test is the same for
        # stream and datagram sockets.  Some wording in POSIX seems
        # to suggest that it needn't be set when peeking, but that
        # may just be a slip.
        self.checkFlags(msg_flags, eor=False,
                        ignore=getattr(socket, "MSG_TRUNC", 0))

        # Peek at the complete message.
        data, anc, msg_flags, sender = self.doRecvmsg(
            self.serv_sock, len(MSG), 0, peek)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

        # An ordinary receive must still return the same data.
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      len(MSG))
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    @testRecvmsgPeek.client_skip
    def _testRecvmsgPeek(self):
        self.sendToServer(MSG)

    @requireAttrs(socket.socket, "sendmsg")
    def testRecvmsgFromSendmsg(self):
        # Receive with recvmsg[_into]() a message that the companion
        # method sends with sendmsg().
        self.serv_sock.settimeout(self.fail_timeout)
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      len(MSG))
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    @testRecvmsgFromSendmsg.client_skip
    def _testRecvmsgFromSendmsg(self):
        # MSG is handed to sendmsg() as two separate buffers.
        sent = self.sendmsgToServer([MSG[:3], MSG[3:]])
        self.assertEqual(sent, len(MSG))
2502
2503
class RecvmsgGenericStreamTests(RecvmsgGenericTests):
    # Additional tests that need a stream socket; they work with
    # either recvmsg() or recvmsg_into().

    def testRecvmsgEOF(self):
        # After the peer socket is closed, a receive returns b"" as
        # the end-of-stream indicator.
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock, 1024)
        self.assertEqual(data, b"")
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        # Might not have end-of-record marker
        self.checkFlags(msg_flags, eor=None)

    def _testRecvmsgEOF(self):
        self.cli_sock.close()

    def testRecvmsgOverflow(self):
        # Read one message in two pieces: first all but the last
        # three bytes, then whatever is left.
        first, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                       len(MSG) - 3)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=False)

        second, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                        1024)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

        # The two pieces together must make up the original message.
        self.assertEqual(first + second, MSG)

    def _testRecvmsgOverflow(self):
        self.sendToServer(MSG)
2537
2538
class RecvmsgTests(RecvmsgGenericTests):
    # recvmsg()-specific tests; usable with any socket type.

    def testRecvmsgBadArgs(self):
        # recvmsg() must reject invalid arguments with the expected
        # exception type, and still work normally afterwards.
        recvmsg = self.serv_sock.recvmsg
        rejected = [
            (TypeError, ()),
            (ValueError, (-1, 0, 0)),
            (ValueError, (len(MSG), -1, 0)),
            (TypeError, ([bytearray(10)], 0, 0)),
            (TypeError, (object(), 0, 0)),
            (TypeError, (len(MSG), object(), 0)),
            (TypeError, (len(MSG), 0, object())),
        ]
        for exc_type, args in rejected:
            self.assertRaises(exc_type, recvmsg, *args)

        # A valid call receives the message sent by the companion
        # method.
        data, anc, msg_flags, sender = self.serv_sock.recvmsg(len(MSG), 0, 0)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgBadArgs(self):
        self.sendToServer(MSG)
2566
2567
class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
    # recvmsg_into()-specific tests; usable with any socket type.

    def testRecvmsgIntoBadArgs(self):
        # recvmsg_into() must reject invalid arguments with the
        # expected exception type, and still work normally afterwards.
        buf = bytearray(len(MSG))
        recvmsg_into = self.serv_sock.recvmsg_into
        rejected = [
            (TypeError, ()),
            (TypeError, (len(MSG), 0, 0)),
            (TypeError, (buf, 0, 0)),
            (TypeError, ([object()], 0, 0)),
            (TypeError, ([b"I'm not writable"], 0, 0)),
            (TypeError, ([buf, object()], 0, 0)),
            (ValueError, ([buf], -1, 0)),
            (TypeError, ([buf], object(), 0)),
            (TypeError, ([buf], 0, object())),
        ]
        for exc_type, args in rejected:
            self.assertRaises(exc_type, recvmsg_into, *args)

        # A valid call fills buf with the message sent by the
        # companion method.
        count, anc, msg_flags, sender = self.serv_sock.recvmsg_into(
            [buf], 0, 0)
        self.assertEqual(count, len(MSG))
        self.assertEqual(buf, bytearray(MSG))
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgIntoBadArgs(self):
        self.sendToServer(MSG)

    def testRecvmsgIntoGenerator(self):
        # The buffers argument is a generator rather than a sequence.
        buf = bytearray(len(MSG))
        buffers = (item for item in [buf])
        count, anc, msg_flags, sender = self.serv_sock.recvmsg_into(buffers)
        self.assertEqual(count, len(MSG))
        self.assertEqual(buf, bytearray(MSG))
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgIntoGenerator(self):
        self.sendToServer(MSG)

    def testRecvmsgIntoArray(self):
        # The target buffer is an array.array of unsigned bytes
        # instead of the usual bytearray.
        buf = array.array("B", [0] * len(MSG))
        count, anc, msg_flags, sender = self.serv_sock.recvmsg_into([buf])
        self.assertEqual(count, len(MSG))
        self.assertEqual(buf.tobytes(), MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgIntoArray(self):
        self.sendToServer(MSG)

    def testRecvmsgIntoScatter(self):
        # Scatter write: the data is spread over three buffers, the
        # middle one being a slice of a larger bytearray.
        head = bytearray(b"----")
        middle = bytearray(b"0123456789")
        tail = bytearray(b"--------------")
        targets = [head, memoryview(middle)[2:9], tail]
        count, anc, msg_flags, sender = self.serv_sock.recvmsg_into(targets)
        self.assertEqual(count, len(b"Mary had a little lamb"))
        self.assertEqual(head, bytearray(b"Mary"))
        # Only the sliced part of the middle buffer is overwritten.
        self.assertEqual(middle, bytearray(b"01 had a 9"))
        # The last buffer is longer than the remaining data.
        self.assertEqual(tail, bytearray(b"little lamb---"))
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True)

    def _testRecvmsgIntoScatter(self):
        self.sendToServer(b"Mary had a little lamb")
2646
2647
class CmsgMacroTests(unittest.TestCase):
    # Exercise socket.CMSG_LEN() and socket.CMSG_SPACE().  sendmsg()
    # and recvmsg[_into]() share code with these functions, so the
    # assumptions they rely on are checked here.

    # Match the definition in socketmodule.c
    try:
        import _testcapi
    except ImportError:
        socklen_t_limit = 0x7fffffff
    else:
        socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)

    @requireAttrs(socket, "CMSG_LEN")
    def testCMSG_LEN(self):
        # Try CMSG_LEN() on valid and invalid values, checking the
        # assumptions that recvmsg() and sendmsg() make about it.
        limit = self.socklen_t_limit
        header_len = socket.CMSG_LEN(0)
        # Smallest data length that takes the result over the limit.
        toobig = limit - header_len + 1

        # struct cmsghdr has at least three members, two of which are
        # ints
        self.assertGreater(header_len, array.array("i").itemsize * 2)

        # Cover the low end of the range and the 257 values just
        # below the first invalid one.
        for n in itertools.chain(range(257), range(toobig - 257, toobig)):
            total = socket.CMSG_LEN(n)
            # This is how recvmsg() calculates the data size
            self.assertEqual(total - header_len, n)
            self.assertLessEqual(total, limit)

        # sendmsg() shares code with these functions, and requires
        # that it reject values over the limit.
        for invalid in (-1, toobig, sys.maxsize):
            self.assertRaises(OverflowError, socket.CMSG_LEN, invalid)

    @requireAttrs(socket, "CMSG_SPACE")
    def testCMSG_SPACE(self):
        # Try CMSG_SPACE() on valid and invalid values, checking the
        # assumptions that sendmsg() makes about it.
        limit = self.socklen_t_limit
        # Smallest data length that takes the result over the limit.
        toobig = limit - socket.CMSG_SPACE(1) + 1

        previous = socket.CMSG_SPACE(0)
        # struct cmsghdr has at least three members, two of which are
        # ints
        self.assertGreater(previous, array.array("i").itemsize * 2)

        header_len = socket.CMSG_LEN(0)
        for n in itertools.chain(range(257), range(toobig - 257, toobig)):
            space = socket.CMSG_SPACE(n)
            # The result never shrinks as n grows, and always leaves
            # room for the header plus n bytes of data.
            self.assertGreaterEqual(space, previous)
            self.assertGreaterEqual(space, socket.CMSG_LEN(n))
            self.assertGreaterEqual(space, n + header_len)
            self.assertLessEqual(space, limit)
            previous = space

        # sendmsg() shares code with these functions, and requires
        # that it reject values over the limit.
        for invalid in (-1, toobig, sys.maxsize):
            self.assertRaises(OverflowError, socket.CMSG_SPACE, invalid)
2705
2706
class SCMRightsTest(SendrecvmsgServerTimeoutBase):
    # File descriptor passing (SCM_RIGHTS control messages) on
    # Unix-domain sockets.

    # A value that is not a valid file descriptor, and is unlikely to
    # turn into a real one even if one of its bytes were replaced
    # with a different value (which shouldn't actually happen).
    badfd = -0x5555

    def newFDs(self, n):
        # Create n temporary files, each containing its own position
        # in the returned list as an ASCII number, and return the
        # list of their open file descriptors.
        created = []
        for index in range(n):
            handle, filename = tempfile.mkstemp()
            # Cleanups run in reverse order of registration, so the
            # descriptor is closed before the file is unlinked.
            self.addCleanup(os.unlink, filename)
            self.addCleanup(os.close, handle)
            os.write(handle, str(index).encode())
            created.append(handle)
        return created

    def checkFDs(self, fds):
        # Verify that each descriptor in fds refers to a file whose
        # content is the descriptor's list index as an ASCII number.
        for index, fd in enumerate(fds):
            expected = str(index).encode()
            os.lseek(fd, 0, os.SEEK_SET)
            self.assertEqual(os.read(fd, 1024), expected)

    def registerRecvmsgResult(self, result):
        self.addCleanup(self.closeRecvmsgFDs, result)

    def closeRecvmsgFDs(self, recvmsg_result):
        # Close every file descriptor found in the SCM_RIGHTS items
        # of the ancillary data (element 1) of a recvmsg() or
        # recvmsg_into() return value.
        for level, ctype, payload in recvmsg_result[1]:
            if level != socket.SOL_SOCKET or ctype != socket.SCM_RIGHTS:
                continue
            fds = array.array("i")
            # Drop any trailing bytes that do not form a whole int.
            whole = len(payload) - (len(payload) % fds.itemsize)
            fds.frombytes(payload[:whole])
            for fd in fds:
                os.close(fd)

    def createAndSendFDs(self, n):
        # Send the constant MSG to the server, accompanied by n new
        # file descriptors from newFDs() in one SCM_RIGHTS item.
        cmsg = (socket.SOL_SOCKET,
                socket.SCM_RIGHTS,
                array.array("i", self.newFDs(n)))
        sent = self.sendmsgToServer([MSG], [cmsg])
        self.assertEqual(sent, len(MSG))

    def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
        # Verify that result holds the constant MSG together with
        # numfds file descriptors, spread over at most maxcmsgs
        # control messages that contain only complete integers.
        # MSG_CTRUNC must be unset unless it is listed in
        # ignoreflags, whose flags are not checked at all.
        data, anc, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True,
                        checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertIsInstance(anc, list)
        self.assertLessEqual(len(anc), maxcmsgs)
        received = array.array("i")
        for entry in anc:
            self.assertIsInstance(entry, tuple)
            level, ctype, payload = entry
            self.assertEqual(level, socket.SOL_SOCKET)
            self.assertEqual(ctype, socket.SCM_RIGHTS)
            self.assertIsInstance(payload, bytes)
            self.assertEqual(len(payload) % SIZEOF_INT, 0)
            received.frombytes(payload)

        self.assertEqual(len(received), numfds)
        self.checkFDs(received)

    def testFDPassSimple(self):
        # One FD, sent as a bytes object made from an array.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(1, result)

    def _testFDPassSimple(self):
        cmsg = (socket.SOL_SOCKET,
                socket.SCM_RIGHTS,
                array.array("i", self.newFDs(1)).tobytes())
        sent = self.sendmsgToServer([MSG], [cmsg])
        self.assertEqual(sent, len(MSG))

    def testMultipleFDPass(self):
        # Four FDs sent together in a single array.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(4, result)

    def _testMultipleFDPass(self):
        self.createAndSendFDs(4)

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassCMSG_SPACE(self):
        # The ancillary buffer size is computed with CMSG_SPACE().
        ancbufsize = socket.CMSG_SPACE(4 * SIZEOF_INT)
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkRecvmsgFDs(4, result)

    @testFDPassCMSG_SPACE.client_skip
    def _testFDPassCMSG_SPACE(self):
        self.createAndSendFDs(4)

    def testFDPassCMSG_LEN(self):
        # The ancillary buffer size is computed with CMSG_LEN().
        ancbufsize = socket.CMSG_LEN(4 * SIZEOF_INT)
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        # RFC 3542 says implementations may set MSG_CTRUNC if there
        # isn't enough space for trailing padding.
        self.checkRecvmsgFDs(1, result, ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassCMSG_LEN(self):
        self.createAndSendFDs(1)

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparate(self):
        # Two FDs sent in two separate arrays.  The OS may merge the
        # arrays into a single control message.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(2, result, maxcmsgs=2)

    @testFDPassSeparate.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    def _testFDPassSeparate(self):
        fd0, fd1 = self.newFDs(2)
        cmsgs = [
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd0])),
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd1])),
        ]
        sent = self.sendmsgToServer([MSG], cmsgs)
        self.assertEqual(sent, len(MSG))

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparateMinSpace(self):
        # Two FDs sent in two separate arrays and received into the
        # minimum space for two arrays.
        ancbufsize = (socket.CMSG_SPACE(SIZEOF_INT) +
                      socket.CMSG_LEN(SIZEOF_INT))
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkRecvmsgFDs(2, result, maxcmsgs=2,
                             ignoreflags=socket.MSG_CTRUNC)

    @testFDPassSeparateMinSpace.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
    def _testFDPassSeparateMinSpace(self):
        fd0, fd1 = self.newFDs(2)
        cmsgs = [
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd0])),
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd1])),
        ]
        sent = self.sendmsgToServer([MSG], cmsgs)
        self.assertEqual(sent, len(MSG))

    def sendAncillaryIfPossible(self, msg, ancdata):
        # Send msg with ancdata to the server; if that raises
        # OSError, fall back to sending msg without ancillary data.
        try:
            sent = self.sendmsgToServer([msg], ancdata)
        except OSError as exc:
            # Check that it was the system call that failed
            self.assertIsInstance(exc.errno, int)
            sent = self.sendmsgToServer([msg])
        self.assertEqual(sent, len(msg))

    @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
    def testFDPassEmpty(self):
        # An empty FD array is sent: either no array at all or an
        # empty one may arrive.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(0, result, ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassEmpty(self):
        empty_item = (socket.SOL_SOCKET, socket.SCM_RIGHTS, b"")
        self.sendAncillaryIfPossible(MSG, [empty_item])

    def testFDPassPartialInt(self):
        # The sender passes an FD array cut short by one byte.
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      len(MSG), 10240)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, ignore=socket.MSG_CTRUNC)
        self.assertLessEqual(len(anc), 1)
        for level, ctype, payload in anc:
            self.assertEqual(level, socket.SOL_SOCKET)
            self.assertEqual(ctype, socket.SCM_RIGHTS)
            self.assertLess(len(payload), SIZEOF_INT)

    def _testFDPassPartialInt(self):
        truncated = array.array("i", [self.badfd]).tobytes()[:-1]
        self.sendAncillaryIfPossible(
            MSG, [(socket.SOL_SOCKET, socket.SCM_RIGHTS, truncated)])

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassPartialIntInMiddle(self):
        # The sender passes two FD arrays, the first of them cut
        # short by one byte.
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      len(MSG), 10240)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, ignore=socket.MSG_CTRUNC)
        self.assertLessEqual(len(anc), 2)
        received = array.array("i")
        # Arrays may have been combined in a single control message
        for level, ctype, payload in anc:
            self.assertEqual(level, socket.SOL_SOCKET)
            self.assertEqual(ctype, socket.SCM_RIGHTS)
            # Keep only the bytes that form whole ints.
            whole = len(payload) - (len(payload) % received.itemsize)
            received.frombytes(payload[:whole])
        self.assertLessEqual(len(received), 2)
        self.checkFDs(received)

    @testFDPassPartialIntInMiddle.client_skip
    def _testFDPassPartialIntInMiddle(self):
        fd0, fd1 = self.newFDs(2)
        truncated = array.array("i", [fd0, self.badfd]).tobytes()[:-1]
        self.sendAncillaryIfPossible(
            MSG,
            [(socket.SOL_SOCKET, socket.SCM_RIGHTS, truncated),
             (socket.SOL_SOCKET, socket.SCM_RIGHTS,
              array.array("i", [fd1]))])

    def checkTruncatedHeader(self, result, ignoreflags=0):
        # Verify that result carries MSG but no ancillary data items,
        # as expected when the data is truncated inside the cmsghdr
        # structure.  MSG_CTRUNC must be set unless it is listed in
        # ignoreflags.
        data, anc, msg_flags, sender = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(msg_flags, eor=True,
                        checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

    def testCmsgTruncNoBufSize(self):
        # No ancillary data may be received when no buffer size is
        # given.
        result = self.doRecvmsg(self.serv_sock, len(MSG))
        # BSD seems to set MSG_CTRUNC only if an item has been
        # partially received.
        self.checkTruncatedHeader(result, ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTruncNoBufSize(self):
        self.createAndSendFDs(1)

    def testCmsgTrunc0(self):
        # No ancillary data may be received when the buffer size is 0.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 0)
        self.checkTruncatedHeader(result, ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTrunc0(self):
        self.createAndSendFDs(1)

    # The next tests use buffer sizes that are non-zero but still too
    # small, and check that no ancillary data is returned.

    def testCmsgTrunc1(self):
        result = self.doRecvmsg(self.serv_sock, len(MSG), 1)
        self.checkTruncatedHeader(result)

    def _testCmsgTrunc1(self):
        self.createAndSendFDs(1)

    def testCmsgTrunc2Int(self):
        # The cmsghdr structure has at least three members, two of
        # which are ints, so we still shouldn't see any ancillary
        # data.
        result = self.doRecvmsg(self.serv_sock, len(MSG), SIZEOF_INT * 2)
        self.checkTruncatedHeader(result)

    def _testCmsgTrunc2Int(self):
        self.createAndSendFDs(1)

    def testCmsgTruncLen0Minus1(self):
        ancbufsize = socket.CMSG_LEN(0) - 1
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkTruncatedHeader(result)

    def _testCmsgTruncLen0Minus1(self):
        self.createAndSendFDs(1)

    # The remaining tests try to truncate the control message in the
    # middle of the FD array.

    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
        # Receive with an ancillary buffer of ancbuf bytes and verify
        # that the file descriptor data was truncated to between
        # mindata and maxdata bytes, and that any complete file
        # descriptor numbers in it are valid.
        data, anc, msg_flags, sender = self.doRecvmsg(self.serv_sock,
                                                      len(MSG), ancbuf)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(sender, self.cli_addr)
        self.checkFlags(msg_flags, eor=True, checkset=socket.MSG_CTRUNC)

        # Receiving no item at all is fine when no minimum is set.
        if mindata == 0 and anc == []:
            return
        self.assertEqual(len(anc), 1)
        level, ctype, payload = anc[0]
        self.assertEqual(level, socket.SOL_SOCKET)
        self.assertEqual(ctype, socket.SCM_RIGHTS)
        self.assertGreaterEqual(len(payload), mindata)
        self.assertLessEqual(len(payload), maxdata)
        received = array.array("i")
        # Keep only the bytes that form whole ints.
        whole = len(payload) - (len(payload) % received.itemsize)
        received.frombytes(payload[:whole])
        self.checkFDs(received)

    def testCmsgTruncLen0(self):
        ancbuf = socket.CMSG_LEN(0)
        self.checkTruncatedArray(ancbuf=ancbuf, maxdata=0)

    def _testCmsgTruncLen0(self):
        self.createAndSendFDs(1)

    def testCmsgTruncLen0Plus1(self):
        ancbuf = socket.CMSG_LEN(0) + 1
        self.checkTruncatedArray(ancbuf=ancbuf, maxdata=1)

    def _testCmsgTruncLen0Plus1(self):
        self.createAndSendFDs(2)

    def testCmsgTruncLen1(self):
        ancbuf = socket.CMSG_LEN(SIZEOF_INT)
        self.checkTruncatedArray(ancbuf=ancbuf, maxdata=SIZEOF_INT)

    def _testCmsgTruncLen1(self):
        self.createAndSendFDs(2)

    def testCmsgTruncLen2Minus1(self):
        ancbuf = socket.CMSG_LEN(2 * SIZEOF_INT) - 1
        self.checkTruncatedArray(ancbuf=ancbuf,
                                 maxdata=(2 * SIZEOF_INT) - 1)

    def _testCmsgTruncLen2Minus1(self):
        self.createAndSendFDs(2)
3065
3066
3067class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
3068    # Test sendmsg() and recvmsg[_into]() using the ancillary data
3069    # features of the RFC 3542 Advanced Sockets API for IPv6.
3070    # Currently we can only handle certain data items (e.g. traffic
3071    # class, hop limit, MTU discovery and fragmentation settings)
3072    # without resorting to unportable means such as the struct module,
3073    # but the tests here are aimed at testing the ancillary data
3074    # handling in sendmsg() and recvmsg() rather than the IPv6 API
3075    # itself.
3076
3077    # Test value to use when setting hop limit of packet
3078    hop_limit = 2
3079
3080    # Test value to use when setting traffic class of packet.
3081    # -1 means "use kernel default".
3082    traffic_class = -1
3083
3084    def ancillaryMapping(self, ancdata):
3085        # Given ancillary data list ancdata, return a mapping from
3086        # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
3087        # Check that no (level, type) pair appears more than once.
3088        d = {}
3089        for cmsg_level, cmsg_type, cmsg_data in ancdata:
3090            self.assertNotIn((cmsg_level, cmsg_type), d)
3091            d[(cmsg_level, cmsg_type)] = cmsg_data
3092        return d
3093
3094    def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
3095        # Receive hop limit into ancbufsize bytes of ancillary data
3096        # space.  Check that data is MSG, ancillary data is not
3097        # truncated (but ignore any flags in ignoreflags), and hop
3098        # limit is between 0 and maxhop inclusive.
3099        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3100                                  socket.IPV6_RECVHOPLIMIT, 1)
3101        self.misc_event.set()
3102        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3103                                                   len(MSG), ancbufsize)
3104
3105        self.assertEqual(msg, MSG)
3106        self.checkRecvmsgAddress(addr, self.cli_addr)
3107        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3108                        ignore=ignoreflags)
3109
3110        self.assertEqual(len(ancdata), 1)
3111        self.assertIsInstance(ancdata[0], tuple)
3112        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3113        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3114        self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
3115        self.assertIsInstance(cmsg_data, bytes)
3116        self.assertEqual(len(cmsg_data), SIZEOF_INT)
3117        a = array.array("i")
3118        a.frombytes(cmsg_data)
3119        self.assertGreaterEqual(a[0], 0)
3120        self.assertLessEqual(a[0], maxhop)
3121
3122    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3123    def testRecvHopLimit(self):
3124        # Test receiving the packet hop limit as ancillary data.
3125        self.checkHopLimit(ancbufsize=10240)
3126
3127    @testRecvHopLimit.client_skip
3128    def _testRecvHopLimit(self):
3129        # Need to wait until server has asked to receive ancillary
3130        # data, as implementations are not required to buffer it
3131        # otherwise.
3132        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3133        self.sendToServer(MSG)
3134
3135    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3136    def testRecvHopLimitCMSG_SPACE(self):
3137        # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
3138        self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))
3139
3140    @testRecvHopLimitCMSG_SPACE.client_skip
3141    def _testRecvHopLimitCMSG_SPACE(self):
3142        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3143        self.sendToServer(MSG)
3144
3145    # Could test receiving into buffer sized using CMSG_LEN, but RFC
3146    # 3542 says portable applications must provide space for trailing
3147    # padding.  Implementations may set MSG_CTRUNC if there isn't
3148    # enough space for the padding.
3149
3150    @requireAttrs(socket.socket, "sendmsg")
3151    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3152    def testSetHopLimit(self):
3153        # Test setting hop limit on outgoing packet and receiving it
3154        # at the other end.
3155        self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)
3156
3157    @testSetHopLimit.client_skip
3158    def _testSetHopLimit(self):
3159        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3160        self.assertEqual(
3161            self.sendmsgToServer([MSG],
3162                                 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3163                                   array.array("i", [self.hop_limit]))]),
3164            len(MSG))
3165
3166    def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
3167                                     ignoreflags=0):
3168        # Receive traffic class and hop limit into ancbufsize bytes of
3169        # ancillary data space.  Check that data is MSG, ancillary
3170        # data is not truncated (but ignore any flags in ignoreflags),
3171        # and traffic class and hop limit are in range (hop limit no
3172        # more than maxhop).
3173        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3174                                  socket.IPV6_RECVHOPLIMIT, 1)
3175        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3176                                  socket.IPV6_RECVTCLASS, 1)
3177        self.misc_event.set()
3178        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3179                                                   len(MSG), ancbufsize)
3180
3181        self.assertEqual(msg, MSG)
3182        self.checkRecvmsgAddress(addr, self.cli_addr)
3183        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3184                        ignore=ignoreflags)
3185        self.assertEqual(len(ancdata), 2)
3186        ancmap = self.ancillaryMapping(ancdata)
3187
3188        tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
3189        self.assertEqual(len(tcdata), SIZEOF_INT)
3190        a = array.array("i")
3191        a.frombytes(tcdata)
3192        self.assertGreaterEqual(a[0], 0)
3193        self.assertLessEqual(a[0], 255)
3194
3195        hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
3196        self.assertEqual(len(hldata), SIZEOF_INT)
3197        a = array.array("i")
3198        a.frombytes(hldata)
3199        self.assertGreaterEqual(a[0], 0)
3200        self.assertLessEqual(a[0], maxhop)
3201
3202    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3203                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3204    def testRecvTrafficClassAndHopLimit(self):
3205        # Test receiving traffic class and hop limit as ancillary data.
3206        self.checkTrafficClassAndHopLimit(ancbufsize=10240)
3207
3208    @testRecvTrafficClassAndHopLimit.client_skip
3209    def _testRecvTrafficClassAndHopLimit(self):
3210        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3211        self.sendToServer(MSG)
3212
3213    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3214                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3215    def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3216        # Test receiving traffic class and hop limit, using
3217        # CMSG_SPACE() to calculate buffer size.
3218        self.checkTrafficClassAndHopLimit(
3219            ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)
3220
3221    @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
3222    def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3223        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3224        self.sendToServer(MSG)
3225
3226    @requireAttrs(socket.socket, "sendmsg")
3227    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3228                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3229    def testSetTrafficClassAndHopLimit(self):
3230        # Test setting traffic class and hop limit on outgoing packet,
3231        # and receiving them at the other end.
3232        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3233                                          maxhop=self.hop_limit)
3234
3235    @testSetTrafficClassAndHopLimit.client_skip
3236    def _testSetTrafficClassAndHopLimit(self):
3237        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3238        self.assertEqual(
3239            self.sendmsgToServer([MSG],
3240                                 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3241                                   array.array("i", [self.traffic_class])),
3242                                  (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3243                                   array.array("i", [self.hop_limit]))]),
3244            len(MSG))
3245
3246    @requireAttrs(socket.socket, "sendmsg")
3247    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3248                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3249    def testOddCmsgSize(self):
3250        # Try to send ancillary data with first item one byte too
3251        # long.  Fall back to sending with correct size if this fails,
3252        # and check that second item was handled correctly.
3253        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3254                                          maxhop=self.hop_limit)
3255
3256    @testOddCmsgSize.client_skip
3257    def _testOddCmsgSize(self):
3258        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3259        try:
3260            nbytes = self.sendmsgToServer(
3261                [MSG],
3262                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3263                  array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
3264                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3265                  array.array("i", [self.hop_limit]))])
3266        except OSError as e:
3267            self.assertIsInstance(e.errno, int)
3268            nbytes = self.sendmsgToServer(
3269                [MSG],
3270                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3271                  array.array("i", [self.traffic_class])),
3272                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3273                  array.array("i", [self.hop_limit]))])
3274            self.assertEqual(nbytes, len(MSG))
3275
3276    # Tests for proper handling of truncated ancillary data
3277
3278    def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
3279        # Receive hop limit into ancbufsize bytes of ancillary data
3280        # space, which should be too small to contain the ancillary
3281        # data header (if ancbufsize is None, pass no second argument
3282        # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
3283        # (unless included in ignoreflags), and no ancillary data is
3284        # returned.
3285        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3286                                  socket.IPV6_RECVHOPLIMIT, 1)
3287        self.misc_event.set()
3288        args = () if ancbufsize is None else (ancbufsize,)
3289        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3290                                                   len(MSG), *args)
3291
3292        self.assertEqual(msg, MSG)
3293        self.checkRecvmsgAddress(addr, self.cli_addr)
3294        self.assertEqual(ancdata, [])
3295        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3296                        ignore=ignoreflags)
3297
3298    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3299    def testCmsgTruncNoBufSize(self):
3300        # Check that no ancillary data is received when no ancillary
3301        # buffer size is provided.
3302        self.checkHopLimitTruncatedHeader(ancbufsize=None,
3303                                          # BSD seems to set
3304                                          # MSG_CTRUNC only if an item
3305                                          # has been partially
3306                                          # received.
3307                                          ignoreflags=socket.MSG_CTRUNC)
3308
3309    @testCmsgTruncNoBufSize.client_skip
3310    def _testCmsgTruncNoBufSize(self):
3311        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3312        self.sendToServer(MSG)
3313
3314    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3315    def testSingleCmsgTrunc0(self):
3316        # Check that no ancillary data is received when ancillary
3317        # buffer size is zero.
3318        self.checkHopLimitTruncatedHeader(ancbufsize=0,
3319                                          ignoreflags=socket.MSG_CTRUNC)
3320
3321    @testSingleCmsgTrunc0.client_skip
3322    def _testSingleCmsgTrunc0(self):
3323        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3324        self.sendToServer(MSG)
3325
3326    # Check that no ancillary data is returned for various non-zero
3327    # (but still too small) buffer sizes.
3328
3329    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3330    def testSingleCmsgTrunc1(self):
3331        self.checkHopLimitTruncatedHeader(ancbufsize=1)
3332
3333    @testSingleCmsgTrunc1.client_skip
3334    def _testSingleCmsgTrunc1(self):
3335        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3336        self.sendToServer(MSG)
3337
3338    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3339    def testSingleCmsgTrunc2Int(self):
3340        self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)
3341
3342    @testSingleCmsgTrunc2Int.client_skip
3343    def _testSingleCmsgTrunc2Int(self):
3344        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3345        self.sendToServer(MSG)
3346
3347    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3348    def testSingleCmsgTruncLen0Minus1(self):
3349        self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)
3350
3351    @testSingleCmsgTruncLen0Minus1.client_skip
3352    def _testSingleCmsgTruncLen0Minus1(self):
3353        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3354        self.sendToServer(MSG)
3355
3356    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3357    def testSingleCmsgTruncInData(self):
3358        # Test truncation of a control message inside its associated
3359        # data.  The message may be returned with its data truncated,
3360        # or not returned at all.
3361        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3362                                  socket.IPV6_RECVHOPLIMIT, 1)
3363        self.misc_event.set()
3364        msg, ancdata, flags, addr = self.doRecvmsg(
3365            self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)
3366
3367        self.assertEqual(msg, MSG)
3368        self.checkRecvmsgAddress(addr, self.cli_addr)
3369        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3370
3371        self.assertLessEqual(len(ancdata), 1)
3372        if ancdata:
3373            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3374            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3375            self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
3376            self.assertLess(len(cmsg_data), SIZEOF_INT)
3377
3378    @testSingleCmsgTruncInData.client_skip
3379    def _testSingleCmsgTruncInData(self):
3380        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3381        self.sendToServer(MSG)
3382
3383    def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
3384        # Receive traffic class and hop limit into ancbufsize bytes of
3385        # ancillary data space, which should be large enough to
3386        # contain the first item, but too small to contain the header
3387        # of the second.  Check that data is MSG, MSG_CTRUNC is set
3388        # (unless included in ignoreflags), and only one ancillary
3389        # data item is returned.
3390        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3391                                  socket.IPV6_RECVHOPLIMIT, 1)
3392        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3393                                  socket.IPV6_RECVTCLASS, 1)
3394        self.misc_event.set()
3395        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3396                                                   len(MSG), ancbufsize)
3397
3398        self.assertEqual(msg, MSG)
3399        self.checkRecvmsgAddress(addr, self.cli_addr)
3400        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3401                        ignore=ignoreflags)
3402
3403        self.assertEqual(len(ancdata), 1)
3404        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3405        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3406        self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
3407        self.assertEqual(len(cmsg_data), SIZEOF_INT)
3408        a = array.array("i")
3409        a.frombytes(cmsg_data)
3410        self.assertGreaterEqual(a[0], 0)
3411        self.assertLessEqual(a[0], 255)
3412
3413    # Try the above test with various buffer sizes.
3414
3415    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3416                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3417    def testSecondCmsgTrunc0(self):
3418        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
3419                                        ignoreflags=socket.MSG_CTRUNC)
3420
3421    @testSecondCmsgTrunc0.client_skip
3422    def _testSecondCmsgTrunc0(self):
3423        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3424        self.sendToServer(MSG)
3425
3426    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3427                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3428    def testSecondCmsgTrunc1(self):
3429        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)
3430
3431    @testSecondCmsgTrunc1.client_skip
3432    def _testSecondCmsgTrunc1(self):
3433        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3434        self.sendToServer(MSG)
3435
3436    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3437                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3438    def testSecondCmsgTrunc2Int(self):
3439        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
3440                                        2 * SIZEOF_INT)
3441
3442    @testSecondCmsgTrunc2Int.client_skip
3443    def _testSecondCmsgTrunc2Int(self):
3444        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3445        self.sendToServer(MSG)
3446
3447    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3448                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3449    def testSecondCmsgTruncLen0Minus1(self):
3450        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
3451                                        socket.CMSG_LEN(0) - 1)
3452
3453    @testSecondCmsgTruncLen0Minus1.client_skip
3454    def _testSecondCmsgTruncLen0Minus1(self):
3455        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3456        self.sendToServer(MSG)
3457
3458    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3459                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
3460    def testSecomdCmsgTruncInData(self):
3461        # Test truncation of the second of two control messages inside
3462        # its associated data.
3463        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3464                                  socket.IPV6_RECVHOPLIMIT, 1)
3465        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3466                                  socket.IPV6_RECVTCLASS, 1)
3467        self.misc_event.set()
3468        msg, ancdata, flags, addr = self.doRecvmsg(
3469            self.serv_sock, len(MSG),
3470            socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)
3471
3472        self.assertEqual(msg, MSG)
3473        self.checkRecvmsgAddress(addr, self.cli_addr)
3474        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3475
3476        cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}
3477
3478        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
3479        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3480        cmsg_types.remove(cmsg_type)
3481        self.assertEqual(len(cmsg_data), SIZEOF_INT)
3482        a = array.array("i")
3483        a.frombytes(cmsg_data)
3484        self.assertGreaterEqual(a[0], 0)
3485        self.assertLessEqual(a[0], 255)
3486
3487        if ancdata:
3488            cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
3489            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3490            cmsg_types.remove(cmsg_type)
3491            self.assertLess(len(cmsg_data), SIZEOF_INT)
3492
3493        self.assertEqual(ancdata, [])
3494
3495    @testSecomdCmsgTruncInData.client_skip
3496    def _testSecomdCmsgTruncInData(self):
3497        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3498        self.sendToServer(MSG)
3499
3500
3501# Derive concrete test classes for different socket types.
3502
class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase,
                             SendrecvmsgConnectionlessBase,
                             ThreadedSocketTestMixin, UDPTestBase):
    # Fixture base for the sendmsg()/recvmsg() test classes that run
    # over UDP.  Adds nothing of its own; all behaviour comes from the
    # bases listed above.
    pass
3507
@requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase):
    # Runs the SendmsgConnectionlessTests cases on the UDP fixture.
    # Gated by the decorators on socket.sendmsg() and thread support.
    pass
3512
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase):
    # Runs the RecvmsgTests cases on the UDP fixture.  Gated by the
    # decorators on socket.recvmsg() and thread support.
    pass
3517
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase):
    # Runs the RecvmsgIntoTests cases on the UDP fixture.  Gated by
    # the decorators on socket.recvmsg_into() and thread support.
    pass
3522
3523
class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
                              SendrecvmsgConnectionlessBase,
                              ThreadedSocketTestMixin, UDP6TestBase):
    # Fixture base for the sendmsg()/recvmsg() test classes that use
    # UDP6TestBase.

    def checkRecvmsgAddress(self, addr1, addr2):
        # Compare the received address with the peer's address while
        # disregarding the final element of each (the scope ID).
        received = addr1[:-1]
        expected = addr2[:-1]
        self.assertEqual(received, expected)
3532
@requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
    # Runs the SendmsgConnectionlessTests cases on the UDP6 fixture.
    # Gated by the decorators on socket.sendmsg(), IPv6 support, an
    # AF_INET6/SOCK_DGRAM socket and thread support.
    pass
3539
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
    # Runs the RecvmsgTests cases on the UDP6 fixture.  Gated by the
    # decorators on socket.recvmsg(), IPv6 support, an
    # AF_INET6/SOCK_DGRAM socket and thread support.
    pass
3546
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
    # Runs the RecvmsgIntoTests cases on the UDP6 fixture.  Gated by
    # the decorators on socket.recvmsg_into(), IPv6 support, an
    # AF_INET6/SOCK_DGRAM socket and thread support.
    pass
3553
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
                                      SendrecvmsgUDP6TestBase):
    # Runs the RFC3542AncillaryTest cases on the UDP6 fixture.  Gated
    # by the decorators on socket.recvmsg(), IPv6 support,
    # socket.IPPROTO_IPV6, an AF_INET6/SOCK_DGRAM socket and thread
    # support.
    pass
3562
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
                                          RFC3542AncillaryTest,
                                          SendrecvmsgUDP6TestBase):
    # Runs the RFC3542AncillaryTest cases on the UDP6 fixture with
    # RecvmsgIntoMixin first in the bases (presumably so receives go
    # through recvmsg_into() -- the mixin is defined elsewhere).
    pass
3572
3573
class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase,
                             ConnectedStreamTestMixin, TCPTestBase):
    # Fixture base for the sendmsg()/recvmsg() test classes that run
    # over TCP.  Adds nothing of its own.
    pass
3577
@requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase):
    # Runs the SendmsgStreamTests cases on the TCP fixture.  Gated by
    # the decorators on socket.sendmsg() and thread support.
    pass
3582
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests,
                     SendrecvmsgTCPTestBase):
    # Runs the RecvmsgTests and RecvmsgGenericStreamTests cases on the
    # TCP fixture.  Gated by the decorators on socket.recvmsg() and
    # thread support.
    pass
3588
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
                         SendrecvmsgTCPTestBase):
    # Runs the RecvmsgIntoTests and RecvmsgGenericStreamTests cases on
    # the TCP fixture.  Gated by the decorators on
    # socket.recvmsg_into() and thread support.
    pass
3594
3595
class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase,
                                    SendrecvmsgConnectedBase,
                                    ConnectedStreamTestMixin, SCTPStreamBase):
    # Fixture base for the sendmsg()/recvmsg() test classes that run
    # over SCTP stream sockets.  Adds nothing of its own.
    pass
3600
@requireAttrs(socket.socket, "sendmsg")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase):
    # Runs the SendmsgStreamTests cases on the SCTP stream fixture.
    # Gated by the decorators on socket.sendmsg(), an
    # AF_INET/SOCK_STREAM/IPPROTO_SCTP socket and thread support.
    pass
3606
@requireAttrs(socket.socket, "recvmsg")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
                            SendrecvmsgSCTPStreamTestBase):
    # Runs the RecvmsgTests and RecvmsgGenericStreamTests cases on the
    # SCTP stream fixture.

    def testRecvmsgEOF(self):
        # Run the inherited test, but report a skip instead of an
        # error when it fails with ENOTCONN; any other OSError is
        # re-raised unchanged.
        try:
            super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF()
        except OSError as exc:
            if exc.errno == errno.ENOTCONN:
                # skipTest() raises, so the re-raise below is only
                # reached for other errors.
                self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
            raise
3620
@requireAttrs(socket.socket, "recvmsg_into")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
                                SendrecvmsgSCTPStreamTestBase):
    # Runs the RecvmsgIntoTests and RecvmsgGenericStreamTests cases on
    # the SCTP stream fixture.

    def testRecvmsgEOF(self):
        # Run the inherited test, but report a skip instead of an
        # error when it fails with ENOTCONN; any other OSError is
        # re-raised unchanged.
        try:
            super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF()
        except OSError as exc:
            if exc.errno == errno.ENOTCONN:
                # skipTest() raises, so the re-raise below is only
                # reached for other errors.
                self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
            raise
3634
3635
class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase,
                                    ConnectedStreamTestMixin, UnixStreamBase):
    # Fixture base for the sendmsg()/recvmsg() test classes that run
    # over Unix-domain stream sockets.  Adds nothing of its own.
    pass
3639
@requireAttrs(socket.socket, "sendmsg")
@requireAttrs(socket, "AF_UNIX")
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase):
    # Runs the SendmsgStreamTests cases on the Unix stream fixture.
    # Gated by the decorators on socket.sendmsg(), socket.AF_UNIX and
    # thread support.
    pass
3645
@requireAttrs(socket.socket, "recvmsg")
@requireAttrs(socket, "AF_UNIX")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
                            SendrecvmsgUnixStreamTestBase):
    # Runs the RecvmsgTests and RecvmsgGenericStreamTests cases on the
    # Unix stream fixture.  Gated by the decorators on
    # socket.recvmsg(), socket.AF_UNIX and thread support.
    pass
3652
@requireAttrs(socket.socket, "recvmsg_into")
@requireAttrs(socket, "AF_UNIX")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
                                SendrecvmsgUnixStreamTestBase):
    # Runs the RecvmsgIntoTests and RecvmsgGenericStreamTests cases on
    # the Unix stream fixture.  Gated by the decorators on
    # socket.recvmsg_into(), socket.AF_UNIX and thread support.
    pass
3659
@requireAttrs(socket.socket, "sendmsg", "recvmsg")
@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase):
    # Runs the SCMRightsTest cases on the Unix stream fixture.  Gated
    # by the decorators on socket.sendmsg()/recvmsg(), the AF_UNIX,
    # SOL_SOCKET and SCM_RIGHTS constants, and thread support.
    pass
3665
@requireAttrs(socket.socket, "sendmsg", "recvmsg_into")
@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
@unittest.skipUnless(thread, 'Threading required for this test.')
class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest,
                                     SendrecvmsgUnixStreamTestBase):
    # Runs the SCMRightsTest cases on the Unix stream fixture with
    # RecvmsgIntoMixin first in the bases (presumably so receives go
    # through recvmsg_into() -- the mixin is defined elsewhere).
    pass
3672
3673
3674# Test interrupting the interruptible send/receive methods with a
3675# signal when a timeout is set.  These tests avoid having multiple
3676# threads alive during the test so that the OS cannot deliver the
3677# signal to the wrong one.
3678
class InterruptedTimeoutBase(unittest.TestCase):
    # Base class for interrupted send/receive tests.  setUp() installs
    # a SIGALRM handler that raises ZeroDivisionError, and registers
    # cleanups that cancel any scheduled alarm and put the previous
    # handler back.

    # Timeout for socket operations
    timeout = 4.0

    def setUp(self):
        super().setUp()

        def raise_zero_division(signum, frame):
            # Deliberately fails so that the signal surfaces as a
            # ZeroDivisionError in the interrupted code.
            1 / 0

        previous_handler = signal.signal(signal.SIGALRM,
                                         raise_zero_division)
        # Cleanups run in reverse order of registration, so the alarm
        # is cancelled before the previous handler is restored.
        self.addCleanup(signal.signal, signal.SIGALRM, previous_handler)
        self.addCleanup(self.setAlarm, 0)

    # setAlarm(seconds) schedules delivery of SIGALRM after the given
    # number of seconds, or cancels it when seconds is zero.
    # alarm_time is a suitable delay for the mechanism in use;
    # setitimer() is preferred when available.
    if hasattr(signal, "setitimer"):
        alarm_time = 0.05

        def setAlarm(self, seconds):
            signal.setitimer(signal.ITIMER_REAL, seconds)
    else:
        # Old systems may deliver the alarm up to one second early
        alarm_time = 2

        def setAlarm(self, seconds):
            signal.alarm(seconds)
3708
3709
3710# Require siginterrupt() in order to ensure that system calls are
3711# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
    # Test interrupting the recv*() methods with signals when a
    # timeout is set.

    def setUp(self):
        super().setUp()
        self.serv.settimeout(self.timeout)

    def checkInterruptedRecv(self, func, *args, **kwargs):
        # Check that func(*args, **kwargs) is interrupted by SIGALRM:
        # the handler installed by InterruptedTimeoutBase.setUp()
        # raises ZeroDivisionError, which must propagate out of the
        # call.
        try:
            self.setAlarm(self.alarm_time)
            with self.assertRaises(ZeroDivisionError):
                func(*args, **kwargs)
        finally:
            # Cancel the alarm right away so that it cannot go off
            # later (e.g. during teardown) if the call ended for some
            # other reason before the signal arrived.
            self.setAlarm(0)

    def testInterruptedRecvTimeout(self):
        self.checkInterruptedRecv(self.serv.recv, 1024)

    def testInterruptedRecvIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))

    def testInterruptedRecvfromTimeout(self):
        self.checkInterruptedRecv(self.serv.recvfrom, 1024)

    def testInterruptedRecvfromIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))

    @requireAttrs(socket.socket, "recvmsg")
    def testInterruptedRecvmsgTimeout(self):
        self.checkInterruptedRecv(self.serv.recvmsg, 1024)

    @requireAttrs(socket.socket, "recvmsg_into")
    def testInterruptedRecvmsgIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])
3749
3750
# Require siginterrupt() in order to ensure that system calls are
# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
@unittest.skipUnless(thread, 'Threading required for this test.')
class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
                                 ThreadSafeCleanupTestCase,
                                 SocketListeningTestMixin, TCPTestBase):
    # Test interrupting the interruptible send*() methods with signals
    # when a timeout is set.

    def setUp(self):
        super().setUp()
        self.serv_conn = self.newSocket()
        self.addCleanup(self.serv_conn.close)
        # Use a thread to complete the connection, but wait for it to
        # terminate before running the test, so that there is only one
        # thread to accept the signal.
        cli_thread = threading.Thread(target=self.doConnect)
        cli_thread.start()
        # The peer address returned by accept() is not needed.
        self.cli_conn, _ = self.serv.accept()
        self.addCleanup(self.cli_conn.close)
        cli_thread.join()
        self.serv_conn.settimeout(self.timeout)

    def doConnect(self):
        # Thread target used by setUp() to connect to the listening socket.
        self.serv_conn.connect(self.serv_addr)

    def checkInterruptedSend(self, func, *args, **kwargs):
        # Call func(*args, **kwargs) in a loop, re-arming the alarm
        # before every call, until one call is cut short by the signal.
        # The loop is expected to end with ZeroDivisionError (presumably
        # raised by the signal handler that InterruptedTimeoutBase
        # installs -- it is defined outside this class), not with an
        # EINTR OSError.  Nothing reads from self.cli_conn, so repeated
        # sends should eventually block.
        with self.assertRaises(ZeroDivisionError):
            while True:
                self.setAlarm(self.alarm_time)
                func(*args, **kwargs)

    # Issue #12958: The following tests have problems on OS X prior to 10.7
    @support.requires_mac_ver(10, 7)
    def testInterruptedSendTimeout(self):
        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)

    @support.requires_mac_ver(10, 7)
    def testInterruptedSendtoTimeout(self):
        # Passing an actual address here as Python's wrapper for
        # sendto() doesn't allow passing a zero-length one; POSIX
        # requires that the address is ignored since the socket is
        # connection-mode, however.
        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
                                  self.serv_addr)

    @support.requires_mac_ver(10, 7)
    @requireAttrs(socket.socket, "sendmsg")
    def testInterruptedSendmsgTimeout(self):
        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
3807
3808
@unittest.skipUnless(thread, 'Threading required for this test.')
class TCPCloserTest(ThreadedTCPSocketTest):
    # Check that closing an accepted connection is seen as end-of-stream
    # by the connected client socket.

    def testClose(self):
        # Accept the client and drop the connection straight away.
        accepted, _ = self.serv.accept()
        accepted.close()

        # The client socket must become readable within a second and
        # then report EOF.
        client = self.cli
        readable = select.select([client], [], [], 1.0)[0]
        self.assertEqual(readable, [client])
        self.assertEqual(client.recv(1), b'')

        # Calling close() many times should be safe.
        for _ in range(2):
            accepted.close()

    def _testClose(self):
        # Connect, then keep the client socket around for a second.
        self.cli.connect((HOST, self.port))
        time.sleep(1.0)
3828
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
    # Basic checks on both ends of a socket pair: the testX methods use
    # self.serv and the _testX methods use self.cli.

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def _check_defaults(self, sock):
        # *sock* must be a stream socket with protocol 0, in the AF_UNIX
        # family when the socket module has it and in AF_INET otherwise.
        self.assertIsInstance(sock, socket.socket)
        expected_family = (socket.AF_UNIX if hasattr(socket, 'AF_UNIX')
                           else socket.AF_INET)
        self.assertEqual(sock.family, expected_family)
        self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)

    def _testDefaults(self):
        self._check_defaults(self.cli)

    def testDefaults(self):
        self._check_defaults(self.serv)

    def testRecv(self):
        # self.serv receives what _testRecv sends.
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        self.serv.send(MSG)

    def _testSend(self):
        # self.cli receives what testSend sends.
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
3863
@unittest.skipUnless(thread, 'Threading required for this test.')
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    # Tests for non-blocking and timeout behaviour of TCP sockets.  The
    # testX methods use the listening socket self.serv; the _testX
    # methods use the client socket self.cli (presumably run in the
    # client thread by ThreadedTCPSocketTest, which is defined outside
    # this class).

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def _check_accept_returns_promptly(self, msg):
        # Call accept() on self.serv, which the caller has put in
        # non-blocking mode, and fail with *msg* if the call takes one
        # second or more.  An OSError from accept() is expected and
        # ignored; a connection that is unexpectedly accepted is closed.
        # time.monotonic() is used so that system clock adjustments
        # cannot distort the measurement.
        start = time.monotonic()
        try:
            conn, _ = self.serv.accept()
        except OSError:
            pass
        else:
            conn.close()
        elapsed = time.monotonic() - start
        self.assertLess(elapsed, 1.0, msg)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(True)
        self.assertIsNone(self.serv.gettimeout())
        self.serv.setblocking(False)
        self.assertEqual(self.serv.gettimeout(), 0.0)
        self._check_accept_returns_promptly("Error setting non-blocking mode.")

    def _testSetBlocking(self):
        pass

    @support.cpython_only
    def testSetBlocking_overflow(self):
        # Issue 15989
        import _testcapi
        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
            self.skipTest('needs UINT_MAX < ULONG_MAX')
        self.serv.setblocking(False)
        self.assertEqual(self.serv.gettimeout(), 0.0)
        # A value that does not fit in an unsigned int must still be
        # treated as true, i.e. select blocking mode.
        self.serv.setblocking(_testcapi.UINT_MAX + 1)
        self.assertIsNone(self.serv.gettimeout())

    _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)

    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'test needs socket.SOCK_NONBLOCK')
    @support.requires_linux_version(2, 6, 28)
    def testInitNonBlocking(self):
        # reinit server socket
        self.serv.close()
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM |
                                                  socket.SOCK_NONBLOCK)
        self.port = support.bind_port(self.serv)
        self.serv.listen()
        # actual testing
        self._check_accept_returns_promptly(
            "Error creating with non-blocking mode.")

    def _testInitNonBlocking(self):
        pass

    def testInheritFlags(self):
        # Issue #7995: when calling accept() on a listening socket with a
        # timeout, the resulting socket should not be non-blocking.
        self.serv.settimeout(10)
        try:
            conn, _ = self.serv.accept()
            # Only close conn once accept() has actually produced it, so
            # that a failing accept() is reported as itself and not as a
            # NameError from the cleanup code.
            with conn:
                # The client waits before sending, so this recv() only
                # succeeds if conn waits for the data.
                conn.recv(len(MSG))
        finally:
            self.serv.settimeout(None)

    def _testInheritFlags(self):
        time.sleep(0.1)
        self.cli.connect((HOST, self.port))
        time.sleep(0.5)
        self.cli.send(MSG)

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(False)
        try:
            conn, _ = self.serv.accept()
        except OSError:
            pass
        else:
            # Don't leak the unexpected connection.
            conn.close()
            self.fail("Error trying to do non-blocking accept.")
        readable, _, _ = select.select([self.serv], [], [])
        self.assertIn(self.serv, readable,
                      "Error trying to do accept after select.")
        conn, _ = self.serv.accept()
        with conn:
            self.assertIsNone(conn.gettimeout())

    def _testAccept(self):
        time.sleep(0.1)
        self.cli.connect((HOST, self.port))

    def testConnect(self):
        # Testing non-blocking connect
        conn, _ = self.serv.accept()
        conn.close()

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, _ = self.serv.accept()
        # The with block closes conn even when a check below fails.
        with conn:
            conn.setblocking(False)
            try:
                conn.recv(len(MSG))
            except OSError:
                pass
            else:
                self.fail("Error trying to do non-blocking recv.")
            readable, _, _ = select.select([conn], [], [])
            self.assertIn(conn, readable,
                          "Error during select call to non-blocking socket.")
            msg = conn.recv(len(MSG))
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.cli.connect((HOST, self.port))
        time.sleep(0.1)
        self.cli.send(MSG)
3991
@unittest.skipUnless(thread, 'Threading required for this test.')
class FileObjectClassTestCase(SocketConnectedTest):
    """Exercise the file objects produced by socket.makefile().

    setUp() wraps self.cli_conn in self.read_file and clientSetUp()
    wraps self.serv_conn in self.write_file.  The _testX methods write
    to self.write_file and the testX methods read the data back from
    self.read_file.

    The class attributes below are the arguments passed to makefile()
    and the messages that are written and expected; subclasses override
    them to cover other buffering and text/binary combinations.
    """

    bufsize = -1 # Use default buffer size
    encoding = 'utf-8'
    errors = 'strict'
    newline = None

    read_mode = 'rb'
    read_msg = MSG
    write_mode = 'wb'
    write_msg = MSG

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        # Events available to the tests for synchronising both sides.
        self.evt1 = threading.Event()
        self.evt2 = threading.Event()
        self.serv_finished = threading.Event()
        self.cli_finished = threading.Event()
        SocketConnectedTest.setUp(self)
        self.read_file = self.cli_conn.makefile(
            self.read_mode, self.bufsize,
            encoding=self.encoding,
            errors=self.errors,
            newline=self.newline)

    def tearDown(self):
        # Signal completion first, for any _testX still waiting on it.
        self.serv_finished.set()
        self.read_file.close()
        self.assertTrue(self.read_file.closed)
        self.read_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        self.write_file = self.serv_conn.makefile(
            self.write_mode, self.bufsize,
            encoding=self.encoding,
            errors=self.errors,
            newline=self.newline)

    def clientTearDown(self):
        # Signal completion first, for any testX still waiting on it.
        self.cli_finished.set()
        self.write_file.close()
        self.assertTrue(self.write_file.closed)
        self.write_file = None
        SocketConnectedTest.clientTearDown(self)

    def testReadAfterTimeout(self):
        # Issue #7322: A file object must disallow further reads
        # after a timeout has occurred.
        self.cli_conn.settimeout(1)
        self.read_file.read(3)
        # First read raises a timeout
        with self.assertRaises(socket.timeout):
            self.read_file.read(1)
        # Second read is disallowed
        with self.assertRaises(OSError) as caught:
            self.read_file.read(1)
        self.assertIn("cannot read from timed out object",
                      str(caught.exception))

    def _testReadAfterTimeout(self):
        # Send only three items, then wait until tearDown() has started.
        self.write_file.write(self.write_msg[:3])
        self.write_file.flush()
        self.serv_finished.wait()

    def testSmallRead(self):
        # Performing small file read test
        head = self.read_file.read(len(self.read_msg) - 3)
        tail = self.read_file.read(3)
        self.assertEqual(head + tail, self.read_msg)

    def _testSmallRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testFullRead(self):
        # read until EOF
        everything = self.read_file.read()
        self.assertEqual(everything, self.read_msg)

    def _testFullRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        # Start from an empty value of the same type as read_msg.
        collected = type(self.read_msg)()
        piece = self.read_file.read(1)
        while piece:
            collected += piece
            piece = self.read_file.read(1)
        self.assertEqual(collected, self.read_msg)

    def _testUnbufferedRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testReadline(self):
        # Performing file readline test
        self.assertEqual(self.read_file.readline(), self.read_msg)

    def _testReadline(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testCloseAfterMakefile(self):
        # The file returned by makefile should keep the socket open.
        self.cli_conn.close()
        # read until EOF
        everything = self.read_file.read()
        self.assertEqual(everything, self.read_msg)

    def _testCloseAfterMakefile(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testMakefileAfterMakefileClose(self):
        # The socket must stay usable after its file object is closed.
        self.read_file.close()
        received = self.cli_conn.recv(len(MSG))
        if isinstance(self.read_msg, str):
            received = received.decode()
        self.assertEqual(received, self.read_msg)

    def _testMakefileAfterMakefileClose(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testClosedAttr(self):
        self.assertFalse(self.read_file.closed)

    def _testClosedAttr(self):
        self.assertFalse(self.write_file.closed)

    def testAttributes(self):
        reader = self.read_file
        self.assertEqual(reader.mode, self.read_mode)
        self.assertEqual(reader.name, self.cli_conn.fileno())

    def _testAttributes(self):
        writer = self.write_file
        self.assertEqual(writer.mode, self.write_mode)
        self.assertEqual(writer.name, self.serv_conn.fileno())

    def testRealClose(self):
        # With both the file and the socket closed, neither is usable.
        self.read_file.close()
        with self.assertRaises(ValueError):
            self.read_file.fileno()
        self.cli_conn.close()
        with self.assertRaises(OSError):
            self.cli_conn.getsockname()

    def _testRealClose(self):
        pass
4154
4155
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Run the FileObjectClassTestCase tests again with bufsize == 0.

    Only in unbuffered mode can one file object read a line, and a
    second file object created afterwards on the same socket read the
    next line, with no data lost in the first object's buffer.
    http.client depends on this when it reads multiple requests from
    one socket."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        first = self.read_file.readline()
        self.assertEqual(first, b"A. " + self.write_msg)
        self.read_file = self.cli_conn.makefile('rb', 0)
        second = self.read_file.readline()
        self.assertEqual(second, b"B. " + self.write_msg)

    def _testUnbufferedReadline(self):
        for prefix in (b"A. ", b"B. "):
            self.write_file.write(prefix + self.write_msg)
        self.write_file.flush()

    def testMakefileClose(self):
        # The file returned by makefile should keep the socket open...
        self.cli_conn.close()
        received = self.cli_conn.recv(1024)
        self.assertEqual(received, self.read_msg)
        # ...until the file is itself closed
        self.read_file.close()
        with self.assertRaises(OSError):
            self.cli_conn.recv(1024)

    def _testMakefileClose(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testMakefileCloseSocketDestroy(self):
        # Closing the file object must release exactly one reference
        # to the socket.
        count_with_file = sys.getrefcount(self.cli_conn)
        self.read_file.close()
        count_without_file = sys.getrefcount(self.cli_conn)
        self.assertEqual(count_with_file - 1, count_without_file)

    def _testMakefileCloseSocketDestroy(self):
        pass

    # Non-blocking ops
    # NOTE: to set `read_file` as non-blocking, we must call
    # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp).

    def testSmallReadNonBlocking(self):
        self.cli_conn.setblocking(False)
        # Nothing has been written yet: both read styles return None.
        self.assertIsNone(self.read_file.readinto(bytearray(10)))
        self.assertIsNone(self.read_file.read(len(self.read_msg) - 3))
        # Let the writer proceed and give it up to a second to finish.
        self.evt1.set()
        self.evt2.wait(1.0)
        head = self.read_file.read(len(self.read_msg) - 3)
        if head is None:
            # Data not arrived (can happen under Windows), wait a bit
            time.sleep(0.5)
            head = self.read_file.read(len(self.read_msg) - 3)
        tail_buf = bytearray(10)
        tail_len = self.read_file.readinto(tail_buf)
        self.assertEqual(tail_len, 3)
        self.assertEqual(head + tail_buf[:tail_len], self.read_msg)
        # Everything has been consumed: reads return None again.
        self.assertIsNone(self.read_file.readinto(bytearray(16)))
        self.assertIsNone(self.read_file.read(1))

    def _testSmallReadNonBlocking(self):
        self.evt1.wait(1.0)
        self.write_file.write(self.write_msg)
        self.write_file.flush()
        self.evt2.set()
        # Avoid closing the socket before the server test has finished,
        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
        self.serv_finished.wait(5.0)

    def testWriteNonBlocking(self):
        self.cli_finished.wait(5.0)
        # The client thread can't skip directly - the SkipTest exception
        # would appear as a failure.
        skip_reason = self.serv_skipped
        if skip_reason:
            self.skipTest(skip_reason)

    def _testWriteNonBlocking(self):
        self.serv_skipped = None
        self.serv_conn.setblocking(False)
        # Try to saturate the socket buffer pipe with repeated large writes.
        payload = b"x" * support.SOCK_MAX_SIZE
        max_attempts = 10
        # The first write() succeeds since a chunk of data can be buffered
        written = self.write_file.write(payload)
        self.assertGreater(written, 0)
        for _ in range(max_attempts):
            written = self.write_file.write(payload)
            if written is None:
                # Succeeded
                break
            self.assertGreater(written, 0)
        else:
            # Let us know that this test didn't manage to establish
            # the expected conditions. This is not a failure in itself but,
            # if it happens repeatedly, the test should be fixed.
            self.serv_skipped = "failed to saturate the socket buffer"
4262
4263
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    # Re-run the inherited makefile() tests with a buffering argument
    # of 1; only the bufsize class attribute is overridden.

    bufsize = 1 # Default-buffered for reading; line-buffered for writing
4267
4268
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    # Re-run the inherited makefile() tests with a buffering argument
    # of 2; only the bufsize class attribute is overridden.

    bufsize = 2 # Exercise the buffering code
4272
4273
class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() in text mode (rather than binary)

    Here the reading file object is opened in text mode ('r') while the
    writing one stays binary ('wb'), so bytes are written and the
    decoded str is expected back.
    """

    read_mode = 'r'
    read_msg = MSG.decode('utf-8')
    write_mode = 'wb'
    write_msg = MSG
    # newline='' turns off newline translation in text mode, so the
    # '\r\n' at the end of MSG is read back unchanged.
    newline = ''
4282
4283
class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() in text mode (rather than binary)

    Here the writing file object is opened in text mode ('w') while the
    reading one stays binary ('rb'), so a str is written and the
    encoded bytes are expected back.
    """

    read_mode = 'rb'
    read_msg = MSG
    write_mode = 'w'
    write_msg = MSG.decode('utf-8')
    # newline='' turns off newline translation in text mode, so the
    # '\r\n' at the end of the message is written unchanged.
    newline = ''
4292
4293
class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() in text mode (rather than binary)

    Here both the reading ('r') and the writing ('w') file objects are
    opened in text mode, so a str is written and a str is expected back.
    """

    read_mode = 'r'
    read_msg = MSG.decode('utf-8')
    write_mode = 'w'
    write_msg = MSG.decode('utf-8')
    # newline='' turns off newline translation in text mode, so the
    # '\r\n' at the end of the message passes through unchanged.
    newline = ''
4302
4303
class NetworkConnectionTest:
    """Prove network connection."""

    def clientSetUp(self):
        # We're inherited below by BasicTCPTest2, which also inherits
        # BasicTCPTest, which defines self.port referenced below.
        # The one connected socket is stored under both attribute names.
        connection = socket.create_connection((HOST, self.port))
        self.cli = connection
        self.serv_conn = connection
4312
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Tests that NetworkConnection does not break existing TCP functionality.

    The tests inherited from BasicTCPTest are run with the clientSetUp()
    of NetworkConnectionTest, which connects the client through
    socket.create_connection().
    """
4316
4317class NetworkConnectionNoServer(unittest.TestCase):
4318
4319    class MockSocket(socket.socket):
4320        def connect(self, *args):
4321            raise socket.timeout('timed out')
4322
4323    @contextlib.contextmanager
4324    def mocked_socket_module(self):
4325        """Return a socket which times out on connect"""
4326        old_socket = socket.socket
4327        socket.socket = self.MockSocket
4328        try:
4329            yield
4330        finally:
4331            socket.socket = old_socket
4332
4333    def test_connect(self):
4334        port = support.find_unused_port()
4335        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
4336        self.addCleanup(cli.close)
4337        with self.assertRaises(OSError) as cm:
4338            cli.connect((HOST, port))
4339        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
4340
4341    def test_create_connection(self):
4342        # Issue #9792: errors raised by create_connection() should have
4343        # a proper errno attribute.
4344        port = support.find_unused_port()
4345        with self.assertRaises(OSError) as cm:
4346            socket.create_connection((HOST, port))
4347
4348        # Issue #16257: create_connection() calls getaddrinfo() against
4349        # 'localhost'.  This may result in an IPV6 addr being returned
4350        # as well as an IPV4 one:
4351        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
4352        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
4353        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
4354        #
4355        # create_connection() enumerates through all the addresses returned
4356        # and if it doesn't successfully bind to any of them, it propagates
4357        # the last exception it encountered.
4358        #
4359        # On Solaris, ENETUNREACH is returned in this circumstance instead
4360        # of ECONNREFUSED.  So, if that errno exists, add it to our list of
4361        # expected errnos.
4362        expected_errnos = [ errno.ECONNREFUSED, ]
4363        if hasattr(errno, 'ENETUNREACH'):
4364            expected_errnos.append(errno.ENETUNREACH)
4365
4366        self.assertIn(cm.exception.errno, expected_errnos)
4367
4368    def test_create_connection_timeout(self):
4369        # Issue #9792: create_connection() should not recast timeout errors
4370        # as generic socket errors.
4371        with self.mocked_socket_module():
4372            with self.assertRaises(socket.timeout):
4373                socket.create_connection((HOST, 1234))
4374
4375
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    # Check the attributes of the sockets returned by
    # socket.create_connection().  Every testX method just accepts and
    # closes one connection; the checks are in the _testX methods, which
    # create the client socket.

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        # Port used as the local end of the connection in
        # _testSourceAddress.
        self.source_port = support.find_unused_port()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        # Accept a single connection and close it at once.
        conn, _ = self.serv.accept()
        conn.close()

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.addCleanup(self.cli.close)
        # Use the named constant instead of its numeric value.
        self.assertEqual(self.cli.family, socket.AF_INET)

    testSourceAddress = _justAccept
    def _testSourceAddress(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30,
                source_address=('', self.source_port))
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.getsockname()[1], self.source_port)
        # The port number being used is sufficient to show that the bind()
        # call happened.

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
            self.addCleanup(self.cli.close)
        finally:
            # Always restore the global default for later tests.
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
            self.addCleanup(self.cli.close)
        finally:
            # Always restore the global default for later tests.
            socket.setdefaulttimeout(None)
        self.assertIsNone(self.cli.gettimeout())

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        # Timeout passed as a keyword argument.
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        # Register the cleanup like the sibling tests do.
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        # Timeout passed as a positional argument.
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)
4444
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    # Check how the timeout given to socket.create_connection() affects
    # a recv() on the returned socket when the peer answers late.

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        # Shared by both tests: accept the client, stay silent for
        # three seconds, then send the reply.
        accepted, _ = self.serv.accept()
        self.addCleanup(accepted.close)
        time.sleep(3)
        accepted.send(b"done!")
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        # No timeout argument: the late reply is still received.
        self.cli = socket.create_connection((HOST, self.port))
        self.assertEqual(self.cli.recv(5), b"done!")

    def _testOutsideTimeout(self):
        # One-second timeout: recv() gives up before the reply is sent.
        self.cli = socket.create_connection((HOST, self.port), timeout=1)
        with self.assertRaises(socket.timeout):
            self.cli.recv(5)
4475
4476
class TCPTimeoutTest(SocketTCPTest):
    # Timeout behaviour of accept() on a listening TCP socket that no
    # client connects to.

    def testTCPTimeout(self):
        # accept() must raise socket.timeout once the timeout expires.
        # The message is passed through msg= so that it is actually
        # reported on failure.
        self.serv.settimeout(1.0)
        with self.assertRaises(
                socket.timeout,
                msg="Error generating a timeout exception (TCP)"):
            self.serv.accept()

    def testTimeoutZero(self):
        # With a zero timeout, accept() must fail immediately with a
        # plain OSError rather than with socket.timeout.
        self.serv.settimeout(0.0)
        with self.assertRaises(
                OSError,
                msg="accept() returned success when we did not expect it"
                ) as cm:
            self.serv.accept()
        self.assertNotIsInstance(cm.exception, socket.timeout,
                                 "caught timeout instead of error (TCP)")

    @unittest.skipUnless(hasattr(signal, 'alarm'),
                         'test needs signal.alarm()')
    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        self.serv.settimeout(5.0)   # must be longer than alarm

        class Alarm(Exception):
            pass

        def alarm_handler(signum, frame):
            # Turn SIGALRM into an exception raised in the blocked call.
            raise Alarm

        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
            try:
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)
4533
class UDPTimeoutTest(SocketUDPTest):
    # Timeout behaviour of recv() on a UDP socket that receives no data.

    def testUDPTimeout(self):
        # recv() must raise socket.timeout once the timeout expires.
        # The message is passed through msg= so that it is actually
        # reported on failure.
        self.serv.settimeout(1.0)
        with self.assertRaises(
                socket.timeout,
                msg="Error generating a timeout exception (UDP)"):
            self.serv.recv(1024)

    def testTimeoutZero(self):
        # With a zero timeout, recv() must fail immediately with a
        # plain OSError rather than with socket.timeout.
        self.serv.settimeout(0.0)
        with self.assertRaises(
                OSError,
                msg="recv() returned success when we did not expect it"
                ) as cm:
            self.serv.recv(1024)
        self.assertNotIsInstance(cm.exception, socket.timeout,
                                 "caught timeout instead of error (UDP)")
4556
class TestExceptions(unittest.TestCase):
    # Checks on the socket exception classes and on error reporting
    # for an invalid file descriptor.

    def testExceptionTree(self):
        # All socket exception classes derive from OSError.
        self.assertTrue(issubclass(OSError, Exception))
        for exc_class in (socket.herror, socket.gaierror, socket.timeout):
            self.assertTrue(issubclass(exc_class, OSError))

    def test_setblocking_invalidfd(self):
        # Regression test for issue #28471

        # Build a second socket object on the first one's descriptor,
        # then close the first so that the descriptor becomes invalid.
        donor = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        stale = socket.socket(
            socket.AF_INET, socket.SOCK_STREAM, 0, donor.fileno())
        donor.close()
        self.addCleanup(stale.detach)

        self.assertRaises(OSError, stale.setblocking, False)
4576
4577
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
    # Tests for AF_UNIX socket addresses that start with a NUL byte.

    UNIX_PATH_MAX = 108

    def testLinuxAbstractNamespace(self):
        name = b"\x00python-test-hello\x00\xff"
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as listener:
            listener.bind(name)
            listener.listen()
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
                client.connect(listener.getsockname())
                accepted, _ = listener.accept()
                with accepted:
                    self.assertEqual(listener.getsockname(), name)
                    self.assertEqual(client.getpeername(), name)

    def testMaxName(self):
        # A name of exactly UNIX_PATH_MAX bytes can be bound.
        name = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind(name)
            self.assertEqual(sock.getsockname(), name)

    def testNameOverflow(self):
        # A name one character longer than UNIX_PATH_MAX is rejected.
        name = "\x00" + "h" * self.UNIX_PATH_MAX
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            with self.assertRaises(OSError):
                sock.bind(name)

    def testStrName(self):
        # Check that an abstract name can be passed as a string.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind("\x00python\x00test\x00")
            self.assertEqual(sock.getsockname(), b"\x00python\x00test\x00")

    def testBytearrayName(self):
        # Check that an abstract name can be passed as a bytearray.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind(bytearray(b"\x00python\x00test\x00"))
            self.assertEqual(sock.getsockname(), b"\x00python\x00test\x00")
4619
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
class TestUnixDomain(unittest.TestCase):
    """Bind an AF_UNIX stream socket to pathnames given in several forms."""

    def setUp(self):
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def encoded(self, path):
        # Return the given path encoded in the file system encoding,
        # or skip the test if this is not possible.
        try:
            raw = os.fsencode(path)
        except UnicodeEncodeError:
            self.skipTest(
                "Pathname {0!a} cannot be represented in file "
                "system encoding {1!r}".format(
                    path, sys.getfilesystemencoding()))
        return raw

    def bind(self, sock, path):
        # Bind the socket, skipping the test when the only problem is
        # that the pathname does not fit into an AF_UNIX address.
        try:
            support.bind_unix_socket(sock, path)
        except OSError as exc:
            if str(exc) != "AF_UNIX path too long":
                raise
            self.skipTest(
                "Pathname {0!a} is too long to serve as an AF_UNIX path"
                .format(path))

    def testStrAddr(self):
        # Test binding to and retrieving a normal string pathname.
        target = os.path.abspath(support.TESTFN)
        self.bind(self.sock, target)
        self.addCleanup(support.unlink, target)
        self.assertEqual(self.sock.getsockname(), target)

    def testBytesAddr(self):
        # Test binding to a bytes pathname.
        target = os.path.abspath(support.TESTFN)
        self.bind(self.sock, self.encoded(target))
        self.addCleanup(support.unlink, target)
        self.assertEqual(self.sock.getsockname(), target)

    def testSurrogateescapeBind(self):
        # Test binding to a valid non-ASCII pathname, with the
        # non-ASCII bytes supplied using surrogateescape encoding.
        target = os.path.abspath(support.TESTFN_UNICODE)
        escaped = self.encoded(target).decode("ascii", "surrogateescape")
        self.bind(self.sock, escaped)
        self.addCleanup(support.unlink, target)
        self.assertEqual(self.sock.getsockname(), target)

    def testUnencodableAddr(self):
        # Test binding to a pathname that cannot be encoded in the
        # file system encoding.
        if support.TESTFN_UNENCODABLE is None:
            self.skipTest("No unencodable filename available")
        target = os.path.abspath(support.TESTFN_UNENCODABLE)
        self.bind(self.sock, target)
        self.addCleanup(support.unlink, target)
        self.assertEqual(self.sock.getsockname(), target)
4684
@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
    """
    Test recv_into() and recvfrom_into() with several buffer types.

    Each ``testX`` method receives on ``self.cli_conn``; the matching
    ``_testX`` method sends MSG on ``self.serv_conn``.
    """
    def __init__(self, methodName='runTest'):
        super().__init__(methodName=methodName)

    # --- recv_into() ---

    def testRecvIntoArray(self):
        target = array.array("B", [0] * len(MSG))
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target.tobytes()[:len(MSG)], MSG)

    def _testRecvIntoArray(self):
        self.serv_conn.send(bytes(MSG))

    def testRecvIntoBytearray(self):
        target = bytearray(1024)
        received = self.cli_conn.recv_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoBytearray = _testRecvIntoArray

    def testRecvIntoMemoryview(self):
        target = bytearray(1024)
        received = self.cli_conn.recv_into(memoryview(target))
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoMemoryview = _testRecvIntoArray

    # --- recvfrom_into() ---

    def testRecvFromIntoArray(self):
        target = array.array("B", [0] * len(MSG))
        received, _sender = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target.tobytes()[:len(MSG)], MSG)

    def _testRecvFromIntoArray(self):
        self.serv_conn.send(bytes(MSG))

    def testRecvFromIntoBytearray(self):
        target = bytearray(1024)
        received, _sender = self.cli_conn.recvfrom_into(target)
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        target = bytearray(1024)
        received, _sender = self.cli_conn.recvfrom_into(memoryview(target))
        self.assertEqual(received, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray

    def testRecvFromIntoSmallBuffer(self):
        # See issue #20246.
        # Asking for more bytes than the buffer can hold must be refused.
        tiny = bytearray(8)
        with self.assertRaises(ValueError):
            self.cli_conn.recvfrom_into(tiny, 1024)

    def _testRecvFromIntoSmallBuffer(self):
        self.serv_conn.send(MSG)

    def testRecvFromIntoEmptyBuffer(self):
        # A zero-length buffer is accepted with and without explicit nbytes.
        empty = bytearray()
        self.cli_conn.recvfrom_into(empty)
        self.cli_conn.recvfrom_into(empty, 0)

    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
4767
4768
# Values plugged into the TIPC address tuples built by the TIPC tests.
TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210

def isTipcAvailable():
    """Report whether TIPC sockets can be used on this host.

    Returns True only if the socket module defines AF_TIPC and
    /proc/modules lists the "tipc" kernel module; the module is not
    loaded automatically on Ubuntu and probably other Linux distros.
    """
    if not hasattr(socket, "AF_TIPC"):
        return False
    try:
        modules = open("/proc/modules")
    except (FileNotFoundError, IsADirectoryError, PermissionError):
        # A missing, non-regular or unreadable file simply means that
        # we cannot tell, which is treated as "not available".
        return False
    with modules:
        return any(entry.startswith("tipc ") for entry in modules)
4792
@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase):
    """Single-threaded TIPC test over SOCK_RDM sockets."""

    def testRDM(self):
        server = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        client = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(server.close)
        self.addCleanup(client.close)

        # Bind the server to the whole [TIPC_LOWER, TIPC_UPPER] range.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                     TIPC_LOWER, TIPC_UPPER))

        # Address the message to the middle of that range.
        middle = TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2)
        destination = (socket.TIPC_ADDR_NAME, TIPC_STYPE, middle, 0)
        client.sendto(MSG, destination)

        payload, sender = server.recvfrom(1024)

        self.assertEqual(client.getsockname(), sender)
        self.assertEqual(payload, MSG)
4815
4816
@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
    """TIPC stream test with separate server and client halves."""

    def __init__(self, methodName = 'runTest'):
        unittest.TestCase.__init__(self, methodName = methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        # Listening socket bound to the whole TIPC_LOWER..TIPC_UPPER range.
        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.srv.close)
        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_addr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                       TIPC_LOWER, TIPC_UPPER)
        self.srv.bind(listen_addr)
        self.srv.listen()
        # Signal readiness before blocking in accept().
        self.serverExplicitReady()
        self.conn, self.connaddr = self.srv.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        # serverExplicitReady() is called before the server has entered
        # accept(), and that window can really be hit; wait briefly so
        # that connect() does not fail with an exception.
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.cli.close)
        middle = TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2)
        self.cli.connect((socket.TIPC_ADDR_NAME, TIPC_STYPE, middle, 0))
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        payload = self.conn.recv(1024)
        self.assertEqual(payload, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        self.cli.send(MSG)
        self.cli.close()
4856
4857
@unittest.skipUnless(thread, 'Threading required for this test.')
class ContextManagersTest(ThreadedTCPSocketTest):
    """Check that socket objects behave correctly as context managers.

    Methods come in pairs: ``testX`` accepts on ``self.serv`` while
    ``_testX`` is the connecting side.
    NOTE(review): the pairing by name is presumably implemented by the
    ThreadableTest machinery of the base class -- confirm there.
    """

    def testSocketClass(self):
        # Nothing to do on the accepting side.  This method exists so
        # that unittest collects the test at all: only names starting
        # with "test" are discovered, so without it the checks in
        # _testSocketClass() would never be executed.
        pass

    def _testSocketClass(self):
        # base test
        with socket.socket() as sock:
            self.assertFalse(sock._closed)
        self.assertTrue(sock._closed)
        # close inside with block
        with socket.socket() as sock:
            sock.close()
        self.assertTrue(sock._closed)
        # exception inside with block
        with socket.socket() as sock:
            # Sending on a socket that was never connected must fail.
            self.assertRaises(OSError, sock.sendall, b'foo')
        self.assertTrue(sock._closed)

    def testCreateConnectionBase(self):
        # Echo whatever the client sends back to it.
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        data = conn.recv(1024)
        conn.sendall(data)

    def _testCreateConnectionBase(self):
        address = self.serv.getsockname()
        with socket.create_connection(address) as sock:
            self.assertFalse(sock._closed)
            sock.sendall(b'foo')
            self.assertEqual(sock.recv(1024), b'foo')
        self.assertTrue(sock._closed)

    def testCreateConnectionClose(self):
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        data = conn.recv(1024)
        conn.sendall(data)

    def _testCreateConnectionClose(self):
        address = self.serv.getsockname()
        # Closing explicitly inside the block must not upset __exit__.
        with socket.create_connection(address) as sock:
            sock.close()
        self.assertTrue(sock._closed)
        self.assertRaises(OSError, sock.sendall, b'foo')
4901
4902
class InheritanceTest(unittest.TestCase):
    """Check the inheritable flag of newly created and duplicated sockets."""

    @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
                         "SOCK_CLOEXEC not defined")
    @support.requires_linux_version(2, 6, 28)
    def test_SOCK_CLOEXEC(self):
        kind = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
        with socket.socket(socket.AF_INET, kind) as s:
            self.assertTrue(s.type & socket.SOCK_CLOEXEC)
            self.assertFalse(s.get_inheritable())

    def test_default_inheritable(self):
        with socket.socket() as sock:
            self.assertEqual(sock.get_inheritable(), False)

    def test_dup(self):
        with socket.socket() as original:
            duplicate = original.dup()
            # The duplicate has to stay usable after the original is closed.
            original.close()
            with duplicate:
                self.assertEqual(duplicate.get_inheritable(), False)

    def test_set_inheritable(self):
        with socket.socket() as sock:
            # Switch the flag on, then off again, reading it back each time.
            for wanted in (True, False):
                sock.set_inheritable(wanted)
                self.assertEqual(sock.get_inheritable(), wanted)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        with socket.socket() as sock:
            fd = sock.fileno()
            self.assertEqual(sock.get_inheritable(), False)

            # clear FD_CLOEXEC flag
            current = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

            self.assertEqual(sock.get_inheritable(), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        with socket.socket() as sock:
            fd = sock.fileno()
            cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
            self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

            sock.set_inheritable(True)
            cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
            self.assertEqual(cloexec, 0)


    @unittest.skipUnless(hasattr(socket, "socketpair"),
                         "need socket.socketpair()")
    def test_socketpair(self):
        first, second = socket.socketpair()
        for end in (first, second):
            self.addCleanup(end.close)
        for end in (first, second):
            self.assertEqual(end.get_inheritable(), False)
4970
4971
@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
                     "SOCK_NONBLOCK not defined")
class NonblockConstantTest(unittest.TestCase):
    """Check that SOCK_NONBLOCK in socket.type tracks the socket timeout."""

    def checkNonblock(self, s, nonblock=True, timeout=0.0):
        """Assert the blocking state of socket *s*.

        With *nonblock* true, s.type must carry SOCK_NONBLOCK and
        s.gettimeout() must equal *timeout*; otherwise the flag must be
        clear and the timeout must be None.
        """
        if nonblock:
            self.assertTrue(s.type & socket.SOCK_NONBLOCK)
            self.assertEqual(s.gettimeout(), timeout)
        else:
            self.assertFalse(s.type & socket.SOCK_NONBLOCK)
            self.assertEqual(s.gettimeout(), None)

    @support.requires_linux_version(2, 6, 28)
    def test_SOCK_NONBLOCK(self):
        # a lot of it seems silly and redundant, but I wanted to test that
        # changing back and forth worked ok
        with socket.socket(socket.AF_INET,
                           socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
            self.checkNonblock(s)
            s.setblocking(1)
            self.checkNonblock(s, False)
            s.setblocking(0)
            self.checkNonblock(s)
            s.settimeout(None)
            self.checkNonblock(s, False)
            s.settimeout(2.0)
            self.checkNonblock(s, timeout=2.0)
            s.setblocking(1)
            self.checkNonblock(s, False)
        # defaulttimeout
        # The default timeout is process-wide state: register its
        # restoration up front so that a failing assertion below cannot
        # leak a modified default into later tests.
        self.addCleanup(socket.setdefaulttimeout, socket.getdefaulttimeout())
        socket.setdefaulttimeout(0.0)
        with socket.socket() as s:
            self.checkNonblock(s)
        socket.setdefaulttimeout(None)
        with socket.socket() as s:
            self.checkNonblock(s, False)
        socket.setdefaulttimeout(2.0)
        with socket.socket() as s:
            self.checkNonblock(s, timeout=2.0)
        socket.setdefaulttimeout(None)
        with socket.socket() as s:
            self.checkNonblock(s, False)
5015
5016
@unittest.skipUnless(os.name == "nt", "Windows specific")
@unittest.skipUnless(multiprocessing, "need multiprocessing")
class TestSocketSharing(SocketTCPTest):
    """Tests for socket.share() and socket.fromshare()."""

    # This must be classmethod and not staticmethod or multiprocessing
    # won't be able to bootstrap it.
    @classmethod
    def remoteProcessServer(cls, q):
        # The parent puts the shared socket data first, then the message.
        shared_data = q.get()
        payload = q.get()

        # Recreate socket from shared data
        listener = socket.fromshare(shared_data)
        conn, _peer = listener.accept()

        # Send the message
        conn.sendall(payload)
        conn.close()
        listener.close()

    def testShare(self):
        # Transfer the listening server socket to another process
        # and service it from there.

        # Create process:
        q = multiprocessing.Queue()
        child = multiprocessing.Process(target=self.remoteProcessServer,
                                        args=(q,))
        child.start()

        # Get the shared socket data
        shared_data = self.serv.share(child.pid)

        # Pass the shared socket to the other process
        addr = self.serv.getsockname()
        self.serv.close()
        q.put(shared_data)

        # The data that the server will send us
        message = b"slapmahfro"
        q.put(message)

        # Connect
        s = socket.create_connection(addr)
        # Read until the peer closes the connection (recv() returns b"").
        received = b"".join(iter(lambda: s.recv(100), b""))
        s.close()
        self.assertEqual(received, message)
        child.join()

    def testShareLength(self):
        # Share data that is one byte short or has trailing bytes is invalid.
        good = self.serv.share(os.getpid())
        for corrupted in (good[:-1], good + b"foo"):
            self.assertRaises(ValueError, socket.fromshare, corrupted)

    def compareSockets(self, org, other):
        # socket sharing is expected to work only for blocking socket
        # since the internal python timeout value isn't transferred.
        self.assertEqual(org.gettimeout(), None)
        self.assertEqual(org.gettimeout(), other.gettimeout())

        self.assertEqual(org.family, other.family)
        self.assertEqual(org.type, other.type)
        # If the user specified "0" for proto, then
        # internally windows will have picked the correct value.
        # Python introspection on the socket however will still return
        # 0.  For the shared socket, the python value is recreated
        # from the actual value, so it may not compare correctly.
        if org.proto == 0:
            return
        self.assertEqual(org.proto, other.proto)

    def testShareLocal(self):
        # Share with our own process and compare the copy to the original.
        clone = socket.fromshare(self.serv.share(os.getpid()))
        with clone:
            self.compareSockets(self.serv, clone)

    def testTypes(self):
        families = [socket.AF_INET, socket.AF_INET6]
        types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
        for family, kind in itertools.product(families, types):
            try:
                source = socket.socket(family, kind)
            except OSError:
                continue # This combination is not supported
            with source:
                shared = socket.fromshare(source.share(os.getpid()))
                with shared:
                    self.compareSockets(source, shared)
5118
5119
@unittest.skipUnless(thread, 'Threading required for this test.')
class SendfileUsingSendTest(ThreadedTCPSocketTest):
    """
    Test the send() implementation of socket.sendfile().

    Each scenario is a pair of methods: ``_testX`` connects to the
    server and sends a file through the method returned by
    meth_from_sock(), while ``testX`` accepts the connection on
    ``self.serv`` and checks what was received.
    """

    FILESIZE = (10 * 1024 * 1024)  # 10MB
    BUFSIZE = 8192
    FILEDATA = b""   # contents of the test file, filled in by setUpClass()
    TIMEOUT = 2      # seconds, applied to the server side sockets

    @classmethod
    def setUpClass(cls):
        """Create the test file (FILESIZE bytes) and cache it in FILEDATA."""
        def chunks(total, step):
            # Yield piece sizes that add up to *total*: *step* repeatedly,
            # followed by whatever remains.
            assert total >= step
            while total > step:
                yield step
                total -= step
            if total:
                yield total

        chunk = b"".join([random.choice(string.ascii_letters).encode()
                          for i in range(cls.BUFSIZE)])
        with open(support.TESTFN, 'wb') as f:
            for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
                # Honour the computed size so that the file is exactly
                # FILESIZE bytes even if FILESIZE is not a multiple of
                # BUFSIZE.
                f.write(chunk[:csize])
        with open(support.TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
            assert len(cls.FILEDATA) == cls.FILESIZE

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def accept_conn(self):
        """Accept one connection on self.serv; both sockets get TIMEOUT."""
        self.serv.settimeout(self.TIMEOUT)
        conn, addr = self.serv.accept()
        conn.settimeout(self.TIMEOUT)
        self.addCleanup(conn.close)
        return conn

    def recv_data(self, conn):
        """Read from *conn* until the peer closes; return all bytes read."""
        received = []
        while True:
            chunk = conn.recv(self.BUFSIZE)
            if not chunk:
                break
            received.append(chunk)
        return b''.join(received)

    def meth_from_sock(self, sock):
        # Depending on the mixin class being run return either send()
        # or sendfile() method implementation.
        return getattr(sock, "_sendfile_use_send")

    # regular file

    def _testRegularFile(self):
        address = self.serv.getsockname()
        # The file is opened by the with statement itself so that it is
        # closed even when the connection cannot be established.
        with open(support.TESTFN, 'rb') as file, \
                socket.create_connection(address) as sock:
            meth = self.meth_from_sock(sock)
            sent = meth(file)
            self.assertEqual(sent, self.FILESIZE)
            self.assertEqual(file.tell(), self.FILESIZE)

    def testRegularFile(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # non regular file

    def _testNonRegularFile(self):
        address = self.serv.getsockname()
        with io.BytesIO(self.FILEDATA) as file, \
                socket.create_connection(address) as sock:
            sent = sock.sendfile(file)
            self.assertEqual(sent, self.FILESIZE)
            self.assertEqual(file.tell(), self.FILESIZE)
            # The sendfile()-based implementation must refuse an
            # in-memory file object.
            self.assertRaises(socket._GiveupOnSendfile,
                              sock._sendfile_use_sendfile, file)

    def testNonRegularFile(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # empty file

    def _testEmptyFileSend(self):
        address = self.serv.getsockname()
        filename = support.TESTFN + "2"
        # Create the empty file and schedule its removal.
        with open(filename, 'wb'):
            self.addCleanup(support.unlink, filename)
        with open(filename, 'rb') as file, \
                socket.create_connection(address) as sock:
            meth = self.meth_from_sock(sock)
            sent = meth(file)
            self.assertEqual(sent, 0)
            self.assertEqual(file.tell(), 0)

    def testEmptyFileSend(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(data, b"")

    # offset

    def _testOffset(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file, \
                socket.create_connection(address) as sock:
            meth = self.meth_from_sock(sock)
            sent = meth(file, offset=5000)
            self.assertEqual(sent, self.FILESIZE - 5000)
            self.assertEqual(file.tell(), self.FILESIZE)

    def testOffset(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE - 5000)
        self.assertEqual(data, self.FILEDATA[5000:])

    # count

    def _testCount(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file, \
                socket.create_connection(address, timeout=2) as sock:
            count = 5000007
            meth = self.meth_from_sock(sock)
            sent = meth(file, count=count)
            self.assertEqual(sent, count)
            self.assertEqual(file.tell(), count)

    def testCount(self):
        count = 5000007
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[:count])

    # count small

    def _testCountSmall(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file, \
                socket.create_connection(address, timeout=2) as sock:
            count = 1
            meth = self.meth_from_sock(sock)
            sent = meth(file, count=count)
            self.assertEqual(sent, count)
            self.assertEqual(file.tell(), count)

    def testCountSmall(self):
        count = 1
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[:count])

    # count + offset

    def _testCountWithOffset(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file, \
                socket.create_connection(address, timeout=2) as sock:
            count = 100007
            meth = self.meth_from_sock(sock)
            sent = meth(file, offset=2007, count=count)
            self.assertEqual(sent, count)
            self.assertEqual(file.tell(), count + 2007)

    def testCountWithOffset(self):
        count = 100007
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), count)
        self.assertEqual(data, self.FILEDATA[2007:count+2007])

    # non blocking sockets are not supposed to work

    def _testNonBlocking(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file, \
                socket.create_connection(address) as sock:
            sock.setblocking(False)
            meth = self.meth_from_sock(sock)
            self.assertRaises(ValueError, meth, file)
            self.assertRaises(ValueError, sock.sendfile, file)

    def testNonBlocking(self):
        conn = self.accept_conn()
        if conn.recv(8192):
            self.fail('was not supposed to receive any data')

    # timeout (non-triggered)

    def _testWithTimeout(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file, \
                socket.create_connection(address, timeout=2) as sock:
            meth = self.meth_from_sock(sock)
            sent = meth(file)
            self.assertEqual(sent, self.FILESIZE)

    def testWithTimeout(self):
        conn = self.accept_conn()
        data = self.recv_data(conn)
        self.assertEqual(len(data), self.FILESIZE)
        self.assertEqual(data, self.FILEDATA)

    # timeout (triggered)

    def _testWithTimeoutTriggeredSend(self):
        address = self.serv.getsockname()
        with open(support.TESTFN, 'rb') as file, \
                socket.create_connection(address, timeout=0.01) as sock:
            meth = self.meth_from_sock(sock)
            self.assertRaises(socket.timeout, meth, file)

    def testWithTimeoutTriggeredSend(self):
        # Read only once, so the sender is left with unsent data.
        conn = self.accept_conn()
        conn.recv(88192)

    # errors

    def _test_errors(self):
        # No client side work: test_errors() needs no connection.
        pass

    def test_errors(self):
        # Wrong socket type.
        with open(support.TESTFN, 'rb') as file:
            with socket.socket(type=socket.SOCK_DGRAM) as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(
                    ValueError, "SOCK_STREAM", meth, file)
        # File opened in text mode.
        with open(support.TESTFN, 'rt') as file:
            with socket.socket() as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(
                    ValueError, "binary mode", meth, file)
        # Invalid count values: wrong type, then out of range.
        with open(support.TESTFN, 'rb') as file:
            with socket.socket() as s:
                meth = self.meth_from_sock(s)
                self.assertRaisesRegex(TypeError, "positive integer",
                                       meth, file, count='2')
                self.assertRaisesRegex(TypeError, "positive integer",
                                       meth, file, count=0.1)
                self.assertRaisesRegex(ValueError, "positive integer",
                                       meth, file, count=0)
                self.assertRaisesRegex(ValueError, "positive integer",
                                       meth, file, count=-1)
5376
5377
@unittest.skipUnless(thread, 'Threading required for this test.')
@unittest.skipUnless(hasattr(os, "sendfile"),
                     'os.sendfile() required for this test.')
class SendfileUsingSendfileTest(SendfileUsingSendTest):
    """
    Re-run the inherited scenarios against the sendfile() implementation
    of socket.sendfile().
    """
    def meth_from_sock(self, sock):
        # Select the os.sendfile()-backed implementation.
        return sock._sendfile_use_sendfile
5387
5388
@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
class LinuxKernelCryptoAPI(unittest.TestCase):
    """Tests for AF_ALG sockets (the Linux kernel crypto API)."""
    # The test methods are described with comments instead of docstrings so
    # that unittest's verbose output keeps showing the method names.

    def create_alg(self, typ, name):
        """Return an AF_ALG socket bound to algorithm *name* of type *typ*.

        Raises unittest.SkipTest when bind() fails with FileNotFoundError,
        i.e. the type / algorithm is not available.  The socket is closed
        before any exception leaves this helper, so a failed bind() never
        leaks it.
        """
        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        try:
            sock.bind((typ, name))
        except FileNotFoundError as e:
            # type / algorithm is not available
            sock.close()
            raise unittest.SkipTest(str(e), typ, name)
        except BaseException:
            # Any other failure: release the socket, then let it propagate.
            sock.close()
            raise
        return sock

    # SHA-256 of b"abc", sent in one piece and then byte by byte.
    def test_sha256(self):
        expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
                                 "177a9cb410ff61f20015ad")
        with self.create_alg('hash', 'sha256') as algo:
            # whole message in a single call
            op, _ = algo.accept()
            with op:
                op.sendall(b"abc")
                self.assertEqual(op.recv(512), expected)

            # same message split into pieces flagged with MSG_MORE; the
            # final empty send carries no MSG_MORE flag
            op, _ = algo.accept()
            with op:
                op.send(b'a', socket.MSG_MORE)
                op.send(b'b', socket.MSG_MORE)
                op.send(b'c', socket.MSG_MORE)
                op.send(b'')
                self.assertEqual(op.recv(512), expected)

    # HMAC-SHA1 with the key set through setsockopt(ALG_SET_KEY).
    def test_hmac_sha1(self):
        expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
        with self.create_alg('hash', 'hmac(sha1)') as algo:
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
            op, _ = algo.accept()
            with op:
                op.sendall(b"what do ya want for nothing?")
                self.assertEqual(op.recv(512), expected)

    # AES-CBC encryption and decryption of one block and of a long message.
    #
    # Although it should work with 3.19 and newer the test blocks on
    # Ubuntu 15.10 with Kernel 4.2.0-19.
    @support.requires_linux_version(4, 3)
    def test_aes_cbc(self):
        key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
        iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
        msg = b"Single block msg"
        ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
        msglen = len(msg)
        with self.create_alg('skcipher', 'cbc(aes)') as algo:
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)

            # encrypt: control message first, payload in a second call
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 flags=socket.MSG_MORE)
                op.sendall(msg)
                self.assertEqual(op.recv(msglen), ciphertext)

            # decrypt: control message and payload in one call
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg([ciphertext],
                                 op=socket.ALG_OP_DECRYPT, iv=iv)
                self.assertEqual(op.recv(msglen), msg)

            # long message
            multiplier = 1024
            longmsg = [msg] * multiplier
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(longmsg,
                                 op=socket.ALG_OP_ENCRYPT, iv=iv)
                enc = op.recv(msglen * multiplier)
            self.assertEqual(len(enc), msglen * multiplier)
            # Same key, IV and first plaintext block as above, so the first
            # ciphertext block must match.  (This must be assertEqual:
            # assertTrue would treat `ciphertext` as the failure message and
            # pass for any non-empty `enc`.)
            self.assertEqual(enc[:msglen], ciphertext)

            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg([enc],
                                 op=socket.ALG_OP_DECRYPT, iv=iv)
                dec = op.recv(msglen * multiplier)
            self.assertEqual(len(dec), msglen * multiplier)
            self.assertEqual(dec, msg * multiplier)

    # AES-GCM (AEAD): encrypt three different ways, then decrypt.
    @support.requires_linux_version(4, 3)  # see test_aes_cbc
    def test_aead_aes_gcm(self):
        key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
        iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
        plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
        assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
        expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
        expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')

        taglen = len(expected_tag)
        assoclen = len(assoc)

        with self.create_alg('aead', 'gcm(aes)') as algo:
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
                            None, taglen)

            # send assoc, plain and tag buffer in separate steps
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 assoclen=assoclen, flags=socket.MSG_MORE)
                op.sendall(assoc, socket.MSG_MORE)
                op.sendall(plain, socket.MSG_MORE)
                op.sendall(b'\x00' * taglen)
                res = op.recv(assoclen + len(plain) + taglen)
                self.assertEqual(expected_ct, res[assoclen:-taglen])
                self.assertEqual(expected_tag, res[-taglen:])

            # now with msg
            op, _ = algo.accept()
            with op:
                msg = assoc + plain + b'\x00' * taglen
                op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 assoclen=assoclen)
                res = op.recv(assoclen + len(plain) + taglen)
                self.assertEqual(expected_ct, res[assoclen:-taglen])
                self.assertEqual(expected_tag, res[-taglen:])

            # create anc data manually
            pack_uint32 = struct.Struct('I').pack
            op, _ = algo.accept()
            with op:
                msg = assoc + plain + b'\x00' * taglen
                op.sendmsg(
                    [msg],
                    ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
                     [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
                     [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
                    )
                )
                res = op.recv(len(msg))
                self.assertEqual(expected_ct, res[assoclen:-taglen])
                self.assertEqual(expected_tag, res[-taglen:])

            # decrypt and verify
            op, _ = algo.accept()
            with op:
                msg = assoc + expected_ct + expected_tag
                op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
                                 assoclen=assoclen)
                res = op.recv(len(msg))
                self.assertEqual(plain, res[assoclen:-taglen])

    @support.requires_linux_version(4, 3)  # see test_aes_cbc
    def test_drbg_pr_sha256(self):
        # deterministic random bit generator, prediction resistance, sha256
        with self.create_alg('rng', 'drbg_pr_sha256') as algo:
            extra_seed = os.urandom(32)
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
            op, _ = algo.accept()
            with op:
                rn = op.recv(32)
                # Only the length can be checked; the content is random.
                self.assertEqual(len(rn), 32)

    # sendmsg_afalg() must reject missing or badly typed arguments.
    def test_sendmsg_afalg_args(self):
        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        with sock:
            # no arguments at all
            with self.assertRaises(TypeError):
                sock.sendmsg_afalg()

            # op must not be None
            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(op=None)

            # an int as the first positional argument
            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(1)

            # assoclen must not be None ...
            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)

            # ... nor negative
            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
5564
5565
def test_main():
    """Run the socket test case classes listed below.

    The classes are passed to support.run_unittest() in the order listed.
    The thread state captured by support.threading_setup() is handed to
    support.threading_cleanup() in a ``finally`` clause, so the cleanup
    also happens when run_unittest() raises (e.g. because a test failed).
    """
    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
             TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, UDPTimeoutTest ]

    tests.extend([
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        UnicodeReadFileObjectClassTestCase,
        UnicodeWriteFileObjectClassTestCase,
        UnicodeReadWriteFileObjectClassTestCase,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
        ContextManagersTest,
        InheritanceTest,
        NonblockConstantTest
    ])
    tests.append(BasicSocketPairTest)
    tests.append(TestUnixDomain)
    tests.append(TestLinuxAbstractNamespace)
    tests.extend([TIPCTest, TIPCThreadableTest])
    tests.extend([BasicCANTest, CANTest])
    tests.extend([BasicRDSTest, RDSTest])
    tests.append(LinuxKernelCryptoAPI)
    tests.extend([
        CmsgMacroTests,
        SendmsgUDPTest,
        RecvmsgUDPTest,
        RecvmsgIntoUDPTest,
        SendmsgUDP6Test,
        RecvmsgUDP6Test,
        RecvmsgRFC3542AncillaryUDP6Test,
        RecvmsgIntoRFC3542AncillaryUDP6Test,
        RecvmsgIntoUDP6Test,
        SendmsgTCPTest,
        RecvmsgTCPTest,
        RecvmsgIntoTCPTest,
        SendmsgSCTPStreamTest,
        RecvmsgSCTPStreamTest,
        RecvmsgIntoSCTPStreamTest,
        SendmsgUnixStreamTest,
        RecvmsgUnixStreamTest,
        RecvmsgIntoUnixStreamTest,
        RecvmsgSCMRightsStreamTest,
        RecvmsgIntoSCMRightsStreamTest,
        # These are slow when setitimer() is not available
        InterruptedRecvTimeoutTest,
        InterruptedSendTimeoutTest,
        TestSocketSharing,
        SendfileUsingSendTest,
        SendfileUsingSendfileTest,
    ])

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        # Always hand the captured thread state back, even when the run
        # above raised.
        support.threading_cleanup(*thread_info)

# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()
5628