• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2Tests for kqueue wrapper.
3"""
4import socket
5import errno
6import time
7import select
8import sys
9import unittest
10
11from test import test_support
12if not hasattr(select, "kqueue"):
13    raise unittest.SkipTest("test works only on BSD")
14
15class TestKQueue(unittest.TestCase):
16    def test_create_queue(self):
17        kq = select.kqueue()
18        self.assertTrue(kq.fileno() > 0, kq.fileno())
19        self.assertTrue(not kq.closed)
20        kq.close()
21        self.assertTrue(kq.closed)
22        self.assertRaises(ValueError, kq.fileno)
23
24    def test_create_event(self):
25        fd = sys.stderr.fileno()
26        ev = select.kevent(fd)
27        other = select.kevent(1000)
28        self.assertEqual(ev.ident, fd)
29        self.assertEqual(ev.filter, select.KQ_FILTER_READ)
30        self.assertEqual(ev.flags, select.KQ_EV_ADD)
31        self.assertEqual(ev.fflags, 0)
32        self.assertEqual(ev.data, 0)
33        self.assertEqual(ev.udata, 0)
34        self.assertEqual(ev, ev)
35        self.assertNotEqual(ev, other)
36        self.assertEqual(cmp(ev, other), -1)
37        self.assertTrue(ev < other)
38        self.assertTrue(other >= ev)
39        self.assertRaises(TypeError, cmp, ev, None)
40        self.assertRaises(TypeError, cmp, ev, 1)
41        self.assertRaises(TypeError, cmp, ev, "ev")
42
43        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
44        self.assertEqual(ev.ident, fd)
45        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
46        self.assertEqual(ev.flags, select.KQ_EV_ADD)
47        self.assertEqual(ev.fflags, 0)
48        self.assertEqual(ev.data, 0)
49        self.assertEqual(ev.udata, 0)
50        self.assertEqual(ev, ev)
51        self.assertNotEqual(ev, other)
52
53        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
54        self.assertEqual(ev.ident, fd)
55        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
56        self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
57        self.assertEqual(ev.fflags, 0)
58        self.assertEqual(ev.data, 0)
59        self.assertEqual(ev.udata, 0)
60        self.assertEqual(ev, ev)
61        self.assertNotEqual(ev, other)
62
63        ev = select.kevent(1, 2, 3, 4, 5, 6)
64        self.assertEqual(ev.ident, 1)
65        self.assertEqual(ev.filter, 2)
66        self.assertEqual(ev.flags, 3)
67        self.assertEqual(ev.fflags, 4)
68        self.assertEqual(ev.data, 5)
69        self.assertEqual(ev.udata, 6)
70        self.assertEqual(ev, ev)
71        self.assertNotEqual(ev, other)
72
73        bignum = 0x7fff
74        ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
75        self.assertEqual(ev.ident, bignum)
76        self.assertEqual(ev.filter, 1)
77        self.assertEqual(ev.flags, 2)
78        self.assertEqual(ev.fflags, 3)
79        self.assertEqual(ev.data, bignum - 1)
80        self.assertEqual(ev.udata, bignum)
81        self.assertEqual(ev, ev)
82        self.assertNotEqual(ev, other)
83
84        # Issue 11973
85        bignum = 0xffff
86        ev = select.kevent(0, 1, bignum)
87        self.assertEqual(ev.ident, 0)
88        self.assertEqual(ev.filter, 1)
89        self.assertEqual(ev.flags, bignum)
90        self.assertEqual(ev.fflags, 0)
91        self.assertEqual(ev.data, 0)
92        self.assertEqual(ev.udata, 0)
93        self.assertEqual(ev, ev)
94        self.assertNotEqual(ev, other)
95
96        # Issue 11973
97        bignum = 0xffffffff
98        ev = select.kevent(0, 1, 2, bignum)
99        self.assertEqual(ev.ident, 0)
100        self.assertEqual(ev.filter, 1)
101        self.assertEqual(ev.flags, 2)
102        self.assertEqual(ev.fflags, bignum)
103        self.assertEqual(ev.data, 0)
104        self.assertEqual(ev.udata, 0)
105        self.assertEqual(ev, ev)
106        self.assertNotEqual(ev, other)
107
108
109    def test_queue_event(self):
110        serverSocket = socket.socket()
111        serverSocket.bind(('127.0.0.1', 0))
112        serverSocket.listen(1)
113        client = socket.socket()
114        client.setblocking(False)
115        try:
116            client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
117        except socket.error, e:
118            self.assertEqual(e.args[0], errno.EINPROGRESS)
119        else:
120            #raise AssertionError("Connect should have raised EINPROGRESS")
121            pass # FreeBSD doesn't raise an exception here
122        server, addr = serverSocket.accept()
123
124        kq = select.kqueue()
125        kq2 = select.kqueue.fromfd(kq.fileno())
126
127        ev = select.kevent(server.fileno(),
128                           select.KQ_FILTER_WRITE,
129                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
130        kq.control([ev], 0)
131        ev = select.kevent(server.fileno(),
132                           select.KQ_FILTER_READ,
133                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
134        kq.control([ev], 0)
135        ev = select.kevent(client.fileno(),
136                           select.KQ_FILTER_WRITE,
137                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
138        kq2.control([ev], 0)
139        ev = select.kevent(client.fileno(),
140                           select.KQ_FILTER_READ,
141                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
142        kq2.control([ev], 0)
143
144        events = kq.control(None, 4, 1)
145        events = set((e.ident, e.filter) for e in events)
146        self.assertEqual(events, set([
147            (client.fileno(), select.KQ_FILTER_WRITE),
148            (server.fileno(), select.KQ_FILTER_WRITE)]))
149
150        client.send("Hello!")
151        server.send("world!!!")
152
153        # We may need to call it several times
154        for i in range(10):
155            events = kq.control(None, 4, 1)
156            if len(events) == 4:
157                break
158            time.sleep(1.0)
159        else:
160            self.fail('timeout waiting for event notifications')
161
162        events = set((e.ident, e.filter) for e in events)
163        self.assertEqual(events, set([
164            (client.fileno(), select.KQ_FILTER_WRITE),
165            (client.fileno(), select.KQ_FILTER_READ),
166            (server.fileno(), select.KQ_FILTER_WRITE),
167            (server.fileno(), select.KQ_FILTER_READ)]))
168
169        # Remove completely client, and server read part
170        ev = select.kevent(client.fileno(),
171                           select.KQ_FILTER_WRITE,
172                           select.KQ_EV_DELETE)
173        kq.control([ev], 0)
174        ev = select.kevent(client.fileno(),
175                           select.KQ_FILTER_READ,
176                           select.KQ_EV_DELETE)
177        kq.control([ev], 0)
178        ev = select.kevent(server.fileno(),
179                           select.KQ_FILTER_READ,
180                           select.KQ_EV_DELETE)
181        kq.control([ev], 0, 0)
182
183        events = kq.control([], 4, 0.99)
184        events = set((e.ident, e.filter) for e in events)
185        self.assertEqual(events, set([
186            (server.fileno(), select.KQ_FILTER_WRITE)]))
187
188        client.close()
189        server.close()
190        serverSocket.close()
191
192    def testPair(self):
193        kq = select.kqueue()
194        a, b = socket.socketpair()
195
196        a.send(b'foo')
197        event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
198        event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
199        r = kq.control([event1, event2], 1, 1)
200        self.assertTrue(r)
201        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
202        self.assertEqual(b.recv(r[0].data), b'foo')
203
204        a.close()
205        b.close()
206        kq.close()
207
208def test_main():
209    test_support.run_unittest(TestKQueue)
210
211if __name__ == "__main__":
212    test_main()
213