1import asyncore 2import unittest 3import select 4import os 5import socket 6import sys 7import time 8import warnings 9import errno 10import struct 11 12from test import test_support 13from test.test_support import TESTFN, run_unittest, unlink, HOST 14from StringIO import StringIO 15 16try: 17 import threading 18except ImportError: 19 threading = None 20 21 22class dummysocket: 23 def __init__(self): 24 self.closed = False 25 26 def close(self): 27 self.closed = True 28 29 def fileno(self): 30 return 42 31 32class dummychannel: 33 def __init__(self): 34 self.socket = dummysocket() 35 36 def close(self): 37 self.socket.close() 38 39class exitingdummy: 40 def __init__(self): 41 pass 42 43 def handle_read_event(self): 44 raise asyncore.ExitNow() 45 46 handle_write_event = handle_read_event 47 handle_close = handle_read_event 48 handle_expt_event = handle_read_event 49 50class crashingdummy: 51 def __init__(self): 52 self.error_handled = False 53 54 def handle_read_event(self): 55 raise Exception() 56 57 handle_write_event = handle_read_event 58 handle_close = handle_read_event 59 handle_expt_event = handle_read_event 60 61 def handle_error(self): 62 self.error_handled = True 63 64# used when testing senders; just collects what it gets until newline is sent 65def capture_server(evt, buf, serv): 66 try: 67 serv.listen(5) 68 conn, addr = serv.accept() 69 except socket.timeout: 70 pass 71 else: 72 n = 200 73 while n > 0: 74 r, w, e = select.select([conn], [], []) 75 if r: 76 data = conn.recv(10) 77 # keep everything except for the newline terminator 78 buf.write(data.replace('\n', '')) 79 if '\n' in data: 80 break 81 n -= 1 82 time.sleep(0.01) 83 84 conn.close() 85 finally: 86 serv.close() 87 evt.set() 88 89 90class HelperFunctionTests(unittest.TestCase): 91 def test_readwriteexc(self): 92 # Check exception handling behavior of read, write and _exception 93 94 # check that ExitNow exceptions in the object handler method 95 # bubbles all the way up through asyncore read/write/_exception calls 96 tr1 = exitingdummy() 97 self.assertRaises(asyncore.ExitNow, asyncore.read, tr1) 98 self.assertRaises(asyncore.ExitNow, asyncore.write, tr1) 99 self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1) 100 101 # check that an exception other than ExitNow in the object handler 102 # method causes the handle_error method to get called 103 tr2 = crashingdummy() 104 asyncore.read(tr2) 105 self.assertEqual(tr2.error_handled, True) 106 107 tr2 = crashingdummy() 108 asyncore.write(tr2) 109 self.assertEqual(tr2.error_handled, True) 110 111 tr2 = crashingdummy() 112 asyncore._exception(tr2) 113 self.assertEqual(tr2.error_handled, True) 114 115 # asyncore.readwrite uses constants in the select module that 116 # are not present in Windows systems (see this thread: 117 # http://mail.python.org/pipermail/python-list/2001-October/109973.html) 118 # These constants should be present as long as poll is available 119 120 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') 121 def test_readwrite(self): 122 # Check that correct methods are called by readwrite() 123 124 attributes = ('read', 'expt', 'write', 'closed', 'error_handled') 125 126 expected = ( 127 (select.POLLIN, 'read'), 128 (select.POLLPRI, 'expt'), 129 (select.POLLOUT, 'write'), 130 (select.POLLERR, 'closed'), 131 (select.POLLHUP, 'closed'), 132 (select.POLLNVAL, 'closed'), 133 ) 134 135 class testobj: 136 def __init__(self): 137 self.read = False 138 self.write = False 139 self.closed = False 140 self.expt = False 141 self.error_handled = False 142 143 def handle_read_event(self): 144 self.read = True 145 146 def handle_write_event(self): 147 self.write = True 148 149 def handle_close(self): 150 self.closed = True 151 152 def handle_expt_event(self): 153 self.expt = True 154 155 def handle_error(self): 156 self.error_handled = True 157 158 for flag, expectedattr in expected: 159 tobj = testobj() 160 self.assertEqual(getattr(tobj, expectedattr), False) 161 asyncore.readwrite(tobj, flag) 162 163 # Only the attribute modified by the routine we expect to be 164 # called should be True. 165 for attr in attributes: 166 self.assertEqual(getattr(tobj, attr), attr==expectedattr) 167 168 # check that ExitNow exceptions in the object handler method 169 # bubbles all the way up through asyncore readwrite call 170 tr1 = exitingdummy() 171 self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) 172 173 # check that an exception other than ExitNow in the object handler 174 # method causes the handle_error method to get called 175 tr2 = crashingdummy() 176 self.assertEqual(tr2.error_handled, False) 177 asyncore.readwrite(tr2, flag) 178 self.assertEqual(tr2.error_handled, True) 179 180 def test_closeall(self): 181 self.closeall_check(False) 182 183 def test_closeall_default(self): 184 self.closeall_check(True) 185 186 def closeall_check(self, usedefault): 187 # Check that close_all() closes everything in a given map 188 189 l = [] 190 testmap = {} 191 for i in range(10): 192 c = dummychannel() 193 l.append(c) 194 self.assertEqual(c.socket.closed, False) 195 testmap[i] = c 196 197 if usedefault: 198 socketmap = asyncore.socket_map 199 try: 200 asyncore.socket_map = testmap 201 asyncore.close_all() 202 finally: 203 testmap, asyncore.socket_map = asyncore.socket_map, socketmap 204 else: 205 asyncore.close_all(testmap) 206 207 self.assertEqual(len(testmap), 0) 208 209 for c in l: 210 self.assertEqual(c.socket.closed, True) 211 212 def test_compact_traceback(self): 213 try: 214 raise Exception("I don't like spam!") 215 except: 216 real_t, real_v, real_tb = sys.exc_info() 217 r = asyncore.compact_traceback() 218 else: 219 self.fail("Expected exception") 220 221 (f, function, line), t, v, info = r 222 self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py') 223 self.assertEqual(function, 'test_compact_traceback') 224 self.assertEqual(t, real_t) 225 self.assertEqual(v, real_v) 226 self.assertEqual(info, '[%s|%s|%s]' % (f, function, line)) 227 228 229class DispatcherTests(unittest.TestCase): 230 def setUp(self): 231 pass 232 233 def tearDown(self): 234 asyncore.close_all() 235 236 def test_basic(self): 237 d = asyncore.dispatcher() 238 self.assertEqual(d.readable(), True) 239 self.assertEqual(d.writable(), True) 240 241 def test_repr(self): 242 d = asyncore.dispatcher() 243 self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d)) 244 245 def test_log(self): 246 d = asyncore.dispatcher() 247 248 # capture output of dispatcher.log() (to stderr) 249 fp = StringIO() 250 stderr = sys.stderr 251 l1 = "Lovely spam! Wonderful spam!" 252 l2 = "I don't like spam!" 253 try: 254 sys.stderr = fp 255 d.log(l1) 256 d.log(l2) 257 finally: 258 sys.stderr = stderr 259 260 lines = fp.getvalue().splitlines() 261 self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2]) 262 263 def test_log_info(self): 264 d = asyncore.dispatcher() 265 266 # capture output of dispatcher.log_info() (to stdout via print) 267 fp = StringIO() 268 stdout = sys.stdout 269 l1 = "Have you got anything without spam?" 270 l2 = "Why can't she have egg bacon spam and sausage?" 271 l3 = "THAT'S got spam in it!" 272 try: 273 sys.stdout = fp 274 d.log_info(l1, 'EGGS') 275 d.log_info(l2) 276 d.log_info(l3, 'SPAM') 277 finally: 278 sys.stdout = stdout 279 280 lines = fp.getvalue().splitlines() 281 expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3] 282 283 self.assertEqual(lines, expected) 284 285 def test_unhandled(self): 286 d = asyncore.dispatcher() 287 d.ignore_log_types = () 288 289 # capture output of dispatcher.log_info() (to stdout via print) 290 fp = StringIO() 291 stdout = sys.stdout 292 try: 293 sys.stdout = fp 294 d.handle_expt() 295 d.handle_read() 296 d.handle_write() 297 d.handle_connect() 298 d.handle_accept() 299 finally: 300 sys.stdout = stdout 301 302 lines = fp.getvalue().splitlines() 303 expected = ['warning: unhandled incoming priority event', 304 'warning: unhandled read event', 305 'warning: unhandled write event', 306 'warning: unhandled connect event', 307 'warning: unhandled accept event'] 308 self.assertEqual(lines, expected) 309 310 def test_issue_8594(self): 311 # XXX - this test is supposed to be removed in next major Python 312 # version 313 d = asyncore.dispatcher(socket.socket()) 314 # make sure the error message no longer refers to the socket 315 # object but the dispatcher instance instead 316 self.assertRaisesRegexp(AttributeError, 'dispatcher instance', 317 getattr, d, 'foo') 318 # cheap inheritance with the underlying socket is supposed 319 # to still work but a DeprecationWarning is expected 320 with warnings.catch_warnings(record=True) as w: 321 warnings.simplefilter("always") 322 family = d.family 323 self.assertEqual(family, socket.AF_INET) 324 self.assertEqual(len(w), 1) 325 self.assertTrue(issubclass(w[0].category, DeprecationWarning)) 326 327 def test_strerror(self): 328 # refers to bug #8573 329 err = asyncore._strerror(errno.EPERM) 330 if hasattr(os, 'strerror'): 331 self.assertEqual(err, os.strerror(errno.EPERM)) 332 err = asyncore._strerror(-1) 333 self.assertTrue(err != "") 334 335 336class dispatcherwithsend_noread(asyncore.dispatcher_with_send): 337 def readable(self): 338 return False 339 340 def handle_connect(self): 341 pass 342 343class DispatcherWithSendTests(unittest.TestCase): 344 usepoll = False 345 346 def setUp(self): 347 pass 348 349 def tearDown(self): 350 asyncore.close_all() 351 352 @unittest.skipUnless(threading, 'Threading required for this test.') 353 @test_support.reap_threads 354 def test_send(self): 355 evt = threading.Event() 356 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 357 sock.settimeout(3) 358 port = test_support.bind_port(sock) 359 360 cap = StringIO() 361 args = (evt, cap, sock) 362 t = threading.Thread(target=capture_server, args=args) 363 t.start() 364 try: 365 # wait a little longer for the server to initialize (it sometimes 366 # refuses connections on slow machines without this wait) 367 time.sleep(0.2) 368 369 data = "Suppose there isn't a 16-ton weight?" 370 d = dispatcherwithsend_noread() 371 d.create_socket(socket.AF_INET, socket.SOCK_STREAM) 372 d.connect((HOST, port)) 373 374 # give time for socket to connect 375 time.sleep(0.1) 376 377 d.send(data) 378 d.send(data) 379 d.send('\n') 380 381 n = 1000 382 while d.out_buffer and n > 0: 383 asyncore.poll() 384 n -= 1 385 386 evt.wait() 387 388 self.assertEqual(cap.getvalue(), data*2) 389 finally: 390 t.join() 391 392 393class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests): 394 usepoll = True 395 396@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'), 397 'asyncore.file_wrapper required') 398class FileWrapperTest(unittest.TestCase): 399 def setUp(self): 400 self.d = "It's not dead, it's sleeping!" 401 with file(TESTFN, 'w') as h: 402 h.write(self.d) 403 404 def tearDown(self): 405 unlink(TESTFN) 406 407 def test_recv(self): 408 fd = os.open(TESTFN, os.O_RDONLY) 409 w = asyncore.file_wrapper(fd) 410 os.close(fd) 411 412 self.assertNotEqual(w.fd, fd) 413 self.assertNotEqual(w.fileno(), fd) 414 self.assertEqual(w.recv(13), "It's not dead") 415 self.assertEqual(w.read(6), ", it's") 416 w.close() 417 self.assertRaises(OSError, w.read, 1) 418 419 420 def test_send(self): 421 d1 = "Come again?" 422 d2 = "I want to buy some cheese." 423 fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND) 424 w = asyncore.file_wrapper(fd) 425 os.close(fd) 426 427 w.write(d1) 428 w.send(d2) 429 w.close() 430 self.assertEqual(file(TESTFN).read(), self.d + d1 + d2) 431 432 @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'), 433 'asyncore.file_dispatcher required') 434 def test_dispatcher(self): 435 fd = os.open(TESTFN, os.O_RDONLY) 436 data = [] 437 class FileDispatcher(asyncore.file_dispatcher): 438 def handle_read(self): 439 data.append(self.recv(29)) 440 s = FileDispatcher(fd) 441 os.close(fd) 442 asyncore.loop(timeout=0.01, use_poll=True, count=2) 443 self.assertEqual(b"".join(data), self.d) 444 445 446class BaseTestHandler(asyncore.dispatcher): 447 448 def __init__(self, sock=None): 449 asyncore.dispatcher.__init__(self, sock) 450 self.flag = False 451 452 def handle_accept(self): 453 raise Exception("handle_accept not supposed to be called") 454 455 def handle_connect(self): 456 raise Exception("handle_connect not supposed to be called") 457 458 def handle_expt(self): 459 raise Exception("handle_expt not supposed to be called") 460 461 def handle_close(self): 462 raise Exception("handle_close not supposed to be called") 463 464 def handle_error(self): 465 raise 466 467 468class TCPServer(asyncore.dispatcher): 469 """A server which listens on an address and dispatches the 470 connection to a handler. 471 """ 472 473 def __init__(self, handler=BaseTestHandler, host=HOST, port=0): 474 asyncore.dispatcher.__init__(self) 475 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 476 self.set_reuse_addr() 477 self.bind((host, port)) 478 self.listen(5) 479 self.handler = handler 480 481 @property 482 def address(self): 483 return self.socket.getsockname()[:2] 484 485 def handle_accept(self): 486 pair = self.accept() 487 if pair is not None: 488 self.handler(pair[0]) 489 490 def handle_error(self): 491 raise 492 493 494class BaseClient(BaseTestHandler): 495 496 def __init__(self, address): 497 BaseTestHandler.__init__(self) 498 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 499 self.connect(address) 500 501 def handle_connect(self): 502 pass 503 504 505class BaseTestAPI(unittest.TestCase): 506 507 def tearDown(self): 508 asyncore.close_all() 509 510 def loop_waiting_for_flag(self, instance, timeout=5): 511 timeout = float(timeout) / 100 512 count = 100 513 while asyncore.socket_map and count > 0: 514 asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll) 515 if instance.flag: 516 return 517 count -= 1 518 time.sleep(timeout) 519 self.fail("flag not set") 520 521 def test_handle_connect(self): 522 # make sure handle_connect is called on connect() 523 524 class TestClient(BaseClient): 525 def handle_connect(self): 526 self.flag = True 527 528 server = TCPServer() 529 client = TestClient(server.address) 530 self.loop_waiting_for_flag(client) 531 532 def test_handle_accept(self): 533 # make sure handle_accept() is called when a client connects 534 535 class TestListener(BaseTestHandler): 536 537 def __init__(self): 538 BaseTestHandler.__init__(self) 539 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 540 self.bind((HOST, 0)) 541 self.listen(5) 542 self.address = self.socket.getsockname()[:2] 543 544 def handle_accept(self): 545 self.flag = True 546 547 server = TestListener() 548 client = BaseClient(server.address) 549 self.loop_waiting_for_flag(server) 550 551 def test_handle_read(self): 552 # make sure handle_read is called on data received 553 554 class TestClient(BaseClient): 555 def handle_read(self): 556 self.flag = True 557 558 class TestHandler(BaseTestHandler): 559 def __init__(self, conn): 560 BaseTestHandler.__init__(self, conn) 561 self.send('x' * 1024) 562 563 server = TCPServer(TestHandler) 564 client = TestClient(server.address) 565 self.loop_waiting_for_flag(client) 566 567 def test_handle_write(self): 568 # make sure handle_write is called 569 570 class TestClient(BaseClient): 571 def handle_write(self): 572 self.flag = True 573 574 server = TCPServer() 575 client = TestClient(server.address) 576 self.loop_waiting_for_flag(client) 577 578 def test_handle_close(self): 579 # make sure handle_close is called when the other end closes 580 # the connection 581 582 class TestClient(BaseClient): 583 584 def handle_read(self): 585 # in order to make handle_close be called we are supposed 586 # to make at least one recv() call 587 self.recv(1024) 588 589 def handle_close(self): 590 self.flag = True 591 self.close() 592 593 class TestHandler(BaseTestHandler): 594 def __init__(self, conn): 595 BaseTestHandler.__init__(self, conn) 596 self.close() 597 598 server = TCPServer(TestHandler) 599 client = TestClient(server.address) 600 self.loop_waiting_for_flag(client) 601 602 @unittest.skipIf(sys.platform.startswith("sunos"), 603 "OOB support is broken on Solaris") 604 def test_handle_expt(self): 605 # Make sure handle_expt is called on OOB data received. 606 # Note: this might fail on some platforms as OOB data is 607 # tenuously supported and rarely used. 608 609 class TestClient(BaseClient): 610 def handle_expt(self): 611 self.flag = True 612 613 class TestHandler(BaseTestHandler): 614 def __init__(self, conn): 615 BaseTestHandler.__init__(self, conn) 616 self.socket.send(chr(244), socket.MSG_OOB) 617 618 server = TCPServer(TestHandler) 619 client = TestClient(server.address) 620 self.loop_waiting_for_flag(client) 621 622 def test_handle_error(self): 623 624 class TestClient(BaseClient): 625 def handle_write(self): 626 1.0 / 0 627 def handle_error(self): 628 self.flag = True 629 try: 630 raise 631 except ZeroDivisionError: 632 pass 633 else: 634 raise Exception("exception not raised") 635 636 server = TCPServer() 637 client = TestClient(server.address) 638 self.loop_waiting_for_flag(client) 639 640 def test_connection_attributes(self): 641 server = TCPServer() 642 client = BaseClient(server.address) 643 644 # we start disconnected 645 self.assertFalse(server.connected) 646 self.assertTrue(server.accepting) 647 # this can't be taken for granted across all platforms 648 #self.assertFalse(client.connected) 649 self.assertFalse(client.accepting) 650 651 # execute some loops so that client connects to server 652 asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100) 653 self.assertFalse(server.connected) 654 self.assertTrue(server.accepting) 655 self.assertTrue(client.connected) 656 self.assertFalse(client.accepting) 657 658 # disconnect the client 659 client.close() 660 self.assertFalse(server.connected) 661 self.assertTrue(server.accepting) 662 self.assertFalse(client.connected) 663 self.assertFalse(client.accepting) 664 665 # stop serving 666 server.close() 667 self.assertFalse(server.connected) 668 self.assertFalse(server.accepting) 669 670 def test_create_socket(self): 671 s = asyncore.dispatcher() 672 s.create_socket(socket.AF_INET, socket.SOCK_STREAM) 673 self.assertEqual(s.socket.family, socket.AF_INET) 674 self.assertEqual(s.socket.type, socket.SOCK_STREAM) 675 676 def test_bind(self): 677 s1 = asyncore.dispatcher() 678 s1.create_socket(socket.AF_INET, socket.SOCK_STREAM) 679 s1.bind((HOST, 0)) 680 s1.listen(5) 681 port = s1.socket.getsockname()[1] 682 683 s2 = asyncore.dispatcher() 684 s2.create_socket(socket.AF_INET, socket.SOCK_STREAM) 685 # EADDRINUSE indicates the socket was correctly bound 686 self.assertRaises(socket.error, s2.bind, (HOST, port)) 687 688 def test_set_reuse_addr(self): 689 sock = socket.socket() 690 try: 691 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 692 except socket.error: 693 unittest.skip("SO_REUSEADDR not supported on this platform") 694 else: 695 # if SO_REUSEADDR succeeded for sock we expect asyncore 696 # to do the same 697 s = asyncore.dispatcher(socket.socket()) 698 self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET, 699 socket.SO_REUSEADDR)) 700 s.create_socket(socket.AF_INET, socket.SOCK_STREAM) 701 s.set_reuse_addr() 702 self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET, 703 socket.SO_REUSEADDR)) 704 finally: 705 sock.close() 706 707 @unittest.skipUnless(threading, 'Threading required for this test.') 708 @test_support.reap_threads 709 def test_quick_connect(self): 710 # see: http://bugs.python.org/issue10340 711 server = TCPServer() 712 t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500)) 713 t.start() 714 self.addCleanup(t.join) 715 716 for x in xrange(20): 717 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 718 s.settimeout(.2) 719 s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 720 struct.pack('ii', 1, 0)) 721 try: 722 s.connect(server.address) 723 except socket.error: 724 pass 725 finally: 726 s.close() 727 728 729class TestAPI_UseSelect(BaseTestAPI): 730 use_poll = False 731 732@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') 733class TestAPI_UsePoll(BaseTestAPI): 734 use_poll = True 735 736 737def test_main(): 738 tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, 739 DispatcherWithSendTests_UsePoll, TestAPI_UseSelect, 740 TestAPI_UsePoll, FileWrapperTest] 741 run_unittest(*tests) 742 743if __name__ == "__main__": 744 test_main() 745