• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
2#
3# Permission is hereby granted, free of charge, to any person obtaining
4# a copy of this software and associated documentation files (the
5# "Software"), to deal in the Software without restriction, including
6# without limitation the rights to use, copy, modify, merge, publish,
7# distribute, sublicense, and/or sell copies of the Software, and to
8# permit persons to whom the Software is furnished to do so, subject to
9# the following conditions:
10#
11# The above copyright notice and this permission notice shall be
12# included in all copies or substantial portions of the Software.
13#
14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
18# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
19# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21"""
22Tests for epoll wrapper.
23"""
24import errno
25import os
26import select
27import socket
28import time
29import unittest
30
31if not hasattr(select, "epoll"):
32    raise unittest.SkipTest("test works only on Linux 2.6")
33
34try:
35    select.epoll()
36except OSError as e:
37    if e.errno == errno.ENOSYS:
38        raise unittest.SkipTest("kernel doesn't support epoll()")
39    raise
40
41class TestEPoll(unittest.TestCase):
42
43    def setUp(self):
44        self.serverSocket = socket.create_server(('127.0.0.1', 0))
45        self.connections = [self.serverSocket]
46
47    def tearDown(self):
48        for skt in self.connections:
49            skt.close()
50
51    def _connected_pair(self):
52        client = socket.socket()
53        client.setblocking(False)
54        try:
55            client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
56        except OSError as e:
57            self.assertEqual(e.args[0], errno.EINPROGRESS)
58        else:
59            raise AssertionError("Connect should have raised EINPROGRESS")
60        server, addr = self.serverSocket.accept()
61
62        self.connections.extend((client, server))
63        return client, server
64
65    def test_create(self):
66        try:
67            ep = select.epoll(16)
68        except OSError as e:
69            raise AssertionError(str(e))
70        self.assertTrue(ep.fileno() > 0, ep.fileno())
71        self.assertTrue(not ep.closed)
72        ep.close()
73        self.assertTrue(ep.closed)
74        self.assertRaises(ValueError, ep.fileno)
75
76        if hasattr(select, "EPOLL_CLOEXEC"):
77            select.epoll(-1, select.EPOLL_CLOEXEC).close()
78            select.epoll(flags=select.EPOLL_CLOEXEC).close()
79            select.epoll(flags=0).close()
80
81    def test_badcreate(self):
82        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
83        self.assertRaises(TypeError, select.epoll, 'foo')
84        self.assertRaises(TypeError, select.epoll, None)
85        self.assertRaises(TypeError, select.epoll, ())
86        self.assertRaises(TypeError, select.epoll, ['foo'])
87        self.assertRaises(TypeError, select.epoll, {})
88
89        self.assertRaises(ValueError, select.epoll, 0)
90        self.assertRaises(ValueError, select.epoll, -2)
91        self.assertRaises(ValueError, select.epoll, sizehint=-2)
92
93        if hasattr(select, "EPOLL_CLOEXEC"):
94            self.assertRaises(OSError, select.epoll, flags=12356)
95
96    def test_context_manager(self):
97        with select.epoll(16) as ep:
98            self.assertGreater(ep.fileno(), 0)
99            self.assertFalse(ep.closed)
100        self.assertTrue(ep.closed)
101        self.assertRaises(ValueError, ep.fileno)
102
103    def test_add(self):
104        server, client = self._connected_pair()
105
106        ep = select.epoll(2)
107        try:
108            ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
109            ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
110        finally:
111            ep.close()
112
113        # adding by object w/ fileno works, too.
114        ep = select.epoll(2)
115        try:
116            ep.register(server, select.EPOLLIN | select.EPOLLOUT)
117            ep.register(client, select.EPOLLIN | select.EPOLLOUT)
118        finally:
119            ep.close()
120
121        ep = select.epoll(2)
122        try:
123            # TypeError: argument must be an int, or have a fileno() method.
124            self.assertRaises(TypeError, ep.register, object(),
125                              select.EPOLLIN | select.EPOLLOUT)
126            self.assertRaises(TypeError, ep.register, None,
127                              select.EPOLLIN | select.EPOLLOUT)
128            # ValueError: file descriptor cannot be a negative integer (-1)
129            self.assertRaises(ValueError, ep.register, -1,
130                              select.EPOLLIN | select.EPOLLOUT)
131            # OSError: [Errno 9] Bad file descriptor
132            self.assertRaises(OSError, ep.register, 10000,
133                              select.EPOLLIN | select.EPOLLOUT)
134            # registering twice also raises an exception
135            ep.register(server, select.EPOLLIN | select.EPOLLOUT)
136            self.assertRaises(OSError, ep.register, server,
137                              select.EPOLLIN | select.EPOLLOUT)
138        finally:
139            ep.close()
140
141    def test_fromfd(self):
142        server, client = self._connected_pair()
143
144        with select.epoll(2) as ep:
145            ep2 = select.epoll.fromfd(ep.fileno())
146
147            ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
148            ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
149
150            events = ep.poll(1, 4)
151            events2 = ep2.poll(0.9, 4)
152            self.assertEqual(len(events), 2)
153            self.assertEqual(len(events2), 2)
154
155        try:
156            ep2.poll(1, 4)
157        except OSError as e:
158            self.assertEqual(e.args[0], errno.EBADF, e)
159        else:
160            self.fail("epoll on closed fd didn't raise EBADF")
161
162    def test_control_and_wait(self):
163        # create the epoll object
164        client, server = self._connected_pair()
165        ep = select.epoll(16)
166        ep.register(server.fileno(),
167                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
168        ep.register(client.fileno(),
169                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
170
171        # EPOLLOUT
172        now = time.monotonic()
173        events = ep.poll(1, 4)
174        then = time.monotonic()
175        self.assertFalse(then - now > 0.1, then - now)
176
177        expected = [(client.fileno(), select.EPOLLOUT),
178                    (server.fileno(), select.EPOLLOUT)]
179        self.assertEqual(sorted(events), sorted(expected))
180
181        # no event
182        events = ep.poll(timeout=0.1, maxevents=4)
183        self.assertFalse(events)
184
185        # send: EPOLLIN and EPOLLOUT
186        client.sendall(b"Hello!")
187        server.sendall(b"world!!!")
188
189        now = time.monotonic()
190        events = ep.poll(1.0, 4)
191        then = time.monotonic()
192        self.assertFalse(then - now > 0.01)
193
194        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
195                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
196        self.assertEqual(sorted(events), sorted(expected))
197
198        # unregister, modify
199        ep.unregister(client.fileno())
200        ep.modify(server.fileno(), select.EPOLLOUT)
201        now = time.monotonic()
202        events = ep.poll(1, 4)
203        then = time.monotonic()
204        self.assertFalse(then - now > 0.01)
205
206        expected = [(server.fileno(), select.EPOLLOUT)]
207        self.assertEqual(events, expected)
208
209    def test_errors(self):
210        self.assertRaises(ValueError, select.epoll, -2)
211        self.assertRaises(ValueError, select.epoll().register, -1,
212                          select.EPOLLIN)
213
214    def test_unregister_closed(self):
215        server, client = self._connected_pair()
216        fd = server.fileno()
217        ep = select.epoll(16)
218        ep.register(server)
219
220        now = time.monotonic()
221        events = ep.poll(1, 4)
222        then = time.monotonic()
223        self.assertFalse(then - now > 0.01)
224
225        server.close()
226
227        with self.assertRaises(OSError) as cm:
228            ep.unregister(fd)
229        self.assertEqual(cm.exception.errno, errno.EBADF)
230
231    def test_close(self):
232        open_file = open(__file__, "rb")
233        self.addCleanup(open_file.close)
234        fd = open_file.fileno()
235        epoll = select.epoll()
236
237        # test fileno() method and closed attribute
238        self.assertIsInstance(epoll.fileno(), int)
239        self.assertFalse(epoll.closed)
240
241        # test close()
242        epoll.close()
243        self.assertTrue(epoll.closed)
244        self.assertRaises(ValueError, epoll.fileno)
245
246        # close() can be called more than once
247        epoll.close()
248
249        # operations must fail with ValueError("I/O operation on closed ...")
250        self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN)
251        self.assertRaises(ValueError, epoll.poll, 1.0)
252        self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN)
253        self.assertRaises(ValueError, epoll.unregister, fd)
254
255    def test_fd_non_inheritable(self):
256        epoll = select.epoll()
257        self.addCleanup(epoll.close)
258        self.assertEqual(os.get_inheritable(epoll.fileno()), False)
259
260
261if __name__ == "__main__":
262    unittest.main()
263