• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 # Test case for the select.devpoll() function
2 
3 # Initial tests are copied as is from "test_poll.py"
4 
5 import os
6 import random
7 import select
8 import unittest
9 from test.support import run_unittest, cpython_only
10 
11 if not hasattr(select, 'devpoll') :
12     raise unittest.SkipTest('test works only on Solaris OS family')
13 
14 
15 def find_ready_matching(ready, flag):
16     match = []
17     for fd, mode in ready:
18         if mode & flag:
19             match.append(fd)
20     return match
21 
22 class DevPollTests(unittest.TestCase):
23 
24     def test_devpoll1(self):
25         # Basic functional test of poll object
26         # Create a bunch of pipe and test that poll works with them.
27 
28         p = select.devpoll()
29 
30         NUM_PIPES = 12
31         MSG = b" This is a test."
32         MSG_LEN = len(MSG)
33         readers = []
34         writers = []
35         r2w = {}
36         w2r = {}
37 
38         for i in range(NUM_PIPES):
39             rd, wr = os.pipe()
40             p.register(rd)
41             p.modify(rd, select.POLLIN)
42             p.register(wr, select.POLLOUT)
43             readers.append(rd)
44             writers.append(wr)
45             r2w[rd] = wr
46             w2r[wr] = rd
47 
48         bufs = []
49 
50         while writers:
51             ready = p.poll()
52             ready_writers = find_ready_matching(ready, select.POLLOUT)
53             if not ready_writers:
54                 self.fail("no pipes ready for writing")
55             wr = random.choice(ready_writers)
56             os.write(wr, MSG)
57 
58             ready = p.poll()
59             ready_readers = find_ready_matching(ready, select.POLLIN)
60             if not ready_readers:
61                 self.fail("no pipes ready for reading")
62             self.assertEqual([w2r[wr]], ready_readers)
63             rd = ready_readers[0]
64             buf = os.read(rd, MSG_LEN)
65             self.assertEqual(len(buf), MSG_LEN)
66             bufs.append(buf)
67             os.close(r2w[rd]) ; os.close(rd)
68             p.unregister(r2w[rd])
69             p.unregister(rd)
70             writers.remove(r2w[rd])
71 
72         self.assertEqual(bufs, [MSG] * NUM_PIPES)
73 
74     def test_timeout_overflow(self):
75         pollster = select.devpoll()
76         w, r = os.pipe()
77         pollster.register(w)
78 
79         pollster.poll(-1)
80         self.assertRaises(OverflowError, pollster.poll, -2)
81         self.assertRaises(OverflowError, pollster.poll, -1 << 31)
82         self.assertRaises(OverflowError, pollster.poll, -1 << 64)
83 
84         pollster.poll(0)
85         pollster.poll(1)
86         pollster.poll(1 << 30)
87         self.assertRaises(OverflowError, pollster.poll, 1 << 31)
88         self.assertRaises(OverflowError, pollster.poll, 1 << 63)
89         self.assertRaises(OverflowError, pollster.poll, 1 << 64)
90 
91     def test_close(self):
92         open_file = open(__file__, "rb")
93         self.addCleanup(open_file.close)
94         fd = open_file.fileno()
95         devpoll = select.devpoll()
96 
97         # test fileno() method and closed attribute
98         self.assertIsInstance(devpoll.fileno(), int)
99         self.assertFalse(devpoll.closed)
100 
101         # test close()
102         devpoll.close()
103         self.assertTrue(devpoll.closed)
104         self.assertRaises(ValueError, devpoll.fileno)
105 
106         # close() can be called more than once
107         devpoll.close()
108 
109         # operations must fail with ValueError("I/O operation on closed ...")
110         self.assertRaises(ValueError, devpoll.modify, fd, select.POLLIN)
111         self.assertRaises(ValueError, devpoll.poll)
112         self.assertRaises(ValueError, devpoll.register, fd, select.POLLIN)
113         self.assertRaises(ValueError, devpoll.unregister, fd)
114 
115     def test_fd_non_inheritable(self):
116         devpoll = select.devpoll()
117         self.addCleanup(devpoll.close)
118         self.assertEqual(os.get_inheritable(devpoll.fileno()), False)
119 
120     def test_events_mask_overflow(self):
121         pollster = select.devpoll()
122         w, r = os.pipe()
123         pollster.register(w)
124         # Issue #17919
125         self.assertRaises(ValueError, pollster.register, 0, -1)
126         self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
127         self.assertRaises(ValueError, pollster.modify, 1, -1)
128         self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
129 
130     @cpython_only
131     def test_events_mask_overflow_c_limits(self):
132         from _testcapi import USHRT_MAX
133         pollster = select.devpoll()
134         w, r = os.pipe()
135         pollster.register(w)
136         # Issue #17919
137         self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
138         self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
139 
140 
141 def test_main():
142     run_unittest(DevPollTests)
143 
144 if __name__ == '__main__':
145     test_main()
146