• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Test case for the select.devpoll() function
2
3# Initial tests are copied as is from "test_poll.py"
4
5import os
6import random
7import select
8import unittest
9from test.support import cpython_only
10
11if not hasattr(select, 'devpoll') :
12    raise unittest.SkipTest('test works only on Solaris OS family')
13
14
15def find_ready_matching(ready, flag):
16    match = []
17    for fd, mode in ready:
18        if mode & flag:
19            match.append(fd)
20    return match
21
22class DevPollTests(unittest.TestCase):
23
24    def test_devpoll1(self):
25        # Basic functional test of poll object
26        # Create a bunch of pipe and test that poll works with them.
27
28        p = select.devpoll()
29
30        NUM_PIPES = 12
31        MSG = b" This is a test."
32        MSG_LEN = len(MSG)
33        readers = []
34        writers = []
35        r2w = {}
36        w2r = {}
37
38        for i in range(NUM_PIPES):
39            rd, wr = os.pipe()
40            p.register(rd)
41            p.modify(rd, select.POLLIN)
42            p.register(wr, select.POLLOUT)
43            readers.append(rd)
44            writers.append(wr)
45            r2w[rd] = wr
46            w2r[wr] = rd
47
48        bufs = []
49
50        while writers:
51            ready = p.poll()
52            ready_writers = find_ready_matching(ready, select.POLLOUT)
53            if not ready_writers:
54                self.fail("no pipes ready for writing")
55            wr = random.choice(ready_writers)
56            os.write(wr, MSG)
57
58            ready = p.poll()
59            ready_readers = find_ready_matching(ready, select.POLLIN)
60            if not ready_readers:
61                self.fail("no pipes ready for reading")
62            self.assertEqual([w2r[wr]], ready_readers)
63            rd = ready_readers[0]
64            buf = os.read(rd, MSG_LEN)
65            self.assertEqual(len(buf), MSG_LEN)
66            bufs.append(buf)
67            os.close(r2w[rd]) ; os.close(rd)
68            p.unregister(r2w[rd])
69            p.unregister(rd)
70            writers.remove(r2w[rd])
71
72        self.assertEqual(bufs, [MSG] * NUM_PIPES)
73
74    def test_timeout_overflow(self):
75        pollster = select.devpoll()
76        w, r = os.pipe()
77        pollster.register(w)
78
79        pollster.poll(-1)
80        self.assertRaises(OverflowError, pollster.poll, -2)
81        self.assertRaises(OverflowError, pollster.poll, -1 << 31)
82        self.assertRaises(OverflowError, pollster.poll, -1 << 64)
83
84        pollster.poll(0)
85        pollster.poll(1)
86        pollster.poll(1 << 30)
87        self.assertRaises(OverflowError, pollster.poll, 1 << 31)
88        self.assertRaises(OverflowError, pollster.poll, 1 << 63)
89        self.assertRaises(OverflowError, pollster.poll, 1 << 64)
90
91    def test_close(self):
92        open_file = open(__file__, "rb")
93        self.addCleanup(open_file.close)
94        fd = open_file.fileno()
95        devpoll = select.devpoll()
96
97        # test fileno() method and closed attribute
98        self.assertIsInstance(devpoll.fileno(), int)
99        self.assertFalse(devpoll.closed)
100
101        # test close()
102        devpoll.close()
103        self.assertTrue(devpoll.closed)
104        self.assertRaises(ValueError, devpoll.fileno)
105
106        # close() can be called more than once
107        devpoll.close()
108
109        # operations must fail with ValueError("I/O operation on closed ...")
110        self.assertRaises(ValueError, devpoll.modify, fd, select.POLLIN)
111        self.assertRaises(ValueError, devpoll.poll)
112        self.assertRaises(ValueError, devpoll.register, fd, select.POLLIN)
113        self.assertRaises(ValueError, devpoll.unregister, fd)
114
115    def test_fd_non_inheritable(self):
116        devpoll = select.devpoll()
117        self.addCleanup(devpoll.close)
118        self.assertEqual(os.get_inheritable(devpoll.fileno()), False)
119
120    def test_events_mask_overflow(self):
121        pollster = select.devpoll()
122        w, r = os.pipe()
123        pollster.register(w)
124        # Issue #17919
125        self.assertRaises(ValueError, pollster.register, 0, -1)
126        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
127        self.assertRaises(ValueError, pollster.modify, 1, -1)
128        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
129
130    @cpython_only
131    def test_events_mask_overflow_c_limits(self):
132        from _testcapi import USHRT_MAX
133        pollster = select.devpoll()
134        w, r = os.pipe()
135        pollster.register(w)
136        # Issue #17919
137        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
138        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
139
140
141if __name__ == '__main__':
142    unittest.main()
143