"""
Tests for kqueue wrapper.
"""
import socket
import errno
import time
import select
import sys
import unittest

from test import test_support
if not hasattr(select, "kqueue"):
    raise unittest.SkipTest("test works only on BSD")


class TestKQueue(unittest.TestCase):
    """Tests for the select.kqueue and select.kevent wrappers."""

    def _check_event(self, ev, other, ident, filter_, flags,
                     fflags=0, data=0, udata=0):
        """Assert every field of *ev* and its (in)equality relations.

        *other* is a kevent that is expected to compare unequal to *ev*.
        """
        self.assertEqual(ev.ident, ident)
        self.assertEqual(ev.filter, filter_)
        self.assertEqual(ev.flags, flags)
        self.assertEqual(ev.fflags, fflags)
        self.assertEqual(ev.data, data)
        self.assertEqual(ev.udata, udata)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

    def test_create_queue(self):
        """A kqueue reports a descriptor until closed, then raises."""
        kq = select.kqueue()
        self.assertGreater(kq.fileno(), 0)
        self.assertFalse(kq.closed)
        kq.close()
        self.assertTrue(kq.closed)
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
        """kevent stores its constructor arguments and supports comparison."""
        fd = sys.stderr.fileno()
        ev = select.kevent(fd)
        other = select.kevent(1000)

        # Defaults: read filter, EV_ADD, everything else zero.
        self._check_event(ev, other, fd,
                          select.KQ_FILTER_READ, select.KQ_EV_ADD)

        # Ordering and rejection of comparisons with non-kevent objects.
        self.assertEqual(cmp(ev, other), -1)
        self.assertLess(ev, other)
        self.assertGreaterEqual(other, ev)
        self.assertRaises(TypeError, cmp, ev, None)
        self.assertRaises(TypeError, cmp, ev, 1)
        self.assertRaises(TypeError, cmp, ev, "ev")

        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self._check_event(ev, other, fd,
                          select.KQ_FILTER_WRITE, select.KQ_EV_ADD)

        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self._check_event(ev, other, fd,
                          select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)

        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self._check_event(ev, other, 1, 2, 3, 4, 5, 6)

        bignum = 0x7fff
        ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
        self._check_event(ev, other, bignum, 1, 2, 3, bignum - 1, bignum)

        # Issue 11973
        bignum = 0xffff
        ev = select.kevent(0, 1, bignum)
        self._check_event(ev, other, 0, 1, bignum)

        # Issue 11973
        bignum = 0xffffffff
        ev = select.kevent(0, 1, 2, bignum)
        self._check_event(ev, other, 0, 1, 2, bignum)

    def test_queue_event(self):
        """Register, poll and delete socket events through a kqueue."""
        serverSocket = socket.socket()
        # Cleanups run even when an assertion below fails.
        self.addCleanup(serverSocket.close)
        serverSocket.bind(('127.0.0.1', 0))
        serverSocket.listen(1)
        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        # No "else: fail" here: FreeBSD doesn't raise an exception for a
        # non-blocking connect to a local listener.
        server, addr = serverSocket.accept()
        self.addCleanup(server.close)

        kq = select.kqueue()
        # kq2 wraps the same descriptor, so events added through it are
        # reported by kq.control() below.
        kq2 = select.kqueue.fromfd(kq.fileno())

        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)

        # Nothing has been sent yet, so only the write filters fire.
        events = kq.control(None, 4, 1)
        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (client.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_WRITE)})

        client.send("Hello!")
        server.send("world!!!")

        # We may need to call it several times
        for _ in range(10):
            events = kq.control(None, 4, 1)
            if len(events) == 4:
                break
            time.sleep(1.0)
        else:
            self.fail('timeout waiting for event notifications')

        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (client.fileno(), select.KQ_FILTER_WRITE),
            (client.fileno(), select.KQ_FILTER_READ),
            (server.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_READ)})

        # Remove completely client, and server read part
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0, 0)

        events = kq.control([], 4, 0.99)
        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (server.fileno(), select.KQ_FILTER_WRITE)})

    def testPair(self):
        """kevent accepts socket objects; data gives the readable bytes."""
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        a.send(b'foo')
        event1 = select.kevent(a, select.KQ_FILTER_READ,
                               select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        event2 = select.kevent(b, select.KQ_FILTER_READ,
                               select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        r = kq.control([event1, event2], 1, 1)
        self.assertTrue(r)
        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
        self.assertEqual(b.recv(r[0].data), b'foo')


def test_main():
    """Run the kqueue tests through the regression-test helper."""
    test_support.run_unittest(TestKQueue)


if __name__ == "__main__":
    test_main()