1# Copyright (c) 2001-2006 Twisted Matrix Laboratories. 2# 3# Permission is hereby granted, free of charge, to any person obtaining 4# a copy of this software and associated documentation files (the 5# "Software"), to deal in the Software without restriction, including 6# without limitation the rights to use, copy, modify, merge, publish, 7# distribute, sublicense, and/or sell copies of the Software, and to 8# permit persons to whom the Software is furnished to do so, subject to 9# the following conditions: 10# 11# The above copyright notice and this permission notice shall be 12# included in all copies or substantial portions of the Software. 13# 14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 15# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 17# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 18# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 19# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 20# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 21""" 22Tests for epoll wrapper. 23""" 24import errno 25import os 26import select 27import socket 28import time 29import unittest 30from test import support 31 32if not hasattr(select, "epoll"): 33 raise unittest.SkipTest("test works only on Linux 2.6") 34 35try: 36 select.epoll() 37except OSError as e: 38 if e.errno == errno.ENOSYS: 39 raise unittest.SkipTest("kernel doesn't support epoll()") 40 raise 41 42class TestEPoll(unittest.TestCase): 43 44 def setUp(self): 45 self.serverSocket = socket.create_server(('127.0.0.1', 0)) 46 self.connections = [self.serverSocket] 47 48 def tearDown(self): 49 for skt in self.connections: 50 skt.close() 51 52 def _connected_pair(self): 53 client = socket.socket() 54 client.setblocking(False) 55 try: 56 client.connect(('127.0.0.1', self.serverSocket.getsockname()[1])) 57 except OSError as e: 58 self.assertEqual(e.args[0], errno.EINPROGRESS) 59 else: 60 raise AssertionError("Connect should have raised EINPROGRESS") 61 server, addr = self.serverSocket.accept() 62 63 self.connections.extend((client, server)) 64 return client, server 65 66 def test_create(self): 67 try: 68 ep = select.epoll(16) 69 except OSError as e: 70 raise AssertionError(str(e)) 71 self.assertTrue(ep.fileno() > 0, ep.fileno()) 72 self.assertTrue(not ep.closed) 73 ep.close() 74 self.assertTrue(ep.closed) 75 self.assertRaises(ValueError, ep.fileno) 76 77 if hasattr(select, "EPOLL_CLOEXEC"): 78 select.epoll(-1, select.EPOLL_CLOEXEC).close() 79 select.epoll(flags=select.EPOLL_CLOEXEC).close() 80 select.epoll(flags=0).close() 81 82 def test_badcreate(self): 83 self.assertRaises(TypeError, select.epoll, 1, 2, 3) 84 self.assertRaises(TypeError, select.epoll, 'foo') 85 self.assertRaises(TypeError, select.epoll, None) 86 self.assertRaises(TypeError, select.epoll, ()) 87 self.assertRaises(TypeError, select.epoll, ['foo']) 88 self.assertRaises(TypeError, select.epoll, {}) 89 90 self.assertRaises(ValueError, select.epoll, 0) 91 self.assertRaises(ValueError, select.epoll, -2) 92 self.assertRaises(ValueError, select.epoll, sizehint=-2) 93 94 if hasattr(select, "EPOLL_CLOEXEC"): 95 self.assertRaises(OSError, select.epoll, flags=12356) 96 97 def test_context_manager(self): 98 with select.epoll(16) as ep: 99 self.assertGreater(ep.fileno(), 0) 100 self.assertFalse(ep.closed) 101 self.assertTrue(ep.closed) 102 self.assertRaises(ValueError, ep.fileno) 103 104 def test_add(self): 105 server, client = self._connected_pair() 106 107 ep = select.epoll(2) 108 try: 109 ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT) 110 ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT) 111 finally: 112 ep.close() 113 114 # adding by object w/ fileno works, too. 115 ep = select.epoll(2) 116 try: 117 ep.register(server, select.EPOLLIN | select.EPOLLOUT) 118 ep.register(client, select.EPOLLIN | select.EPOLLOUT) 119 finally: 120 ep.close() 121 122 ep = select.epoll(2) 123 try: 124 # TypeError: argument must be an int, or have a fileno() method. 125 self.assertRaises(TypeError, ep.register, object(), 126 select.EPOLLIN | select.EPOLLOUT) 127 self.assertRaises(TypeError, ep.register, None, 128 select.EPOLLIN | select.EPOLLOUT) 129 # ValueError: file descriptor cannot be a negative integer (-1) 130 self.assertRaises(ValueError, ep.register, -1, 131 select.EPOLLIN | select.EPOLLOUT) 132 # OSError: [Errno 9] Bad file descriptor 133 self.assertRaises(OSError, ep.register, 10000, 134 select.EPOLLIN | select.EPOLLOUT) 135 # registering twice also raises an exception 136 ep.register(server, select.EPOLLIN | select.EPOLLOUT) 137 self.assertRaises(OSError, ep.register, server, 138 select.EPOLLIN | select.EPOLLOUT) 139 finally: 140 ep.close() 141 142 def test_fromfd(self): 143 server, client = self._connected_pair() 144 145 with select.epoll(2) as ep: 146 ep2 = select.epoll.fromfd(ep.fileno()) 147 148 ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT) 149 ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT) 150 151 events = ep.poll(1, 4) 152 events2 = ep2.poll(0.9, 4) 153 self.assertEqual(len(events), 2) 154 self.assertEqual(len(events2), 2) 155 156 try: 157 ep2.poll(1, 4) 158 except OSError as e: 159 self.assertEqual(e.args[0], errno.EBADF, e) 160 else: 161 self.fail("epoll on closed fd didn't raise EBADF") 162 163 def test_control_and_wait(self): 164 # create the epoll object 165 client, server = self._connected_pair() 166 ep = select.epoll(16) 167 ep.register(server.fileno(), 168 select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) 169 ep.register(client.fileno(), 170 select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) 171 172 # EPOLLOUT 173 now = time.monotonic() 174 events = ep.poll(1, 4) 175 then = time.monotonic() 176 self.assertFalse(then - now > 0.1, then - now) 177 178 expected = [(client.fileno(), select.EPOLLOUT), 179 (server.fileno(), select.EPOLLOUT)] 180 self.assertEqual(sorted(events), sorted(expected)) 181 182 # no event 183 events = ep.poll(timeout=0.1, maxevents=4) 184 self.assertFalse(events) 185 186 # send: EPOLLIN and EPOLLOUT 187 client.sendall(b"Hello!") 188 server.sendall(b"world!!!") 189 190 # we might receive events one at a time, necessitating multiple calls to 191 # poll 192 events = [] 193 for _ in support.busy_retry(support.SHORT_TIMEOUT): 194 now = time.monotonic() 195 events += ep.poll(1.0, 4) 196 then = time.monotonic() 197 self.assertFalse(then - now > 0.01) 198 if len(events) >= 2: 199 break 200 201 expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT), 202 (server.fileno(), select.EPOLLIN | select.EPOLLOUT)] 203 self.assertEqual(sorted(events), sorted(expected)) 204 205 # unregister, modify 206 ep.unregister(client.fileno()) 207 ep.modify(server.fileno(), select.EPOLLOUT) 208 now = time.monotonic() 209 events = ep.poll(1, 4) 210 then = time.monotonic() 211 self.assertFalse(then - now > 0.01) 212 213 expected = [(server.fileno(), select.EPOLLOUT)] 214 self.assertEqual(events, expected) 215 216 def test_errors(self): 217 self.assertRaises(ValueError, select.epoll, -2) 218 self.assertRaises(ValueError, select.epoll().register, -1, 219 select.EPOLLIN) 220 221 def test_unregister_closed(self): 222 server, client = self._connected_pair() 223 fd = server.fileno() 224 ep = select.epoll(16) 225 ep.register(server) 226 227 now = time.monotonic() 228 events = ep.poll(1, 4) 229 then = time.monotonic() 230 self.assertFalse(then - now > 0.01) 231 232 server.close() 233 234 with self.assertRaises(OSError) as cm: 235 ep.unregister(fd) 236 self.assertEqual(cm.exception.errno, errno.EBADF) 237 238 def test_close(self): 239 open_file = open(__file__, "rb") 240 self.addCleanup(open_file.close) 241 fd = open_file.fileno() 242 epoll = select.epoll() 243 244 # test fileno() method and closed attribute 245 self.assertIsInstance(epoll.fileno(), int) 246 self.assertFalse(epoll.closed) 247 248 # test close() 249 epoll.close() 250 self.assertTrue(epoll.closed) 251 self.assertRaises(ValueError, epoll.fileno) 252 253 # close() can be called more than once 254 epoll.close() 255 256 # operations must fail with ValueError("I/O operation on closed ...") 257 self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN) 258 self.assertRaises(ValueError, epoll.poll, 1.0) 259 self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN) 260 self.assertRaises(ValueError, epoll.unregister, fd) 261 262 def test_fd_non_inheritable(self): 263 epoll = select.epoll() 264 self.addCleanup(epoll.close) 265 self.assertEqual(os.get_inheritable(epoll.fileno()), False) 266 267 268if __name__ == "__main__": 269 unittest.main() 270