# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
Tests for epoll wrapper.
"""
import errno
import os
import select
import socket
import time
import unittest

if not hasattr(select, "epoll"):
    raise unittest.SkipTest("test works only on Linux 2.6")

try:
    # Probe that epoll can actually be created here.  The probe object is
    # closed right away so that importing this module does not keep a file
    # descriptor open.
    select.epoll().close()
except OSError as e:
    if e.errno == errno.ENOSYS:
        raise unittest.SkipTest("kernel doesn't support epoll()")
    raise


class TestEPoll(unittest.TestCase):
    """Tests for select.epoll, driven by connected loopback TCP sockets."""

    # Upper bound, in seconds, for a poll() that is expected to return at
    # once because events are already pending.  Those polls are issued with
    # a timeout of about one second, so this bound still tells "returned
    # immediately" apart from "ran into the timeout".
    IMMEDIATE = 0.1

    def setUp(self):
        """Open a listening socket on an ephemeral loopback port."""
        self.serverSocket = socket.create_server(('127.0.0.1', 0))
        # Every socket a test creates is recorded here and closed in tearDown.
        self.connections = [self.serverSocket]

    def tearDown(self):
        """Close every socket recorded in self.connections."""
        for skt in self.connections:
            skt.close()

    def _connected_pair(self):
        """Return a connected ``(client, server)`` socket pair.

        The client socket is non-blocking; the server socket is the one
        returned by ``accept()`` on the listening socket.  Both are recorded
        in ``self.connections`` so that tearDown closes them.
        """
        client = socket.socket()
        # Record the socket before anything can fail, so that it is closed
        # by tearDown even when one of the assertions below trips.
        self.connections.append(client)
        client.setblocking(False)
        port = self.serverSocket.getsockname()[1]
        with self.assertRaises(
                OSError, msg="Connect should have raised EINPROGRESS") as cm:
            client.connect(('127.0.0.1', port))
        self.assertEqual(cm.exception.errno, errno.EINPROGRESS)

        server, _addr = self.serverSocket.accept()
        self.connections.append(server)
        return client, server

    def _poll_immediately(self, ep, timeout, maxevents):
        """Poll *ep* once and assert that the call returned promptly.

        Returns the event list produced by ``ep.poll(timeout, maxevents)``.
        Fails the test when the call took longer than ``IMMEDIATE`` seconds.
        """
        start = time.monotonic()
        events = ep.poll(timeout, maxevents)
        elapsed = time.monotonic() - start
        self.assertLessEqual(elapsed, self.IMMEDIATE, elapsed)
        return events

    def test_create(self):
        """An epoll object can be created, queried and closed."""
        try:
            ep = select.epoll(16)
        except OSError as e:
            raise AssertionError(str(e)) from e
        # close() may be called more than once, so this is safe even though
        # the test closes the object itself further down.
        self.addCleanup(ep.close)
        self.assertGreater(ep.fileno(), 0)
        self.assertFalse(ep.closed)
        ep.close()
        self.assertTrue(ep.closed)
        self.assertRaises(ValueError, ep.fileno)

        if hasattr(select, "EPOLL_CLOEXEC"):
            select.epoll(-1, select.EPOLL_CLOEXEC).close()
            select.epoll(flags=select.EPOLL_CLOEXEC).close()
            select.epoll(flags=0).close()

    def test_badcreate(self):
        """Invalid constructor arguments are rejected."""
        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
        self.assertRaises(TypeError, select.epoll, 'foo')
        self.assertRaises(TypeError, select.epoll, None)
        self.assertRaises(TypeError, select.epoll, ())
        self.assertRaises(TypeError, select.epoll, ['foo'])
        self.assertRaises(TypeError, select.epoll, {})

        self.assertRaises(ValueError, select.epoll, 0)
        self.assertRaises(ValueError, select.epoll, -2)
        self.assertRaises(ValueError, select.epoll, sizehint=-2)

        if hasattr(select, "EPOLL_CLOEXEC"):
            self.assertRaises(OSError, select.epoll, flags=12356)

    def test_context_manager(self):
        """Leaving a ``with`` block closes the epoll object."""
        with select.epoll(16) as ep:
            self.assertGreater(ep.fileno(), 0)
            self.assertFalse(ep.closed)
        self.assertTrue(ep.closed)
        self.assertRaises(ValueError, ep.fileno)

    def test_add(self):
        """register() accepts fds and fileno() objects, rejects the rest."""
        client, server = self._connected_pair()
        mask = select.EPOLLIN | select.EPOLLOUT

        with select.epoll(2) as ep:
            ep.register(client.fileno(), mask)
            ep.register(server.fileno(), mask)

        # adding by object w/ fileno works, too.
        with select.epoll(2) as ep:
            ep.register(client, mask)
            ep.register(server, mask)

        with select.epoll(2) as ep:
            # TypeError: argument must be an int, or have a fileno() method.
            self.assertRaises(TypeError, ep.register, object(), mask)
            self.assertRaises(TypeError, ep.register, None, mask)
            # ValueError: file descriptor cannot be a negative integer (-1)
            self.assertRaises(ValueError, ep.register, -1, mask)
            # OSError: [Errno 9] Bad file descriptor
            self.assertRaises(OSError, ep.register, 10000, mask)
            # registering twice also raises an exception
            ep.register(client, mask)
            self.assertRaises(OSError, ep.register, client, mask)

    def test_fromfd(self):
        """An object made by fromfd() uses the original's descriptor."""
        client, server = self._connected_pair()
        mask = select.EPOLLIN | select.EPOLLOUT

        with select.epoll(2) as ep:
            ep2 = select.epoll.fromfd(ep.fileno())

            ep2.register(client.fileno(), mask)
            ep2.register(server.fileno(), mask)

            events = ep.poll(1, 4)
            events2 = ep2.poll(0.9, 4)
            self.assertEqual(len(events), 2)
            self.assertEqual(len(events2), 2)

        # ep has closed the descriptor that ep2 was created from, so polling
        # through ep2 must now fail.
        with self.assertRaises(
                OSError, msg="epoll on closed fd didn't raise EBADF") as cm:
            ep2.poll(1, 4)
        self.assertEqual(cm.exception.errno, errno.EBADF, cm.exception)

    def test_control_and_wait(self):
        """register/modify/unregister change what poll() reports."""
        # create the epoll object
        client, server = self._connected_pair()
        ep = select.epoll(16)
        self.addCleanup(ep.close)
        edge_mask = select.EPOLLIN | select.EPOLLOUT | select.EPOLLET
        ep.register(server.fileno(), edge_mask)
        ep.register(client.fileno(), edge_mask)

        # EPOLLOUT
        events = self._poll_immediately(ep, 1, 4)
        expected = [(client.fileno(), select.EPOLLOUT),
                    (server.fileno(), select.EPOLLOUT)]
        self.assertEqual(sorted(events), sorted(expected))

        # no event
        events = ep.poll(timeout=0.1, maxevents=4)
        self.assertFalse(events)

        # send: EPOLLIN and EPOLLOUT
        client.sendall(b"Hello!")
        server.sendall(b"world!!!")

        # The two notifications may be reported one at a time, so keep
        # polling until both were seen.  The loop always ends: a poll that
        # returns nothing has waited out its one second timeout, which trips
        # the timing assertion in _poll_immediately.
        events = []
        while len(events) < 2:
            events += self._poll_immediately(ep, 1.0, 4)

        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
        self.assertEqual(sorted(events), sorted(expected))

        # unregister, modify
        ep.unregister(client.fileno())
        ep.modify(server.fileno(), select.EPOLLOUT)
        events = self._poll_immediately(ep, 1, 4)

        expected = [(server.fileno(), select.EPOLLOUT)]
        self.assertEqual(events, expected)

    def test_errors(self):
        """Negative size hints and negative descriptors raise ValueError."""
        self.assertRaises(ValueError, select.epoll, -2)
        with select.epoll() as ep:
            self.assertRaises(ValueError, ep.register, -1, select.EPOLLIN)

    def test_unregister_closed(self):
        """unregister() of an already closed descriptor raises EBADF."""
        sock, _peer = self._connected_pair()
        fd = sock.fileno()
        ep = select.epoll(16)
        self.addCleanup(ep.close)
        ep.register(sock)

        self._poll_immediately(ep, 1, 4)

        sock.close()

        with self.assertRaises(OSError) as cm:
            ep.unregister(fd)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_close(self):
        """Every operation on a closed epoll object raises ValueError."""
        open_file = open(__file__, "rb")
        self.addCleanup(open_file.close)
        fd = open_file.fileno()
        epoll = select.epoll()

        # test fileno() method and closed attribute
        self.assertIsInstance(epoll.fileno(), int)
        self.assertFalse(epoll.closed)

        # test close()
        epoll.close()
        self.assertTrue(epoll.closed)
        self.assertRaises(ValueError, epoll.fileno)

        # close() can be called more than once
        epoll.close()

        # operations must fail with ValueError("I/O operation on closed ...")
        self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN)
        self.assertRaises(ValueError, epoll.poll, 1.0)
        self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN)
        self.assertRaises(ValueError, epoll.unregister, fd)

    def test_fd_non_inheritable(self):
        """The descriptor behind an epoll object is not inheritable."""
        epoll = select.epoll()
        self.addCleanup(epoll.close)
        self.assertEqual(os.get_inheritable(epoll.fileno()), False)


if __name__ == "__main__":
    unittest.main()