• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Test case for the select.poll() function
2
3import os
4import subprocess
5import random
6import select
7import threading
8import time
9import unittest
10from test.support import TESTFN, run_unittest, reap_threads, cpython_only
11
# Skip this whole module when the select module does not provide poll().
if not hasattr(select, "poll"):
    raise unittest.SkipTest("select.poll not defined")
16
17
def find_ready_matching(ready, flag):
    """Return the descriptors from *ready* whose event mask includes *flag*.

    *ready* is an iterable of ``(fd, mode)`` pairs; a descriptor is kept when
    ``mode & flag`` is non-zero.  Order (and duplicates) are preserved.
    """
    return [fd for fd, mode in ready if mode & flag]
24
class PollTests(unittest.TestCase):
    """Functional and error-path tests for select.poll() objects."""

    def test_poll1(self):
        # Basic functional test of poll object:
        # create a bunch of pipes and check that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}

        # Every pipe descriptor that is currently open.  Anything still in
        # here when the test finishes (for instance after a failed assertion)
        # is closed by the cleanup below, so a failure does not leak fds.
        open_fds = set()

        def close_leftover_fds():
            for leftover in open_fds:
                os.close(leftover)

        self.addCleanup(close_leftover_fds)

        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            open_fds.update((rd, wr))
            # register() followed by modify() exercises both calls; the read
            # end finishes up being watched for POLLIN.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr

        bufs = []

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # Retire the pipe that was just read: stop watching both ends
            # first, then close them.
            paired_wr = r2w[rd]
            p.unregister(paired_wr)
            p.unregister(rd)
            for fd in (paired_wr, rd):
                open_fds.discard(fd)
                os.close(fd)
            writers.remove(paired_wr)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        # A file object can be registered directly; once it has been closed
        # its descriptor must be reported as POLLNVAL.
        with open(TESTFN, 'w') as f:
            # Remove the scratch file even if an assertion below fails.
            self.addCleanup(os.unlink, TESTFN)
            fd = f.fileno()
            p = select.poll()
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
        r = p.poll()
        self.assertEqual(r[0], (fd, select.POLLNVAL))

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # No fileno() method at all.
            pass

        class Almost:
            # Has fileno(), but it does not return an integer.
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        # Poll the stdout pipe of a shell loop that prints one line per
        # second, using a range of timeouts.
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                bufsize=0)
        # Enter the Popen context by hand and let addCleanup leave it, so the
        # process is cleaned up however the test ends.
        proc.__enter__()
        self.addCleanup(proc.__exit__, None, None, None)
        p = proc.stdout
        pollster = select.poll()
        pollster.register(p, select.POLLIN)
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
            fdlist = pollster.poll(tout)
            if not fdlist:
                # Timed out with nothing ready; try the next timeout.
                continue
            fd, flags = fdlist[0]
            if flags & select.POLLHUP:
                line = p.readline()
                if line != b"":
                    self.fail('error: pipe seems to be closed, but still returns data')
            elif flags & select.POLLIN:
                line = p.readline()
                if not line:
                    break
                self.assertEqual(line, b'testing...\n')
            else:
                self.fail('Unexpected return value from select.poll: %s' % fdlist)

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # NOTE(review): presumably a sanity check that the failed call above
        # left the interpreter in a usable state -- confirm before removing.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
        self.assertRaises(OverflowError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)

    @cpython_only
    def test_poll_c_limits(self):
        # Values just past the C integer limits must raise OverflowError.
        from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
        pollster = select.poll()
        pollster.register(1)

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

    @reap_threads
    def test_threaded_poll(self):
        # Calling poll() on an object that another thread is already polling
        # must raise RuntimeError.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        rfds = []
        for i in range(10):
            fd = os.dup(r)
            self.addCleanup(os.close, fd)
            rfds.append(fd)
        pollster = select.poll()
        for fd in rfds:
            pollster.register(fd, select.POLLIN)

        t = threading.Thread(target=pollster.poll)
        t.start()
        try:
            time.sleep(0.5)
            # trigger ufds array reallocation
            for fd in rfds:
                pollster.unregister(fd)
            pollster.register(w, select.POLLOUT)
            self.assertRaises(RuntimeError, pollster.poll)
        finally:
            # and make the call to poll() from the thread return
            os.write(w, b'spam')
            t.join()

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @reap_threads
    def test_poll_blocks_with_negative_ms(self):
        # A timeout of None or any negative number must make poll() block
        # until an event arrives.
        for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
            # Create two file descriptors. This will be used to unlock
            # the blocking call to poll.poll inside the thread
            r, w = os.pipe()
            try:
                pollster = select.poll()
                pollster.register(r, select.POLLIN)

                poll_thread = threading.Thread(target=pollster.poll,
                                               args=(timeout_ms,))
                poll_thread.start()
                try:
                    poll_thread.join(timeout=0.1)
                    self.assertTrue(
                        poll_thread.is_alive(),
                        'poll(%r) returned instead of blocking' % (timeout_ms,))
                finally:
                    # Write to the pipe so pollster.poll unblocks and the
                    # thread ends, even when the assertion above failed.
                    os.write(w, b'spam')
                    poll_thread.join()
                self.assertFalse(poll_thread.is_alive())
            finally:
                os.close(r)
                os.close(w)
228
229
def test_main():
    """Run the PollTests suite through test.support.run_unittest."""
    run_unittest(PollTests)
232
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
235