1# Copyright (c) 2001-2006 Twisted Matrix Laboratories. 2# 3# Permission is hereby granted, free of charge, to any person obtaining 4# a copy of this software and associated documentation files (the 5# "Software"), to deal in the Software without restriction, including 6# without limitation the rights to use, copy, modify, merge, publish, 7# distribute, sublicense, and/or sell copies of the Software, and to 8# permit persons to whom the Software is furnished to do so, subject to 9# the following conditions: 10# 11# The above copyright notice and this permission notice shall be 12# included in all copies or substantial portions of the Software. 13# 14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 15# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 17# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 18# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 19# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 20# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 21""" 22Tests for epoll wrapper. 23""" 24import errno 25import os 26import select 27import socket 28import time 29import unittest 30 31if not hasattr(select, "epoll"): 32 raise unittest.SkipTest("test works only on Linux 2.6") 33 34try: 35 select.epoll() 36except OSError as e: 37 if e.errno == errno.ENOSYS: 38 raise unittest.SkipTest("kernel doesn't support epoll()") 39 raise 40 41class TestEPoll(unittest.TestCase): 42 43 def setUp(self): 44 self.serverSocket = socket.create_server(('127.0.0.1', 0)) 45 self.connections = [self.serverSocket] 46 47 def tearDown(self): 48 for skt in self.connections: 49 skt.close() 50 51 def _connected_pair(self): 52 client = socket.socket() 53 client.setblocking(False) 54 try: 55 client.connect(('127.0.0.1', self.serverSocket.getsockname()[1])) 56 except OSError as e: 57 self.assertEqual(e.args[0], errno.EINPROGRESS) 58 else: 59 raise AssertionError("Connect should have raised EINPROGRESS") 60 server, addr = self.serverSocket.accept() 61 62 self.connections.extend((client, server)) 63 return client, server 64 65 def test_create(self): 66 try: 67 ep = select.epoll(16) 68 except OSError as e: 69 raise AssertionError(str(e)) 70 self.assertTrue(ep.fileno() > 0, ep.fileno()) 71 self.assertTrue(not ep.closed) 72 ep.close() 73 self.assertTrue(ep.closed) 74 self.assertRaises(ValueError, ep.fileno) 75 76 if hasattr(select, "EPOLL_CLOEXEC"): 77 select.epoll(-1, select.EPOLL_CLOEXEC).close() 78 select.epoll(flags=select.EPOLL_CLOEXEC).close() 79 select.epoll(flags=0).close() 80 81 def test_badcreate(self): 82 self.assertRaises(TypeError, select.epoll, 1, 2, 3) 83 self.assertRaises(TypeError, select.epoll, 'foo') 84 self.assertRaises(TypeError, select.epoll, None) 85 self.assertRaises(TypeError, select.epoll, ()) 86 self.assertRaises(TypeError, select.epoll, ['foo']) 87 self.assertRaises(TypeError, select.epoll, {}) 88 89 self.assertRaises(ValueError, select.epoll, 0) 90 self.assertRaises(ValueError, select.epoll, -2) 91 self.assertRaises(ValueError, select.epoll, sizehint=-2) 92 93 if hasattr(select, "EPOLL_CLOEXEC"): 94 self.assertRaises(OSError, select.epoll, flags=12356) 95 96 def test_context_manager(self): 97 with select.epoll(16) as ep: 98 self.assertGreater(ep.fileno(), 0) 99 self.assertFalse(ep.closed) 100 self.assertTrue(ep.closed) 101 self.assertRaises(ValueError, ep.fileno) 102 103 def test_add(self): 104 server, client = self._connected_pair() 105 106 ep = select.epoll(2) 107 try: 108 ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT) 109 ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT) 110 finally: 111 ep.close() 112 113 # adding by object w/ fileno works, too. 114 ep = select.epoll(2) 115 try: 116 ep.register(server, select.EPOLLIN | select.EPOLLOUT) 117 ep.register(client, select.EPOLLIN | select.EPOLLOUT) 118 finally: 119 ep.close() 120 121 ep = select.epoll(2) 122 try: 123 # TypeError: argument must be an int, or have a fileno() method. 124 self.assertRaises(TypeError, ep.register, object(), 125 select.EPOLLIN | select.EPOLLOUT) 126 self.assertRaises(TypeError, ep.register, None, 127 select.EPOLLIN | select.EPOLLOUT) 128 # ValueError: file descriptor cannot be a negative integer (-1) 129 self.assertRaises(ValueError, ep.register, -1, 130 select.EPOLLIN | select.EPOLLOUT) 131 # OSError: [Errno 9] Bad file descriptor 132 self.assertRaises(OSError, ep.register, 10000, 133 select.EPOLLIN | select.EPOLLOUT) 134 # registering twice also raises an exception 135 ep.register(server, select.EPOLLIN | select.EPOLLOUT) 136 self.assertRaises(OSError, ep.register, server, 137 select.EPOLLIN | select.EPOLLOUT) 138 finally: 139 ep.close() 140 141 def test_fromfd(self): 142 server, client = self._connected_pair() 143 144 with select.epoll(2) as ep: 145 ep2 = select.epoll.fromfd(ep.fileno()) 146 147 ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT) 148 ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT) 149 150 events = ep.poll(1, 4) 151 events2 = ep2.poll(0.9, 4) 152 self.assertEqual(len(events), 2) 153 self.assertEqual(len(events2), 2) 154 155 try: 156 ep2.poll(1, 4) 157 except OSError as e: 158 self.assertEqual(e.args[0], errno.EBADF, e) 159 else: 160 self.fail("epoll on closed fd didn't raise EBADF") 161 162 def test_control_and_wait(self): 163 client, server = self._connected_pair() 164 165 ep = select.epoll(16) 166 ep.register(server.fileno(), 167 select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) 168 ep.register(client.fileno(), 169 select.EPOLLIN | select.EPOLLOUT | select.EPOLLET) 170 171 now = time.monotonic() 172 events = ep.poll(1, 4) 173 then = time.monotonic() 174 self.assertFalse(then - now > 0.1, then - now) 175 176 events.sort() 177 expected = [(client.fileno(), select.EPOLLOUT), 178 (server.fileno(), select.EPOLLOUT)] 179 expected.sort() 180 181 self.assertEqual(events, expected) 182 183 events = ep.poll(timeout=2.1, maxevents=4) 184 self.assertFalse(events) 185 186 client.send(b"Hello!") 187 server.send(b"world!!!") 188 189 now = time.monotonic() 190 events = ep.poll(1, 4) 191 then = time.monotonic() 192 self.assertFalse(then - now > 0.01) 193 194 events.sort() 195 expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT), 196 (server.fileno(), select.EPOLLIN | select.EPOLLOUT)] 197 expected.sort() 198 199 self.assertEqual(events, expected) 200 201 ep.unregister(client.fileno()) 202 ep.modify(server.fileno(), select.EPOLLOUT) 203 now = time.monotonic() 204 events = ep.poll(1, 4) 205 then = time.monotonic() 206 self.assertFalse(then - now > 0.01) 207 208 expected = [(server.fileno(), select.EPOLLOUT)] 209 self.assertEqual(events, expected) 210 211 def test_errors(self): 212 self.assertRaises(ValueError, select.epoll, -2) 213 self.assertRaises(ValueError, select.epoll().register, -1, 214 select.EPOLLIN) 215 216 def test_unregister_closed(self): 217 server, client = self._connected_pair() 218 fd = server.fileno() 219 ep = select.epoll(16) 220 ep.register(server) 221 222 now = time.monotonic() 223 events = ep.poll(1, 4) 224 then = time.monotonic() 225 self.assertFalse(then - now > 0.01) 226 227 server.close() 228 ep.unregister(fd) 229 230 def test_close(self): 231 open_file = open(__file__, "rb") 232 self.addCleanup(open_file.close) 233 fd = open_file.fileno() 234 epoll = select.epoll() 235 236 # test fileno() method and closed attribute 237 self.assertIsInstance(epoll.fileno(), int) 238 self.assertFalse(epoll.closed) 239 240 # test close() 241 epoll.close() 242 self.assertTrue(epoll.closed) 243 self.assertRaises(ValueError, epoll.fileno) 244 245 # close() can be called more than once 246 epoll.close() 247 248 # operations must fail with ValueError("I/O operation on closed ...") 249 self.assertRaises(ValueError, epoll.modify, fd, select.EPOLLIN) 250 self.assertRaises(ValueError, epoll.poll, 1.0) 251 self.assertRaises(ValueError, epoll.register, fd, select.EPOLLIN) 252 self.assertRaises(ValueError, epoll.unregister, fd) 253 254 def test_fd_non_inheritable(self): 255 epoll = select.epoll() 256 self.addCleanup(epoll.close) 257 self.assertEqual(os.get_inheritable(epoll.fileno()), False) 258 259 260if __name__ == "__main__": 261 unittest.main() 262