"""
Tests for kqueue wrapper.
"""
import errno
import os
import select
import socket
from test import support
import time
import unittest

if not hasattr(select, "kqueue"):
    raise unittest.SkipTest("test works only on BSD")


class TestKQueue(unittest.TestCase):
    """Tests for select.kqueue and select.kevent objects."""

    def test_create_queue(self):
        # A fresh kqueue has a positive descriptor and is open; once closed,
        # it reports closed and fileno() raises ValueError.
        kq = select.kqueue()
        # Release the descriptor even if an assertion below fails; calling
        # close() a second time is allowed (see test_close).
        self.addCleanup(kq.close)
        self.assertGreater(kq.fileno(), 0)
        self.assertFalse(kq.closed)
        kq.close()
        self.assertTrue(kq.closed)
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
        # Check the attribute values and comparison behaviour of kevent
        # objects built with various numbers of constructor arguments.
        from operator import lt, le, gt, ge

        fd = os.open(os.devnull, os.O_WRONLY)
        self.addCleanup(os.close, fd)

        # Only the ident given: the remaining fields take their defaults.
        ev = select.kevent(fd)
        other = select.kevent(1000)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_READ)
        self.assertEqual(ev.flags, select.KQ_EV_ADD)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)
        self.assertLess(ev, other)
        self.assertGreaterEqual(other, ev)
        # Ordering comparisons against non-kevent objects must raise.
        for op in lt, le, gt, ge:
            self.assertRaises(TypeError, op, ev, None)
            self.assertRaises(TypeError, op, ev, 1)
            self.assertRaises(TypeError, op, ev, "ev")

        # Explicit filter.
        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.flags, select.KQ_EV_ADD)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        # Explicit filter and flags.
        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        # All six fields given positionally.
        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self.assertEqual(ev.ident, 1)
        self.assertEqual(ev.filter, 2)
        self.assertEqual(ev.flags, 3)
        self.assertEqual(ev.fflags, 4)
        self.assertEqual(ev.data, 5)
        self.assertEqual(ev.udata, 6)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        bignum = 0x7fff
        ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
        self.assertEqual(ev.ident, bignum)
        self.assertEqual(ev.filter, 1)
        self.assertEqual(ev.flags, 2)
        self.assertEqual(ev.fflags, 3)
        self.assertEqual(ev.data, bignum - 1)
        self.assertEqual(ev.udata, bignum)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        # Issue 11973
        bignum = 0xffff
        ev = select.kevent(0, 1, bignum)
        self.assertEqual(ev.ident, 0)
        self.assertEqual(ev.filter, 1)
        self.assertEqual(ev.flags, bignum)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        # Issue 11973
        bignum = 0xffffffff
        ev = select.kevent(0, 1, 2, bignum)
        self.assertEqual(ev.ident, 0)
        self.assertEqual(ev.filter, 1)
        self.assertEqual(ev.flags, 2)
        self.assertEqual(ev.fflags, bignum)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

    def test_queue_event(self):
        # Register read/write events for both ends of a TCP connection and
        # check which (ident, filter) pairs control() reports before and
        # after data is sent, and after some registrations are deleted.
        serverSocket = socket.create_server(('127.0.0.1', 0))
        self.addCleanup(serverSocket.close)
        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
        except OSError as e:
            # FreeBSD doesn't raise an exception here, so a connect() that
            # succeeds immediately is accepted as well.
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        server, _ = serverSocket.accept()
        self.addCleanup(server.close)

        kq = select.kqueue()
        # kq2 is built from kq's own descriptor: the client events added
        # through kq2 below are expected back from kq.control().
        # NOTE(review): kq and kq2 share one descriptor and neither is
        # closed explicitly; left as in the original on purpose.
        kq2 = select.kqueue.fromfd(kq.fileno())

        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)

        # Nothing has been sent yet: only the write filters are expected.
        events = kq.control(None, 4, 1)
        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (client.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_WRITE)})

        client.send(b"Hello!")
        server.send(b"world!!!")

        # We may need to call it several times
        for _ in range(10):
            events = kq.control(None, 4, 1)
            if len(events) == 4:
                break
            time.sleep(1.0)
        else:
            self.fail('timeout waiting for event notifications')

        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (client.fileno(), select.KQ_FILTER_WRITE),
            (client.fileno(), select.KQ_FILTER_READ),
            (server.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_READ)})

        # Remove completely client, and server read part
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0, 0)

        events = kq.control([], 4, 0.99)
        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (server.fileno(), select.KQ_FILTER_WRITE)})

    def testPair(self):
        # kevent accepts socket objects as ident; data written to one end
        # of a socket pair produces a read event whose ``data`` field is
        # used as the size passed to recv() on the other end.
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        a.send(b'foo')
        event1 = select.kevent(a, select.KQ_FILTER_READ,
                               select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        event2 = select.kevent(b, select.KQ_FILTER_READ,
                               select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        r = kq.control([event1, event2], 1, 1)
        self.assertTrue(r)
        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
        self.assertEqual(b.recv(r[0].data), b'foo')

    def test_issue30058(self):
        # changelist must be an iterable
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        ev = select.kevent(a, select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)

        kq.control([ev], 0)
        # not a list
        kq.control((ev,), 0)

        # __len__ is not consistent with __iter__
        class BadList:
            def __len__(self):
                return 0

            def __iter__(self):
                for _ in range(100):
                    yield ev

        kq.control(BadList(), 0)
        # doesn't have __len__
        kq.control(iter([ev]), 0)

    def test_close(self):
        # NOTE(review): the opened file is not otherwise used by this test;
        # kept so the set of open descriptors matches the original.
        open_file = open(__file__, "rb")
        self.addCleanup(open_file.close)
        kqueue = select.kqueue()

        # test fileno() method and closed attribute
        self.assertIsInstance(kqueue.fileno(), int)
        self.assertFalse(kqueue.closed)

        # test close()
        kqueue.close()
        self.assertTrue(kqueue.closed)
        self.assertRaises(ValueError, kqueue.fileno)

        # close() can be called more than once
        kqueue.close()

        # operations must fail with ValueError("I/O operation on closed ...")
        self.assertRaises(ValueError, kqueue.control, None, 4)

    def test_fd_non_inheritable(self):
        # The descriptor behind a new kqueue must not be inheritable.
        kqueue = select.kqueue()
        self.addCleanup(kqueue.close)
        self.assertEqual(os.get_inheritable(kqueue.fileno()), False)

    @support.requires_fork()
    def test_fork(self):
        # gh-110395: kqueue objects must be closed after fork
        kqueue = select.kqueue()
        # Only the parent runs cleanups; the child always leaves through
        # os._exit() below.
        self.addCleanup(kqueue.close)
        if (pid := os.fork()) == 0:
            try:
                self.assertTrue(kqueue.closed)
                with self.assertRaisesRegex(ValueError, "closed kqueue"):
                    kqueue.fileno()
            except BaseException:
                # Any failure in the child, of whatever kind, is reported
                # to the parent through the exit status. os._exit() ends
                # the process at once, so the finally clause is not
                # reached on this path.
                os._exit(1)
            finally:
                os._exit(0)
        else:
            support.wait_process(pid, exitcode=0)
            self.assertFalse(kqueue.closed)  # child done, we're still open.


if __name__ == "__main__":
    unittest.main()