# Test case for the select.poll() function

import os
import subprocess
import random
import select
import threading
import time
import unittest
from test.support import TESTFN, run_unittest, reap_threads, cpython_only

# Skip the whole module on platforms whose select module has no poll().
try:
    select.poll
except AttributeError:
    raise unittest.SkipTest("select.poll not defined")


def find_ready_matching(ready, flag):
    """Return the fds from *ready* whose event mask has any bit of *flag* set.

    *ready* is an iterable of ``(fd, mode)`` pairs, the shape returned by
    ``poll()`` on a poll object.  The order of *ready* is preserved.
    """
    return [fd for fd, mode in ready if mode & flag]


class PollTests(unittest.TestCase):
    """Tests for poll objects created with select.poll()."""

    # NOTE: the test methods below are described with comments rather than
    # docstrings so that unittest's verbose output keeps showing method names.

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipes and test that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}

        # Every descriptor that is still open; whatever remains here when the
        # test ends (e.g. after a failure mid-loop) is closed by the cleanup.
        open_fds = set()

        def close_leftover_fds():
            for fd in open_fds:
                os.close(fd)
        self.addCleanup(close_leftover_fds)

        for i in range(NUM_PIPES):
            rd, wr = os.pipe()
            open_fds.update((rd, wr))
            # Register with the default mask first, then narrow it with
            # modify() so that both calls are exercised.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr

        bufs = []

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # Retire this pipe: close both ends (write end first), then
            # unregister them, in the same order as before.
            peer = r2w[rd]
            for fd in (peer, rd):
                # Forget the fd before closing so the cleanup can never
                # attempt a second close on it.
                open_fds.discard(fd)
                os.close(fd)
            p.unregister(peer)
            p.unregister(rd)
            writers.remove(peer)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        # An object with fileno() can be registered; once the file is closed
        # its descriptor must be reported as POLLNVAL.
        with open(TESTFN, 'w') as f:
            # Remove the scratch file even if an assertion below fails.
            self.addCleanup(os.unlink, TESTFN)
            fd = f.fileno()
            p = select.poll()
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
        r = p.poll()
        self.assertEqual(r[0], (fd, select.POLLNVAL))

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # No fileno() method at all.
            pass

        class Almost:
            # Has fileno(), but it returns a non-integer.
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                bufsize=0)
        # Enter the Popen context manually and let addCleanup leave it, so
        # the process is cleaned up however the test ends.
        proc.__enter__()
        self.addCleanup(proc.__exit__, None, None, None)
        p = proc.stdout
        pollster = select.poll()
        pollster.register(p, select.POLLIN)
        # Growing finite timeouts (milliseconds) followed by blocking polls.
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,) * 10:
            fdlist = pollster.poll(tout)
            if not fdlist:
                # Timed out with nothing ready; try the next timeout.
                continue
            fd, flags = fdlist[0]
            if flags & select.POLLHUP:
                line = p.readline()
                if line != b"":
                    self.fail('error: pipe seems to be closed, but still returns data')
                continue

            elif flags & select.POLLIN:
                line = p.readline()
                if not line:
                    break
                self.assertEqual(line, b'testing...\n')
                continue
            else:
                self.fail('Unexpected return value from select.poll: %s' % fdlist)

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # NOTE(review): this trivial arithmetic presumably checks that the
        # failed poll() call left no stale error state behind -- confirm.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
        self.assertRaises(OverflowError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)

    @cpython_only
    def test_poll_c_limits(self):
        # Values just past the C integer limits must raise OverflowError.
        from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
        pollster = select.poll()
        pollster.register(1)

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

    @reap_threads
    def test_threaded_poll(self):
        # Calling poll() on a poll object that another thread is already
        # polling must raise RuntimeError.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        rfds = []
        for i in range(10):
            fd = os.dup(r)
            self.addCleanup(os.close, fd)
            rfds.append(fd)
        pollster = select.poll()
        for fd in rfds:
            pollster.register(fd, select.POLLIN)

        t = threading.Thread(target=pollster.poll)
        t.start()
        try:
            # Give the thread time to block inside poll().
            time.sleep(0.5)
            # trigger ufds array reallocation
            for fd in rfds:
                pollster.unregister(fd)
            pollster.register(w, select.POLLOUT)
            self.assertRaises(RuntimeError, pollster.poll)
        finally:
            # and make the call to poll() from the thread return
            os.write(w, b'spam')
            t.join()

    @reap_threads
    def test_poll_blocks_with_negative_ms(self):
        # None and every negative timeout, however small, must block.
        for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
            # Create two file descriptors.  These will be used to unlock
            # the blocking call to poll.poll inside the thread
            r, w = os.pipe()
            try:
                pollster = select.poll()
                pollster.register(r, select.POLLIN)

                poll_thread = threading.Thread(target=pollster.poll,
                                               args=(timeout_ms,))
                poll_thread.start()
                poll_thread.join(timeout=0.1)
                self.assertTrue(poll_thread.is_alive())

                # Write to the pipe so pollster.poll unblocks and the
                # thread ends.
                os.write(w, b'spam')
                poll_thread.join()
                self.assertFalse(poll_thread.is_alive())
            finally:
                # Close the pipe even when an assertion above fails.
                os.close(r)
                os.close(w)


def test_main():
    run_unittest(PollTests)


if __name__ == '__main__':
    test_main()