• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2import os
3import select
4import subprocess
5import sys
6import textwrap
7import unittest
8from test import support
9
10@unittest.skipIf((sys.platform[:3]=='win'),
11                 "can't easily test on this system")
12class SelectTestCase(unittest.TestCase):
13
14    class Nope:
15        pass
16
17    class Almost:
18        def fileno(self):
19            return 'fileno'
20
21    def test_error_conditions(self):
22        self.assertRaises(TypeError, select.select, 1, 2, 3)
23        self.assertRaises(TypeError, select.select, [self.Nope()], [], [])
24        self.assertRaises(TypeError, select.select, [self.Almost()], [], [])
25        self.assertRaises(TypeError, select.select, [], [], [], "not a number")
26        self.assertRaises(ValueError, select.select, [], [], [], -1)
27
28    # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
29    @unittest.skipIf(sys.platform.startswith('freebsd'),
30                     'skip because of a FreeBSD bug: kern/155606')
31    def test_errno(self):
32        with open(__file__, 'rb') as fp:
33            fd = fp.fileno()
34            fp.close()
35            try:
36                select.select([fd], [], [], 0)
37            except OSError as err:
38                self.assertEqual(err.errno, errno.EBADF)
39            else:
40                self.fail("exception not raised")
41
42    def test_returned_list_identity(self):
43        # See issue #8329
44        r, w, x = select.select([], [], [], 1)
45        self.assertIsNot(r, w)
46        self.assertIsNot(r, x)
47        self.assertIsNot(w, x)
48
49    @unittest.skipUnless(hasattr(os, 'popen'), "need os.popen()")
50    def test_select(self):
51        code = textwrap.dedent('''
52            import time
53            for i in range(10):
54                print("testing...", flush=True)
55                time.sleep(0.050)
56        ''')
57        cmd = [sys.executable, '-I', '-c', code]
58        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
59            pipe = proc.stdout
60            for timeout in (0, 1, 2, 4, 8, 16) + (None,)*10:
61                if support.verbose:
62                    print(f'timeout = {timeout}')
63                rfd, wfd, xfd = select.select([pipe], [], [], timeout)
64                self.assertEqual(wfd, [])
65                self.assertEqual(xfd, [])
66                if not rfd:
67                    continue
68                if rfd == [pipe]:
69                    line = pipe.readline()
70                    if support.verbose:
71                        print(repr(line))
72                    if not line:
73                        if support.verbose:
74                            print('EOF')
75                        break
76                    continue
77                self.fail('Unexpected return values from select():',
78                          rfd, wfd, xfd)
79
80    # Issue 16230: Crash on select resized list
81    def test_select_mutated(self):
82        a = []
83        class F:
84            def fileno(self):
85                del a[-1]
86                return sys.__stdout__.fileno()
87        a[:] = [F()] * 10
88        self.assertEqual(select.select([], a, []), ([], a[:5], []))
89
90    def test_disallow_instantiation(self):
91        support.check_disallow_instantiation(self, type(select.poll()))
92
93        if hasattr(select, 'devpoll'):
94            support.check_disallow_instantiation(self, type(select.devpoll()))
95
def tearDownModule():
    """Module-level unittest teardown hook: reap leftover child processes."""
    support.reap_children()
98
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
101