• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
2#
3# Permission is hereby granted, free of charge, to any person obtaining
4# a copy of this software and associated documentation files (the
5# "Software"), to deal in the Software without restriction, including
6# without limitation the rights to use, copy, modify, merge, publish,
7# distribute, sublicense, and/or sell copies of the Software, and to
8# permit persons to whom the Software is furnished to do so, subject to
9# the following conditions:
10#
11# The above copyright notice and this permission notice shall be
12# included in all copies or substantial portions of the Software.
13#
14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
18# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
19# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21"""
22Tests for epoll wrapper.
23"""
24import errno
25import os
26import select
27import socket
28import time
29import unittest
30
31if not hasattr(select, "epoll"):
32    raise unittest.SkipTest("test works only on Linux 2.6")
33
34try:
35    select.epoll()
36except OSError as e:
37    if e.errno == errno.ENOSYS:
38        raise unittest.SkipTest("kernel doesn't support epoll()")
39    raise
40
41class TestEPoll(unittest.TestCase):
42
43    def setUp(self):
44        self.serverSocket = socket.create_server(('127.0.0.1', 0))
45        self.connections = [self.serverSocket]
46
47    def tearDown(self):
48        for skt in self.connections:
49            skt.close()
50
51    def _connected_pair(self):
52        client = socket.socket()
53        client.setblocking(False)
54        try:
55            client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
56        except OSError as e:
57            self.assertEqual(e.args[0], errno.EINPROGRESS)
58        else:
59            raise AssertionError("Connect should have raised EINPROGRESS")
60        server, addr = self.serverSocket.accept()
61
62        self.connections.extend((client, server))
63        return client, server
64
65    def test_create(self):
66        try:
67            ep = select.epoll(16)
68        except OSError as e:
69            raise AssertionError(str(e))
70        self.assertTrue(ep.fileno() > 0, ep.fileno())
71        self.assertTrue(not ep.closed)
72        ep.close()
73        self.assertTrue(ep.closed)
74        self.assertRaises(ValueError, ep.fileno)
75
76        if hasattr(select, "EPOLL_CLOEXEC"):
77            select.epoll(-1, select.EPOLL_CLOEXEC).close()
78            select.epoll(flags=select.EPOLL_CLOEXEC).close()
79            select.epoll(flags=0).close()
80
81    def test_badcreate(self):
82        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
83        self.assertRaises(TypeError, select.epoll, 'foo')
84        self.assertRaises(TypeError, select.epoll, None)
85        self.assertRaises(TypeError, select.epoll, ())
86        self.assertRaises(TypeError, select.epoll, ['foo'])
87        self.assertRaises(TypeError, select.epoll, {})
88
89        self.assertRaises(ValueError, select.epoll, 0)
90        self.assertRaises(ValueError, select.epoll, -2)
91        self.assertRaises(ValueError, select.epoll, sizehint=-2)
92
93        if hasattr(select, "EPOLL_CLOEXEC"):
94            self.assertRaises(OSError, select.epoll, flags=12356)
95
96    def test_context_manager(self):
97        with select.epoll(16) as ep:
98            self.assertGreater(ep.fileno(), 0)
99            self.assertFalse(ep.closed)
100        self.assertTrue(ep.closed)
101        self.assertRaises(ValueError, ep.fileno)
102
103    def test_add(self):
104        server, client = self._connected_pair()
105
106        ep = select.epoll(2)
107        try:
108            ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
109            ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
110        finally:
111            ep.close()
112
113        # adding by object w/ fileno works, too.
114        ep = select.epoll(2)
115        try:
116            ep.register(server, select.EPOLLIN | select.EPOLLOUT)
117            ep.register(client, select.EPOLLIN | select.EPOLLOUT)
118        finally:
119            ep.close()
120
121        ep = select.epoll(2)
122        try:
123            # TypeError: argument must be an int, or have a fileno() method.
124            self.assertRaises(TypeError, ep.register, object(),
125                              select.EPOLLIN | select.EPOLLOUT)
126            self.assertRaises(TypeError, ep.register, None,
127                              select.EPOLLIN | select.EPOLLOUT)
128            # ValueError: file descriptor cannot be a negative integer (-1)
129            self.assertRaises(ValueError, ep.register, -1,
130                              select.EPOLLIN | select.EPOLLOUT)
131            # OSError: [Errno 9] Bad file descriptor
132            self.assertRaises(OSError, ep.register, 10000,
133                              select.EPOLLIN | select.EPOLLOUT)
134            # registering twice also raises an exception
135            ep.register(server, select.EPOLLIN | select.EPOLLOUT)
136            self.assertRaises(OSError, ep.register, server,
137                              select.EPOLLIN | select.EPOLLOUT)
138        finally:
139            ep.close()
140
141    def test_fromfd(self):
142        server, client = self._connected_pair()
143
144        with select.epoll(2) as ep:
145            ep2 = select.epoll.fromfd(ep.fileno())
146
147            ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
148            ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
149
150            events = ep.poll(1, 4)
151            events2 = ep2.poll(0.9, 4)
152            self.assertEqual(len(events), 2)
153            self.assertEqual(len(events2), 2)
154
155        try:
156            ep2.poll(1, 4)
157        except OSError as e:
158            self.assertEqual(e.args[0], errno.EBADF, e)
159        else:
160            self.fail("epoll on closed fd didn't raise EBADF")
161
162    def test_control_and_wait(self):
163        client, server = self._connected_pair()
164
165        ep = select.epoll(16)
166        ep.register(server.fileno(),
167                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
168        ep.register(client.fileno(),
169                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
170
171        now = time.monotonic()
172        events = ep.poll(1, 4)
173        then = time.monotonic()
174        self.assertFalse(then - now > 0.1, then - now)
175
176        events.sort()
177        expected = [(client.fileno(), select.EPOLLOUT),
178                    (server.fileno(), select.EPOLLOUT)]
179        expected.sort()
180
181        self.assertEqual(events, expected)
182
183        events = ep.poll(timeout=2.1, maxevents=4)
184        self.assertFalse(events)
185
186        client.send(b"Hello!")
187        server.send(b"world!!!")
188
189        now = time.monotonic()
190        events = ep.poll(1, 4)
191        then = time.monotonic()
192        self.assertFalse(then - now > 0.01)
193
194        events.sort()
195        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
196                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
197        expected.sort()
198
199        self.assertEqual(events, expected)
200
201        ep.unregister(client.fileno())
202        ep.modify(server.fileno(), select.EPOLLOUT)
203        now = time.monotonic()
204        events = ep.poll(1, 4)
205        then = time.monotonic()
206        self.assertFalse(then - now > 0.01)
207
208        expected = [(server.fileno(), select.EPOLLOUT)]
209        self.assertEqual(events, expected)
210
211    def test_errors(self):
212        self.assertRaises(ValueError, select.epoll, -2)
213        self.assertRaises(ValueError, select.epoll().register, -1,
214                          select.EPOLLIN)
215
216    def test_unregister_closed(self):
217        server, client = self._connected_pair()
218        fd = server.fileno()
219        ep = select.epoll(16)
220        ep.register(server)
221
222        now = time.monotonic()
223        events = ep.poll(1, 4)
224        then = time.monotonic()
225        self.assertFalse(then - now > 0.01)
226
227        server.close()
228        ep.unregister(fd)
229
230    def test_close(self):
231        open_file = open(__file__, "rb")
232        self.addCleanup(open_file.close)
233        fd = open_file.fileno()
234        epoll = select.epoll()
235
236        # test fileno() method and closed attribute
237        self.assertIsInstance(epoll.fileno(), int)
238        self.assertFalse(epoll.closed)
239
240        # test close()
241        epoll.close()
242        self.assertTrue(epoll.closed)
243        self.assertRaises(ValueError, epoll.fileno)
244
245        # close() can be called more than once
246        epoll.close()
247
248        # operations must fail with ValueError("I/O operation on closed ...")
249        self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN)
250        self.assertRaises(ValueError, epoll.poll, 1.0)
251        self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN)
252        self.assertRaises(ValueError, epoll.unregister, fd)
253
254    def test_fd_non_inheritable(self):
255        epoll = select.epoll()
256        self.addCleanup(epoll.close)
257        self.assertEqual(os.get_inheritable(epoll.fileno()), False)
258
259
260if __name__ == "__main__":
261    unittest.main()
262