1""" 2Tests for kqueue wrapper. 3""" 4import errno 5import os 6import select 7import socket 8import time 9import unittest 10 11if not hasattr(select, "kqueue"): 12 raise unittest.SkipTest("test works only on BSD") 13 14class TestKQueue(unittest.TestCase): 15 def test_create_queue(self): 16 kq = select.kqueue() 17 self.assertTrue(kq.fileno() > 0, kq.fileno()) 18 self.assertTrue(not kq.closed) 19 kq.close() 20 self.assertTrue(kq.closed) 21 self.assertRaises(ValueError, kq.fileno) 22 23 def test_create_event(self): 24 from operator import lt, le, gt, ge 25 26 fd = os.open(os.devnull, os.O_WRONLY) 27 self.addCleanup(os.close, fd) 28 29 ev = select.kevent(fd) 30 other = select.kevent(1000) 31 self.assertEqual(ev.ident, fd) 32 self.assertEqual(ev.filter, select.KQ_FILTER_READ) 33 self.assertEqual(ev.flags, select.KQ_EV_ADD) 34 self.assertEqual(ev.fflags, 0) 35 self.assertEqual(ev.data, 0) 36 self.assertEqual(ev.udata, 0) 37 self.assertEqual(ev, ev) 38 self.assertNotEqual(ev, other) 39 self.assertTrue(ev < other) 40 self.assertTrue(other >= ev) 41 for op in lt, le, gt, ge: 42 self.assertRaises(TypeError, op, ev, None) 43 self.assertRaises(TypeError, op, ev, 1) 44 self.assertRaises(TypeError, op, ev, "ev") 45 46 ev = select.kevent(fd, select.KQ_FILTER_WRITE) 47 self.assertEqual(ev.ident, fd) 48 self.assertEqual(ev.filter, select.KQ_FILTER_WRITE) 49 self.assertEqual(ev.flags, select.KQ_EV_ADD) 50 self.assertEqual(ev.fflags, 0) 51 self.assertEqual(ev.data, 0) 52 self.assertEqual(ev.udata, 0) 53 self.assertEqual(ev, ev) 54 self.assertNotEqual(ev, other) 55 56 ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT) 57 self.assertEqual(ev.ident, fd) 58 self.assertEqual(ev.filter, select.KQ_FILTER_WRITE) 59 self.assertEqual(ev.flags, select.KQ_EV_ONESHOT) 60 self.assertEqual(ev.fflags, 0) 61 self.assertEqual(ev.data, 0) 62 self.assertEqual(ev.udata, 0) 63 self.assertEqual(ev, ev) 64 self.assertNotEqual(ev, other) 65 66 ev = select.kevent(1, 2, 3, 4, 5, 6) 67 self.assertEqual(ev.ident, 1) 68 self.assertEqual(ev.filter, 2) 69 self.assertEqual(ev.flags, 3) 70 self.assertEqual(ev.fflags, 4) 71 self.assertEqual(ev.data, 5) 72 self.assertEqual(ev.udata, 6) 73 self.assertEqual(ev, ev) 74 self.assertNotEqual(ev, other) 75 76 bignum = 0x7fff 77 ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum) 78 self.assertEqual(ev.ident, bignum) 79 self.assertEqual(ev.filter, 1) 80 self.assertEqual(ev.flags, 2) 81 self.assertEqual(ev.fflags, 3) 82 self.assertEqual(ev.data, bignum - 1) 83 self.assertEqual(ev.udata, bignum) 84 self.assertEqual(ev, ev) 85 self.assertNotEqual(ev, other) 86 87 # Issue 11973 88 bignum = 0xffff 89 ev = select.kevent(0, 1, bignum) 90 self.assertEqual(ev.ident, 0) 91 self.assertEqual(ev.filter, 1) 92 self.assertEqual(ev.flags, bignum) 93 self.assertEqual(ev.fflags, 0) 94 self.assertEqual(ev.data, 0) 95 self.assertEqual(ev.udata, 0) 96 self.assertEqual(ev, ev) 97 self.assertNotEqual(ev, other) 98 99 # Issue 11973 100 bignum = 0xffffffff 101 ev = select.kevent(0, 1, 2, bignum) 102 self.assertEqual(ev.ident, 0) 103 self.assertEqual(ev.filter, 1) 104 self.assertEqual(ev.flags, 2) 105 self.assertEqual(ev.fflags, bignum) 106 self.assertEqual(ev.data, 0) 107 self.assertEqual(ev.udata, 0) 108 self.assertEqual(ev, ev) 109 self.assertNotEqual(ev, other) 110 111 112 def test_queue_event(self): 113 serverSocket = socket.create_server(('127.0.0.1', 0)) 114 client = socket.socket() 115 client.setblocking(False) 116 try: 117 client.connect(('127.0.0.1', serverSocket.getsockname()[1])) 118 except OSError as e: 119 self.assertEqual(e.args[0], errno.EINPROGRESS) 120 else: 121 #raise AssertionError("Connect should have raised EINPROGRESS") 122 pass # FreeBSD doesn't raise an exception here 123 server, addr = serverSocket.accept() 124 125 kq = select.kqueue() 126 kq2 = select.kqueue.fromfd(kq.fileno()) 127 128 ev = select.kevent(server.fileno(), 129 select.KQ_FILTER_WRITE, 130 select.KQ_EV_ADD | select.KQ_EV_ENABLE) 131 kq.control([ev], 0) 132 ev = select.kevent(server.fileno(), 133 select.KQ_FILTER_READ, 134 select.KQ_EV_ADD | select.KQ_EV_ENABLE) 135 kq.control([ev], 0) 136 ev = select.kevent(client.fileno(), 137 select.KQ_FILTER_WRITE, 138 select.KQ_EV_ADD | select.KQ_EV_ENABLE) 139 kq2.control([ev], 0) 140 ev = select.kevent(client.fileno(), 141 select.KQ_FILTER_READ, 142 select.KQ_EV_ADD | select.KQ_EV_ENABLE) 143 kq2.control([ev], 0) 144 145 events = kq.control(None, 4, 1) 146 events = set((e.ident, e.filter) for e in events) 147 self.assertEqual(events, set([ 148 (client.fileno(), select.KQ_FILTER_WRITE), 149 (server.fileno(), select.KQ_FILTER_WRITE)])) 150 151 client.send(b"Hello!") 152 server.send(b"world!!!") 153 154 # We may need to call it several times 155 for i in range(10): 156 events = kq.control(None, 4, 1) 157 if len(events) == 4: 158 break 159 time.sleep(1.0) 160 else: 161 self.fail('timeout waiting for event notifications') 162 163 events = set((e.ident, e.filter) for e in events) 164 self.assertEqual(events, set([ 165 (client.fileno(), select.KQ_FILTER_WRITE), 166 (client.fileno(), select.KQ_FILTER_READ), 167 (server.fileno(), select.KQ_FILTER_WRITE), 168 (server.fileno(), select.KQ_FILTER_READ)])) 169 170 # Remove completely client, and server read part 171 ev = select.kevent(client.fileno(), 172 select.KQ_FILTER_WRITE, 173 select.KQ_EV_DELETE) 174 kq.control([ev], 0) 175 ev = select.kevent(client.fileno(), 176 select.KQ_FILTER_READ, 177 select.KQ_EV_DELETE) 178 kq.control([ev], 0) 179 ev = select.kevent(server.fileno(), 180 select.KQ_FILTER_READ, 181 select.KQ_EV_DELETE) 182 kq.control([ev], 0, 0) 183 184 events = kq.control([], 4, 0.99) 185 events = set((e.ident, e.filter) for e in events) 186 self.assertEqual(events, set([ 187 (server.fileno(), select.KQ_FILTER_WRITE)])) 188 189 client.close() 190 server.close() 191 serverSocket.close() 192 193 def testPair(self): 194 kq = select.kqueue() 195 a, b = socket.socketpair() 196 197 a.send(b'foo') 198 event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) 199 event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) 200 r = kq.control([event1, event2], 1, 1) 201 self.assertTrue(r) 202 self.assertFalse(r[0].flags & select.KQ_EV_ERROR) 203 self.assertEqual(b.recv(r[0].data), b'foo') 204 205 a.close() 206 b.close() 207 kq.close() 208 209 def test_issue30058(self): 210 # changelist must be an iterable 211 kq = select.kqueue() 212 a, b = socket.socketpair() 213 ev = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) 214 215 kq.control([ev], 0) 216 # not a list 217 kq.control((ev,), 0) 218 # __len__ is not consistent with __iter__ 219 class BadList: 220 def __len__(self): 221 return 0 222 def __iter__(self): 223 for i in range(100): 224 yield ev 225 kq.control(BadList(), 0) 226 # doesn't have __len__ 227 kq.control(iter([ev]), 0) 228 229 a.close() 230 b.close() 231 kq.close() 232 233 def test_close(self): 234 open_file = open(__file__, "rb") 235 self.addCleanup(open_file.close) 236 fd = open_file.fileno() 237 kqueue = select.kqueue() 238 239 # test fileno() method and closed attribute 240 self.assertIsInstance(kqueue.fileno(), int) 241 self.assertFalse(kqueue.closed) 242 243 # test close() 244 kqueue.close() 245 self.assertTrue(kqueue.closed) 246 self.assertRaises(ValueError, kqueue.fileno) 247 248 # close() can be called more than once 249 kqueue.close() 250 251 # operations must fail with ValueError("I/O operation on closed ...") 252 self.assertRaises(ValueError, kqueue.control, None, 4) 253 254 def test_fd_non_inheritable(self): 255 kqueue = select.kqueue() 256 self.addCleanup(kqueue.close) 257 self.assertEqual(os.get_inheritable(kqueue.fileno()), False) 258 259 260if __name__ == "__main__": 261 unittest.main() 262