• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
2#
3# Permission is hereby granted, free of charge, to any person obtaining
4# a copy of this software and associated documentation files (the
5# "Software"), to deal in the Software without restriction, including
6# without limitation the rights to use, copy, modify, merge, publish,
7# distribute, sublicense, and/or sell copies of the Software, and to
8# permit persons to whom the Software is furnished to do so, subject to
9# the following conditions:
10#
11# The above copyright notice and this permission notice shall be
12# included in all copies or substantial portions of the Software.
13#
14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
18# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
19# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21"""
22Tests for epoll wrapper.
23"""
24import errno
25import os
26import select
27import socket
28import time
29import unittest
30from test import support
31
32if not hasattr(select, "epoll"):
33    raise unittest.SkipTest("test works only on Linux 2.6")
34
35try:
36    select.epoll()
37except OSError as e:
38    if e.errno == errno.ENOSYS:
39        raise unittest.SkipTest("kernel doesn't support epoll()")
40    raise
41
42class TestEPoll(unittest.TestCase):
43
44    def setUp(self):
45        self.serverSocket = socket.create_server(('127.0.0.1', 0))
46        self.connections = [self.serverSocket]
47
48    def tearDown(self):
49        for skt in self.connections:
50            skt.close()
51
52    def _connected_pair(self):
53        client = socket.socket()
54        client.setblocking(False)
55        try:
56            client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
57        except OSError as e:
58            self.assertEqual(e.args[0], errno.EINPROGRESS)
59        else:
60            raise AssertionError("Connect should have raised EINPROGRESS")
61        server, addr = self.serverSocket.accept()
62
63        self.connections.extend((client, server))
64        return client, server
65
66    def test_create(self):
67        try:
68            ep = select.epoll(16)
69        except OSError as e:
70            raise AssertionError(str(e))
71        self.assertTrue(ep.fileno() > 0, ep.fileno())
72        self.assertTrue(not ep.closed)
73        ep.close()
74        self.assertTrue(ep.closed)
75        self.assertRaises(ValueError, ep.fileno)
76
77        if hasattr(select, "EPOLL_CLOEXEC"):
78            select.epoll(-1, select.EPOLL_CLOEXEC).close()
79            select.epoll(flags=select.EPOLL_CLOEXEC).close()
80            select.epoll(flags=0).close()
81
82    def test_badcreate(self):
83        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
84        self.assertRaises(TypeError, select.epoll, 'foo')
85        self.assertRaises(TypeError, select.epoll, None)
86        self.assertRaises(TypeError, select.epoll, ())
87        self.assertRaises(TypeError, select.epoll, ['foo'])
88        self.assertRaises(TypeError, select.epoll, {})
89
90        self.assertRaises(ValueError, select.epoll, 0)
91        self.assertRaises(ValueError, select.epoll, -2)
92        self.assertRaises(ValueError, select.epoll, sizehint=-2)
93
94        if hasattr(select, "EPOLL_CLOEXEC"):
95            self.assertRaises(OSError, select.epoll, flags=12356)
96
97    def test_context_manager(self):
98        with select.epoll(16) as ep:
99            self.assertGreater(ep.fileno(), 0)
100            self.assertFalse(ep.closed)
101        self.assertTrue(ep.closed)
102        self.assertRaises(ValueError, ep.fileno)
103
104    def test_add(self):
105        server, client = self._connected_pair()
106
107        ep = select.epoll(2)
108        try:
109            ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
110            ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
111        finally:
112            ep.close()
113
114        # adding by object w/ fileno works, too.
115        ep = select.epoll(2)
116        try:
117            ep.register(server, select.EPOLLIN | select.EPOLLOUT)
118            ep.register(client, select.EPOLLIN | select.EPOLLOUT)
119        finally:
120            ep.close()
121
122        ep = select.epoll(2)
123        try:
124            # TypeError: argument must be an int, or have a fileno() method.
125            self.assertRaises(TypeError, ep.register, object(),
126                              select.EPOLLIN | select.EPOLLOUT)
127            self.assertRaises(TypeError, ep.register, None,
128                              select.EPOLLIN | select.EPOLLOUT)
129            # ValueError: file descriptor cannot be a negative integer (-1)
130            self.assertRaises(ValueError, ep.register, -1,
131                              select.EPOLLIN | select.EPOLLOUT)
132            # OSError: [Errno 9] Bad file descriptor
133            self.assertRaises(OSError, ep.register, 10000,
134                              select.EPOLLIN | select.EPOLLOUT)
135            # registering twice also raises an exception
136            ep.register(server, select.EPOLLIN | select.EPOLLOUT)
137            self.assertRaises(OSError, ep.register, server,
138                              select.EPOLLIN | select.EPOLLOUT)
139        finally:
140            ep.close()
141
142    def test_fromfd(self):
143        server, client = self._connected_pair()
144
145        with select.epoll(2) as ep:
146            ep2 = select.epoll.fromfd(ep.fileno())
147
148            ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
149            ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
150
151            events = ep.poll(1, 4)
152            events2 = ep2.poll(0.9, 4)
153            self.assertEqual(len(events), 2)
154            self.assertEqual(len(events2), 2)
155
156        try:
157            ep2.poll(1, 4)
158        except OSError as e:
159            self.assertEqual(e.args[0], errno.EBADF, e)
160        else:
161            self.fail("epoll on closed fd didn't raise EBADF")
162
163    def test_control_and_wait(self):
164        # create the epoll object
165        client, server = self._connected_pair()
166        ep = select.epoll(16)
167        ep.register(server.fileno(),
168                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
169        ep.register(client.fileno(),
170                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
171
172        # EPOLLOUT
173        now = time.monotonic()
174        events = ep.poll(1, 4)
175        then = time.monotonic()
176        self.assertFalse(then - now > 0.1, then - now)
177
178        expected = [(client.fileno(), select.EPOLLOUT),
179                    (server.fileno(), select.EPOLLOUT)]
180        self.assertEqual(sorted(events), sorted(expected))
181
182        # no event
183        events = ep.poll(timeout=0.1, maxevents=4)
184        self.assertFalse(events)
185
186        # send: EPOLLIN and EPOLLOUT
187        client.sendall(b"Hello!")
188        server.sendall(b"world!!!")
189
190        # we might receive events one at a time, necessitating multiple calls to
191        # poll
192        events = []
193        for _ in support.busy_retry(support.SHORT_TIMEOUT):
194            now = time.monotonic()
195            events += ep.poll(1.0, 4)
196            then = time.monotonic()
197            self.assertFalse(then - now > 0.01)
198            if len(events) >= 2:
199                break
200
201        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
202                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
203        self.assertEqual(sorted(events), sorted(expected))
204
205        # unregister, modify
206        ep.unregister(client.fileno())
207        ep.modify(server.fileno(), select.EPOLLOUT)
208        now = time.monotonic()
209        events = ep.poll(1, 4)
210        then = time.monotonic()
211        self.assertFalse(then - now > 0.01)
212
213        expected = [(server.fileno(), select.EPOLLOUT)]
214        self.assertEqual(events, expected)
215
216    def test_errors(self):
217        self.assertRaises(ValueError, select.epoll, -2)
218        self.assertRaises(ValueError, select.epoll().register, -1,
219                          select.EPOLLIN)
220
221    def test_unregister_closed(self):
222        server, client = self._connected_pair()
223        fd = server.fileno()
224        ep = select.epoll(16)
225        ep.register(server)
226
227        now = time.monotonic()
228        events = ep.poll(1, 4)
229        then = time.monotonic()
230        self.assertFalse(then - now > 0.01)
231
232        server.close()
233
234        with self.assertRaises(OSError) as cm:
235            ep.unregister(fd)
236        self.assertEqual(cm.exception.errno, errno.EBADF)
237
238    def test_close(self):
239        open_file = open(__file__, "rb")
240        self.addCleanup(open_file.close)
241        fd = open_file.fileno()
242        epoll = select.epoll()
243
244        # test fileno() method and closed attribute
245        self.assertIsInstance(epoll.fileno(), int)
246        self.assertFalse(epoll.closed)
247
248        # test close()
249        epoll.close()
250        self.assertTrue(epoll.closed)
251        self.assertRaises(ValueError, epoll.fileno)
252
253        # close() can be called more than once
254        epoll.close()
255
256        # operations must fail with ValueError("I/O operation on closed ...")
257        self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN)
258        self.assertRaises(ValueError, epoll.poll, 1.0)
259        self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN)
260        self.assertRaises(ValueError, epoll.unregister, fd)
261
262    def test_fd_non_inheritable(self):
263        epoll = select.epoll()
264        self.addCleanup(epoll.close)
265        self.assertEqual(os.get_inheritable(epoll.fileno()), False)
266
267
268if __name__ == "__main__":
269    unittest.main()
270