# Test case for the select.devpoll() function

# Initial tests are copied as is from "test_poll.py"

import os
import random
import select
import unittest
from test.support import cpython_only

if not hasattr(select, 'devpoll'):
    raise unittest.SkipTest('test works only on Solaris OS family')


def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    *ready* is an iterable of (fd, mode) pairs; the result keeps the
    order in which the matching pairs appear.
    """
    return [fd for fd, mode in ready if mode & flag]


class DevPollTests(unittest.TestCase):

    def _pollster_with_pipe(self):
        # Shared fixture for the overflow tests: a devpoll object with one
        # pipe descriptor registered.  Everything opened here is released
        # through addCleanup so a failing assertion cannot leak descriptors.
        pollster = select.devpoll()
        self.addCleanup(pollster.close)
        # os.pipe() returns (read_fd, write_fd); the first descriptor is
        # the one registered, without an explicit event mask.
        rd, wr = os.pipe()
        self.addCleanup(os.close, rd)
        self.addCleanup(os.close, wr)
        pollster.register(rd)
        return pollster

    def test_devpoll1(self):
        # Basic functional test of the devpoll object.
        # Create a bunch of pipes and test that polling works with them.

        p = select.devpoll()
        self.addCleanup(p.close)

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}
        w2r = {}

        # Pipe descriptors that are still open.  The loop below closes them
        # one pair at a time; the cleanup only closes what is left over when
        # the test stops early.
        open_fds = set()

        def close_leftovers():
            for fd in open_fds:
                os.close(fd)
        self.addCleanup(close_leftovers)

        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            open_fds.update((rd, wr))
            # Register the read end first and then switch it to POLLIN via
            # modify(), so both register() and modify() are exercised.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr
            w2r[wr] = rd

        bufs = []

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                self.fail("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                self.fail("no pipes ready for reading")
            # Only the read end paired with the pipe just written to may be
            # reported as readable.
            self.assertEqual([w2r[wr]], ready_readers)
            rd = ready_readers[0]
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # Retire this pipe: close both ends, then drop them from the
            # devpoll object (same order as the test has always used).
            peer = r2w[rd]
            for fd in (peer, rd):
                os.close(fd)
                open_fds.discard(fd)
            p.unregister(peer)
            p.unregister(rd)
            writers.remove(peer)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_timeout_overflow(self):
        # poll() must accept -1 and timeouts from 0 up to 1 << 30, and raise
        # OverflowError for the out-of-range values checked below.
        pollster = self._pollster_with_pipe()

        pollster.poll(-1)
        self.assertRaises(OverflowError, pollster.poll, -2)
        self.assertRaises(OverflowError, pollster.poll, -1 << 31)
        self.assertRaises(OverflowError, pollster.poll, -1 << 64)

        pollster.poll(0)
        pollster.poll(1)
        pollster.poll(1 << 30)
        self.assertRaises(OverflowError, pollster.poll, 1 << 31)
        self.assertRaises(OverflowError, pollster.poll, 1 << 63)
        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

    def test_close(self):
        open_file = open(__file__, "rb")
        self.addCleanup(open_file.close)
        fd = open_file.fileno()
        devpoll = select.devpoll()

        # test fileno() method and closed attribute
        self.assertIsInstance(devpoll.fileno(), int)
        self.assertFalse(devpoll.closed)

        # test close()
        devpoll.close()
        self.assertTrue(devpoll.closed)
        self.assertRaises(ValueError, devpoll.fileno)

        # close() can be called more than once
        devpoll.close()

        # operations must fail with ValueError("I/O operation on closed ...")
        self.assertRaises(ValueError, devpoll.modify, fd, select.POLLIN)
        self.assertRaises(ValueError, devpoll.poll)
        self.assertRaises(ValueError, devpoll.register, fd, select.POLLIN)
        self.assertRaises(ValueError, devpoll.unregister, fd)

    def test_fd_non_inheritable(self):
        devpoll = select.devpoll()
        self.addCleanup(devpoll.close)
        self.assertEqual(os.get_inheritable(devpoll.fileno()), False)

    def test_events_mask_overflow(self):
        pollster = self._pollster_with_pipe()
        # Issue #17919
        self.assertRaises(ValueError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
        self.assertRaises(ValueError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)

    @cpython_only
    def test_events_mask_overflow_c_limits(self):
        from _testcapi import USHRT_MAX
        pollster = self._pollster_with_pipe()
        # Issue #17919
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)


if __name__ == '__main__':
    unittest.main()