• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import asyncore
2import unittest
3import select
4import os
5import socket
6import sys
7import time
8import warnings
9import errno
10import struct
11
12from test import test_support
13from test.test_support import TESTFN, run_unittest, unlink, HOST
14from StringIO import StringIO
15
16try:
17    import threading
18except ImportError:
19    threading = None
20
21
class dummysocket:
    """Minimal socket stand-in that only remembers whether it was closed."""

    # Fixed descriptor number reported by fileno().
    _FAKE_FD = 42

    def __init__(self):
        self.closed = False

    def fileno(self):
        """Return the fake file descriptor."""
        return self._FAKE_FD

    def close(self):
        """Record that close() has been called."""
        self.closed = True
31
class dummychannel:
    """Channel stand-in that owns a dummysocket in its ``socket`` attribute."""

    def __init__(self):
        fake_socket = dummysocket()
        self.socket = fake_socket

    def close(self):
        """Close the channel by closing the wrapped fake socket."""
        wrapped = self.socket
        wrapped.close()
38
class exitingdummy:
    """Event handler stub whose callbacks all raise asyncore.ExitNow."""

    def __init__(self):
        pass

    def handle_read_event(self):
        raise asyncore.ExitNow()

    # The remaining callbacks are plain aliases of the read handler.
    handle_expt_event = handle_close = handle_write_event = handle_read_event
49
class crashingdummy:
    """Event handler stub whose callbacks raise a plain Exception.

    handle_error() records that it ran by setting ``error_handled``.
    """

    def __init__(self):
        self.error_handled = False

    def handle_error(self):
        self.error_handled = True

    def handle_read_event(self):
        raise Exception()

    # The remaining callbacks are plain aliases of the read handler.
    handle_expt_event = handle_close = handle_write_event = handle_read_event
63
# used when testing senders; just collects what it gets until newline is sent
def capture_server(evt, buf, serv):
    """Accept one connection on serv and copy what it sends into buf.

    Intended as a thread target.  Data is read in chunks of up to 10 bytes
    and written to buf with newlines removed; reading stops at the first
    chunk that contains a newline, or after 200 rounds.  If listen() or
    accept() raises socket.timeout nothing is captured.

    evt  -- object whose set() is always called on exit
    buf  -- object whose write() receives the captured data
    serv -- socket to listen()/accept() on; always closed on exit
    """
    try:
        serv.listen(5)
        conn, _addr = serv.accept()
    except socket.timeout:
        pass
    else:
        # Close the accepted connection even if reading from it fails, so
        # an error in the loop cannot leak the socket.
        try:
            n = 200
            while n > 0:
                r, w, e = select.select([conn], [], [])
                if r:
                    data = conn.recv(10)
                    # keep everything except for the newline terminator
                    buf.write(data.replace('\n', ''))
                    if '\n' in data:
                        break
                n -= 1
                time.sleep(0.01)
        finally:
            conn.close()
    finally:
        serv.close()
        evt.set()
88
89
class HelperFunctionTests(unittest.TestCase):
    """Tests for asyncore's module-level helper functions."""

    def test_readwriteexc(self):
        # Exception handling of asyncore.read, write and _exception.
        helpers = (asyncore.read, asyncore.write, asyncore._exception)

        # An ExitNow raised by the object's handler must propagate
        # unchanged through each helper.
        exiting = exitingdummy()
        for helper in helpers:
            self.assertRaises(asyncore.ExitNow, helper, exiting)

        # Any other exception raised by the handler must end up in the
        # object's handle_error() method instead.
        for helper in helpers:
            crashing = crashingdummy()
            helper(crashing)
            self.assertEqual(crashing.error_handled, True)

    # asyncore.readwrite uses constants in the select module that
    # are not present in Windows systems (see this thread:
    # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
    # These constants should be present as long as poll is available

    @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
    def test_readwrite(self):
        # readwrite() must call the handler that matches each poll flag.

        class testobj:
            def __init__(self):
                self.read = False
                self.write = False
                self.closed = False
                self.expt = False
                self.error_handled = False

            def handle_read_event(self):
                self.read = True

            def handle_write_event(self):
                self.write = True

            def handle_close(self):
                self.closed = True

            def handle_expt_event(self):
                self.expt = True

            def handle_error(self):
                self.error_handled = True

        tracked = ('read', 'expt', 'write', 'closed', 'error_handled')

        # poll flag -> name of the attribute its handler is expected to set
        flag_table = (
            (select.POLLIN, 'read'),
            (select.POLLPRI, 'expt'),
            (select.POLLOUT, 'write'),
            (select.POLLERR, 'closed'),
            (select.POLLHUP, 'closed'),
            (select.POLLNVAL, 'closed'),
            )

        for flag, hit in flag_table:
            probe = testobj()
            self.assertEqual(getattr(probe, hit), False)
            asyncore.readwrite(probe, flag)

            # Exactly one attribute, the one belonging to the expected
            # handler, may have been switched to True.
            for name in tracked:
                self.assertEqual(getattr(probe, name), name == hit)

            # ExitNow raised by the handler must propagate out of
            # readwrite().
            exiting = exitingdummy()
            self.assertRaises(asyncore.ExitNow,
                              asyncore.readwrite, exiting, flag)

            # Any other exception must be handed to handle_error().
            crashing = crashingdummy()
            self.assertEqual(crashing.error_handled, False)
            asyncore.readwrite(crashing, flag)
            self.assertEqual(crashing.error_handled, True)

    def test_closeall(self):
        self.closeall_check(False)

    def test_closeall_default(self):
        self.closeall_check(True)

    def closeall_check(self, usedefault):
        # close_all() must close every channel and empty the map, both for
        # an explicitly passed map and for the module-level default map.
        channels = []
        testmap = {}
        for key in range(10):
            channel = dummychannel()
            channels.append(channel)
            self.assertEqual(channel.socket.closed, False)
            testmap[key] = channel

        if not usedefault:
            asyncore.close_all(testmap)
        else:
            saved_map = asyncore.socket_map
            try:
                asyncore.socket_map = testmap
                asyncore.close_all()
            finally:
                # Inspect whatever map asyncore ended up with, then put
                # the real default map back.
                testmap = asyncore.socket_map
                asyncore.socket_map = saved_map

        self.assertEqual(len(testmap), 0)

        for channel in channels:
            self.assertEqual(channel.socket.closed, True)

    def test_compact_traceback(self):
        try:
            raise Exception("I don't like spam!")
        except:
            real_t, real_v = sys.exc_info()[:2]
            summary = asyncore.compact_traceback()
        else:
            self.fail("Expected exception")

        location, t, v, info = summary
        f, function, line = location
        self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
        self.assertEqual(function, 'test_compact_traceback')
        self.assertEqual(t, real_t)
        self.assertEqual(v, real_v)
        self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
227
228
class DispatcherTests(unittest.TestCase):
    """Tests for the basic behaviour of asyncore.dispatcher."""

    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    def _captured_lines(self, stream_name, emit):
        # Call emit() while sys.<stream_name> is replaced by a StringIO,
        # put the real stream back, and return what was written as lines.
        capture = StringIO()
        real_stream = getattr(sys, stream_name)
        try:
            setattr(sys, stream_name, capture)
            emit()
        finally:
            setattr(sys, stream_name, real_stream)
        return capture.getvalue().splitlines()

    def test_basic(self):
        d = asyncore.dispatcher()
        self.assertEqual(d.readable(), True)
        self.assertEqual(d.writable(), True)

    def test_repr(self):
        d = asyncore.dispatcher()
        self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))

    def test_log(self):
        d = asyncore.dispatcher()
        l1 = "Lovely spam! Wonderful spam!"
        l2 = "I don't like spam!"

        def emit():
            d.log(l1)
            d.log(l2)

        # dispatcher.log() writes to stderr
        lines = self._captured_lines('stderr', emit)
        self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])

    def test_log_info(self):
        d = asyncore.dispatcher()
        l1 = "Have you got anything without spam?"
        l2 = "Why can't she have egg bacon spam and sausage?"
        l3 = "THAT'S got spam in it!"

        def emit():
            d.log_info(l1, 'EGGS')
            d.log_info(l2)
            d.log_info(l3, 'SPAM')

        # dispatcher.log_info() writes to stdout via print
        lines = self._captured_lines('stdout', emit)
        expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]

        self.assertEqual(lines, expected)

    def test_unhandled(self):
        d = asyncore.dispatcher()
        d.ignore_log_types = ()

        def emit():
            d.handle_expt()
            d.handle_read()
            d.handle_write()
            d.handle_connect()
            d.handle_accept()

        # the default handlers report through log_info() (stdout via print)
        lines = self._captured_lines('stdout', emit)
        expected = ['warning: unhandled incoming priority event',
                    'warning: unhandled read event',
                    'warning: unhandled write event',
                    'warning: unhandled connect event',
                    'warning: unhandled accept event']
        self.assertEqual(lines, expected)

    def test_issue_8594(self):
        # XXX - this test is supposed to be removed in next major Python
        # version
        d = asyncore.dispatcher(socket.socket())
        # the error message must name the dispatcher instance rather than
        # the underlying socket object
        self.assertRaisesRegexp(AttributeError, 'dispatcher instance',
                                getattr, d, 'foo')
        # attribute lookups that fall through to the underlying socket
        # still have to work, but must emit a DeprecationWarning
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            family = d.family
            self.assertEqual(family, socket.AF_INET)
            self.assertEqual(len(caught), 1)
            first = caught[0]
            self.assertTrue(issubclass(first.category, DeprecationWarning))

    def test_strerror(self):
        # refers to bug #8573
        known = asyncore._strerror(errno.EPERM)
        if hasattr(os, 'strerror'):
            self.assertEqual(known, os.strerror(errno.EPERM))
        unknown = asyncore._strerror(-1)
        self.assertTrue(unknown != "")
334
335
class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
    """dispatcher_with_send variant that never reports itself readable."""

    def handle_connect(self):
        # Nothing to do once the connection is established.
        pass

    def readable(self):
        return False
342
class DispatcherWithSendTests(unittest.TestCase):
    """Check that dispatcher_with_send delivers buffered data to a peer.

    The ``usepoll`` class attribute selects which asyncore polling
    implementation drives the test; subclasses may override it.
    """
    usepoll = False

    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @test_support.reap_threads
    def test_send(self):
        evt = threading.Event()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(3)
        port = test_support.bind_port(sock)

        cap = StringIO()
        args = (evt, cap, sock)
        t = threading.Thread(target=capture_server, args=args)
        t.start()
        try:
            # wait a little longer for the server to initialize (it sometimes
            # refuses connections on slow machines without this wait)
            time.sleep(0.2)

            data = "Suppose there isn't a 16-ton weight?"
            d = dispatcherwithsend_noread()
            d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            d.connect((HOST, port))

            # give time for socket to connect
            time.sleep(0.1)

            d.send(data)
            d.send(data)
            d.send('\n')

            n = 1000
            while d.out_buffer and n > 0:
                # Run a single non-blocking pass through asyncore.loop() so
                # that the usepoll flag actually selects the implementation
                # (poll()-based when requested and available, select()
                # otherwise) instead of always calling asyncore.poll().
                asyncore.loop(timeout=0.0, use_poll=self.usepoll, count=1)
                n -= 1

            evt.wait()

            self.assertEqual(cap.getvalue(), data*2)
        finally:
            t.join()
391
392
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
    """DispatcherWithSendTests with the ``usepoll`` class flag set to True."""

    usepoll = True
395
@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
                     'asyncore.file_wrapper required')
class FileWrapperTest(unittest.TestCase):
    """Tests for asyncore.file_wrapper and asyncore.file_dispatcher.

    Each test works on a scratch file (TESTFN) that setUp() fills with the
    text stored in ``self.d``.
    """

    def setUp(self):
        self.d = "It's not dead, it's sleeping!"
        with open(TESTFN, 'w') as h:
            h.write(self.d)

    def tearDown(self):
        # test_dispatcher registers a file_dispatcher in the default socket
        # map; close whatever is left there so it is not leaked.
        asyncore.close_all()
        unlink(TESTFN)

    def test_recv(self):
        fd = os.open(TESTFN, os.O_RDONLY)
        w = asyncore.file_wrapper(fd)
        # the wrapper must keep working after the original fd is closed
        os.close(fd)

        self.assertNotEqual(w.fd, fd)
        self.assertNotEqual(w.fileno(), fd)
        self.assertEqual(w.recv(13), "It's not dead")
        self.assertEqual(w.read(6), ", it's")
        w.close()
        self.assertRaises(OSError, w.read, 1)


    def test_send(self):
        d1 = "Come again?"
        d2 = "I want to buy some cheese."
        fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        w.write(d1)
        w.send(d2)
        w.close()
        # read the result back through a context manager so the handle is
        # closed deterministically
        with open(TESTFN) as f:
            self.assertEqual(f.read(), self.d + d1 + d2)

    @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
                         'asyncore.file_dispatcher required')
    def test_dispatcher(self):
        fd = os.open(TESTFN, os.O_RDONLY)
        data = []
        class FileDispatcher(asyncore.file_dispatcher):
            def handle_read(self):
                data.append(self.recv(29))
        # constructing the dispatcher registers it in the socket map;
        # tearDown() closes it again
        FileDispatcher(fd)
        os.close(fd)
        asyncore.loop(timeout=0.01, use_poll=True, count=2)
        self.assertEqual(b"".join(data), self.d)
444
445
class BaseTestHandler(asyncore.dispatcher):
    """Dispatcher whose event callbacks fail loudly unless overridden.

    ``flag`` starts out False; the test subclasses in this file set it
    from the callback they expect to be invoked.
    """

    def __init__(self, sock=None):
        asyncore.dispatcher.__init__(self, sock)
        self.flag = False

    def handle_error(self):
        # Re-raise the exception currently being handled rather than
        # swallowing it.
        raise

    def handle_close(self):
        raise Exception("handle_close not supposed to be called")

    def handle_expt(self):
        raise Exception("handle_expt not supposed to be called")

    def handle_connect(self):
        raise Exception("handle_connect not supposed to be called")

    def handle_accept(self):
        raise Exception("handle_accept not supposed to be called")
466
467
class TCPServer(asyncore.dispatcher):
    """Listening dispatcher that hands each accepted connection to a handler.

    ``handler`` is called with the socket of every accepted connection.
    """

    def __init__(self, handler=BaseTestHandler, host=HOST, port=0):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        listen_on = (host, port)
        self.bind(listen_on)
        self.listen(5)
        self.handler = handler

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise

    def handle_accept(self):
        accepted = self.accept()
        if accepted is None:
            return
        self.handler(accepted[0])

    @property
    def address(self):
        # First two items of the listening socket's name.
        sockname = self.socket.getsockname()
        return sockname[:2]
492
493
class BaseClient(BaseTestHandler):
    """Test handler that starts a TCP connection to ``address`` on creation."""

    def handle_connect(self):
        # Connecting is expected here, so override the failing default.
        pass

    def __init__(self, address):
        BaseTestHandler.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect(address)
503
504
class BaseTestAPI(unittest.TestCase):
    """Tests of the dispatcher event API.

    The tests read ``self.use_poll``, which this class does not define;
    concrete subclasses set it to pick the polling implementation.
    """

    def tearDown(self):
        asyncore.close_all()

    def loop_waiting_for_flag(self, instance, timeout=5):
        """Run the asyncore loop until ``instance.flag`` becomes true.

        At most 100 single-pass loop iterations are made, sleeping
        ``timeout / 100`` seconds after each one.  The test fails if the
        flag is still unset afterwards or the socket map runs empty.
        """
        timeout = float(timeout) / 100
        count = 100
        while asyncore.socket_map and count > 0:
            asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
            if instance.flag:
                return
            count -= 1
            time.sleep(timeout)
        self.fail("flag not set")

    def test_handle_connect(self):
        # make sure handle_connect is called on connect()

        class TestClient(BaseClient):
            def handle_connect(self):
                self.flag = True

        server = TCPServer()
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_accept(self):
        # make sure handle_accept() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self):
                BaseTestHandler.__init__(self)
                self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
                self.bind((HOST, 0))
                self.listen(5)
                self.address = self.socket.getsockname()[:2]

            def handle_accept(self):
                self.flag = True

        server = TestListener()
        client = BaseClient(server.address)
        self.loop_waiting_for_flag(server)

    def test_handle_read(self):
        # make sure handle_read is called on data received

        class TestClient(BaseClient):
            def handle_read(self):
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.send('x' * 1024)

        server = TCPServer(TestHandler)
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_write(self):
        # make sure handle_write is called

        class TestClient(BaseClient):
            def handle_write(self):
                self.flag = True

        server = TCPServer()
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close(self):
        # make sure handle_close is called when the other end closes
        # the connection

        class TestClient(BaseClient):

            def handle_read(self):
                # in order to make handle_close be called we are supposed
                # to make at least one recv() call
                self.recv(1024)

            def handle_close(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.close()

        server = TCPServer(TestHandler)
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    @unittest.skipIf(sys.platform.startswith("sunos"),
                     "OOB support is broken on Solaris")
    def test_handle_expt(self):
        # Make sure handle_expt is called on OOB data received.
        # Note: this might fail on some platforms as OOB data is
        # tenuously supported and rarely used.

        class TestClient(BaseClient):
            def handle_expt(self):
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(chr(244), socket.MSG_OOB)

        server = TCPServer(TestHandler)
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_error(self):
        # make sure handle_error is called, with the original exception
        # still active, when a handler raises

        class TestClient(BaseClient):
            def handle_write(self):
                1.0 / 0
            def handle_error(self):
                self.flag = True
                try:
                    raise
                except ZeroDivisionError:
                    pass
                else:
                    raise Exception("exception not raised")

        server = TCPServer()
        client = TestClient(server.address)
        self.loop_waiting_for_flag(client)

    def test_connection_attributes(self):
        server = TCPServer()
        client = BaseClient(server.address)

        # we start disconnected
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        # this can't be taken for granted across all platforms
        #self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # execute some loops so that client connects to server
        asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertTrue(client.connected)
        self.assertFalse(client.accepting)

        # disconnect the client
        client.close()
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # stop serving
        server.close()
        self.assertFalse(server.connected)
        self.assertFalse(server.accepting)

    def test_create_socket(self):
        s = asyncore.dispatcher()
        s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.assertEqual(s.socket.family, socket.AF_INET)
        self.assertEqual(s.socket.type, socket.SOCK_STREAM)

    def test_bind(self):
        s1 = asyncore.dispatcher()
        s1.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        s1.bind((HOST, 0))
        s1.listen(5)
        port = s1.socket.getsockname()[1]

        s2 = asyncore.dispatcher()
        s2.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        # EADDRINUSE indicates the socket was correctly bound
        self.assertRaises(socket.error, s2.bind, (HOST, port))

    def test_set_reuse_addr(self):
        sock = socket.socket()
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        except socket.error:
            # unittest.skip() merely builds a decorator; skipTest() is what
            # actually marks the running test as skipped.
            self.skipTest("SO_REUSEADDR not supported on this platform")
        else:
            # if SO_REUSEADDR succeeded for sock we expect asyncore
            # to do the same
            s = asyncore.dispatcher(socket.socket())
            self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
                                                 socket.SO_REUSEADDR))
            s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            s.set_reuse_addr()
            self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
                                                 socket.SO_REUSEADDR))
        finally:
            sock.close()

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @test_support.reap_threads
    def test_quick_connect(self):
        # see: http://bugs.python.org/issue10340
        server = TCPServer()
        t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
        t.start()
        self.addCleanup(t.join)

        for x in xrange(20):
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(.2)
            # linger enabled with a zero timeout on close()
            s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                         struct.pack('ii', 1, 0))
            try:
                s.connect(server.address)
            except socket.error:
                pass
            finally:
                s.close()
727
728
class TestAPI_UseSelect(BaseTestAPI):
    """BaseTestAPI tests run with ``use_poll`` set to False."""

    use_poll = False
731
@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UsePoll(BaseTestAPI):
    """BaseTestAPI tests run with ``use_poll`` set to True."""

    use_poll = True
735
736
def test_main():
    """Pass every test case class of this module to run_unittest()."""
    run_unittest(
        HelperFunctionTests,
        DispatcherTests,
        DispatcherWithSendTests,
        DispatcherWithSendTests_UsePoll,
        TestAPI_UseSelect,
        TestAPI_UsePoll,
        FileWrapperTest,
    )


# Run the suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
745