• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Test case for the select.poll() function
2
import os
import random
import select
import subprocess
import threading
import time
import unittest
from test.support import (
    cpython_only, requires_subprocess, requires_working_socket, requires_resource
)
from test.support import os_helper
from test.support import threading_helper
from test.support.os_helper import TESTFN
15
16
# Skip the whole module when this build of the select module has no poll().
if not hasattr(select, 'poll'):
    raise unittest.SkipTest("select.poll not defined")

# NOTE(review): judging by its name and the module=True argument, this skips
# the whole module when sockets are unusable -- confirm against test.support.
requires_working_socket(module=True)
23
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    *ready* is an iterable of ``(fd, mode)`` pairs, as returned by the
    ``poll()`` method of a poll object; *flag* is a bit mask such as
    ``select.POLLIN``.  The descriptors are returned as a list, in the
    order in which they appear in *ready*.
    """
    return [fd for fd, mode in ready if mode & flag]
30
class PollTests(unittest.TestCase):
    """Tests for select.poll objects."""

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipes and test that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}

        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            # Close through cleanups so that no descriptor leaks when an
            # assertion fails part-way through the test.
            self.addCleanup(os.close, rd)
            self.addCleanup(os.close, wr)
            # Exercise modify() as well: register without an explicit event
            # mask first, then switch the read end to POLLIN.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr

        bufs = []

        while writers:
            # Write the message into a randomly chosen writable pipe...
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                self.fail("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            # ...then read it back from a pipe reported as readable.
            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                self.fail("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # This pipe is done: stop polling both of its ends.
            wr = r2w[rd]
            p.unregister(wr)
            p.unregister(rd)
            writers.remove(wr)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        # a file object can be registered directly; once the file has been
        # closed its descriptor is reported as invalid
        # (the cleanup removes TESTFN even if an assertion below fails)
        self.addCleanup(os_helper.unlink, TESTFN)
        with open(TESTFN, 'w') as f:
            fd = f.fileno()
            p = select.poll()
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
        r = p.poll()
        self.assertEqual(r[0], (fd, select.POLLNVAL))

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # has no fileno() method at all
            pass

        class Almost:
            # has a fileno() method, but it does not return an integer
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    @requires_subprocess()
    @requires_resource('walltime')
    def test_poll2(self):
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                bufsize=0)
        self.enterContext(proc)
        p = proc.stdout
        pollster = select.poll()
        pollster.register(p, select.POLLIN)
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
            fdlist = pollster.poll(tout)
            if not fdlist:
                # timed out without any event; try the next timeout
                continue
            _, flags = fdlist[0]
            if flags & select.POLLHUP:
                line = p.readline()
                if line != b"":
                    self.fail('error: pipe seems to be closed, but still returns data')
            elif flags & select.POLLIN:
                line = p.readline()
                if not line:
                    # end of file: the child has finished writing
                    break
                self.assertEqual(line, b'testing...\n')
            else:
                self.fail('Unexpected return value from select.poll: %s' % fdlist)

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # NOTE(review): presumably checks that the failed call above did not
        # leave the interpreter in a bad state -- confirm before simplifying.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        # Issues #15989, #17919
        self.assertRaises(ValueError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
        self.assertRaises(ValueError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)

    @cpython_only
    def test_poll_c_limits(self):
        # Values just past the limits of the C types must be rejected.
        try:
            from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
        except ImportError:
            self.skipTest("requires _testcapi")
        pollster = select.poll()
        pollster.register(1)

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

    @threading_helper.reap_threads
    def test_threaded_poll(self):
        # Calling poll() while another thread is already blocked in poll()
        # on the same object must raise RuntimeError.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        rfds = []
        for _ in range(10):
            fd = os.dup(r)
            self.addCleanup(os.close, fd)
            rfds.append(fd)
        pollster = select.poll()
        for fd in rfds:
            pollster.register(fd, select.POLLIN)

        t = threading.Thread(target=pollster.poll)
        t.start()
        try:
            # give the thread time to block inside poll()
            time.sleep(0.5)
            # trigger ufds array reallocation
            for fd in rfds:
                pollster.unregister(fd)
            pollster.register(w, select.POLLOUT)
            self.assertRaises(RuntimeError, pollster.poll)
        finally:
            # and make the call to poll() from the thread return
            os.write(w, b'spam')
            t.join()

    @threading_helper.reap_threads
    def test_poll_blocks_with_negative_ms(self):
        # poll() must block when the timeout is None or any negative value.
        for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
            # Create a pipe; writing to it is what unblocks the call to
            # poll() made inside the thread.
            r, w = os.pipe()
            try:
                pollster = select.poll()
                pollster.register(r, select.POLLIN)

                poll_thread = threading.Thread(target=pollster.poll,
                                               args=(timeout_ms,))
                poll_thread.start()
                try:
                    poll_thread.join(timeout=0.1)
                    self.assertTrue(poll_thread.is_alive())
                finally:
                    # Write to the pipe so pollster.poll unblocks and the
                    # thread ends, even when the assertion above failed.
                    os.write(w, b'spam')
                    poll_thread.join()
                self.assertFalse(poll_thread.is_alive())
            finally:
                os.close(r)
                os.close(w)
237
238
# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
241