• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import datetime
3import decimal
4import sys
5import time
6import unittest
7from unittest import mock
8import xmlrpc.client as xmlrpclib
9import xmlrpc.server
10import http.client
11import http, http.server
12import socket
13import threading
14import re
15import io
16import contextlib
17from test import support
18
19try:
20    import gzip
21except ImportError:
22    gzip = None
23
24alist = [{'astring': 'foo@bar.baz.spam',
25          'afloat': 7283.43,
26          'anint': 2**20,
27          'ashortlong': 2,
28          'anotherlist': ['.zyx.41'],
29          'abase64': xmlrpclib.Binary(b"my dog has fleas"),
30          'b64bytes': b"my dog has fleas",
31          'b64bytearray': bytearray(b"my dog has fleas"),
32          'boolean': False,
33          'unicode': '\u4000\u6000\u8000',
34          'ukey\u4000': 'regular value',
35          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
36          'datetime2': xmlrpclib.DateTime(
37                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
38          'datetime3': xmlrpclib.DateTime(
39                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
40          }]
41
42class XMLRPCTestCase(unittest.TestCase):
43
44    def test_dump_load(self):
45        dump = xmlrpclib.dumps((alist,))
46        load = xmlrpclib.loads(dump)
47        self.assertEqual(alist, load[0][0])
48
49    def test_dump_bare_datetime(self):
50        # This checks that an unwrapped datetime.date object can be handled
51        # by the marshalling code.  This can't be done via test_dump_load()
52        # since with use_builtin_types set to 1 the unmarshaller would create
53        # datetime objects for the 'datetime[123]' keys as well
54        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
55        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
56        s = xmlrpclib.dumps((dt,))
57
58        result, m = xmlrpclib.loads(s, use_builtin_types=True)
59        (newdt,) = result
60        self.assertEqual(newdt, dt)
61        self.assertIs(type(newdt), datetime.datetime)
62        self.assertIsNone(m)
63
64        result, m = xmlrpclib.loads(s, use_builtin_types=False)
65        (newdt,) = result
66        self.assertEqual(newdt, dt)
67        self.assertIs(type(newdt), xmlrpclib.DateTime)
68        self.assertIsNone(m)
69
70        result, m = xmlrpclib.loads(s, use_datetime=True)
71        (newdt,) = result
72        self.assertEqual(newdt, dt)
73        self.assertIs(type(newdt), datetime.datetime)
74        self.assertIsNone(m)
75
76        result, m = xmlrpclib.loads(s, use_datetime=False)
77        (newdt,) = result
78        self.assertEqual(newdt, dt)
79        self.assertIs(type(newdt), xmlrpclib.DateTime)
80        self.assertIsNone(m)
81
82
83    def test_datetime_before_1900(self):
84        # same as before but with a date before 1900
85        dt = datetime.datetime(1,  2, 10, 11, 41, 23)
86        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
87        s = xmlrpclib.dumps((dt,))
88
89        result, m = xmlrpclib.loads(s, use_builtin_types=True)
90        (newdt,) = result
91        self.assertEqual(newdt, dt)
92        self.assertIs(type(newdt), datetime.datetime)
93        self.assertIsNone(m)
94
95        result, m = xmlrpclib.loads(s, use_builtin_types=False)
96        (newdt,) = result
97        self.assertEqual(newdt, dt)
98        self.assertIs(type(newdt), xmlrpclib.DateTime)
99        self.assertIsNone(m)
100
101    def test_bug_1164912 (self):
102        d = xmlrpclib.DateTime()
103        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
104                                            methodresponse=True))
105        self.assertIsInstance(new_d.value, str)
106
107        # Check that the output of dumps() is still an 8-bit string
108        s = xmlrpclib.dumps((new_d,), methodresponse=True)
109        self.assertIsInstance(s, str)
110
111    def test_newstyle_class(self):
112        class T(object):
113            pass
114        t = T()
115        t.x = 100
116        t.y = "Hello"
117        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
118        self.assertEqual(t2, t.__dict__)
119
120    def test_dump_big_long(self):
121        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
122
123    def test_dump_bad_dict(self):
124        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
125
126    def test_dump_recursive_seq(self):
127        l = [1,2,3]
128        t = [3,4,5,l]
129        l.append(t)
130        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
131
132    def test_dump_recursive_dict(self):
133        d = {'1':1, '2':1}
134        t = {'3':3, 'd':d}
135        d['t'] = t
136        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
137
138    def test_dump_big_int(self):
139        if sys.maxsize > 2**31-1:
140            self.assertRaises(OverflowError, xmlrpclib.dumps,
141                              (int(2**34),))
142
143        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
144        self.assertRaises(OverflowError, xmlrpclib.dumps,
145                          (xmlrpclib.MAXINT+1,))
146        self.assertRaises(OverflowError, xmlrpclib.dumps,
147                          (xmlrpclib.MININT-1,))
148
149        def dummy_write(s):
150            pass
151
152        m = xmlrpclib.Marshaller()
153        m.dump_int(xmlrpclib.MAXINT, dummy_write)
154        m.dump_int(xmlrpclib.MININT, dummy_write)
155        self.assertRaises(OverflowError, m.dump_int,
156                          xmlrpclib.MAXINT+1, dummy_write)
157        self.assertRaises(OverflowError, m.dump_int,
158                          xmlrpclib.MININT-1, dummy_write)
159
160    def test_dump_double(self):
161        xmlrpclib.dumps((float(2 ** 34),))
162        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
163                         float(xmlrpclib.MININT)))
164        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
165                         float(xmlrpclib.MININT - 42)))
166
167        def dummy_write(s):
168            pass
169
170        m = xmlrpclib.Marshaller()
171        m.dump_double(xmlrpclib.MAXINT, dummy_write)
172        m.dump_double(xmlrpclib.MININT, dummy_write)
173        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
174        m.dump_double(xmlrpclib.MININT - 42, dummy_write)
175
176    def test_dump_none(self):
177        value = alist + [None]
178        arg1 = (alist + [None],)
179        strg = xmlrpclib.dumps(arg1, allow_none=True)
180        self.assertEqual(value,
181                          xmlrpclib.loads(strg)[0][0])
182        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
183
184    def test_dump_encoding(self):
185        value = {'key\u20ac\xa4':
186                 'value\u20ac\xa4'}
187        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
188        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
189        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
190        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
191        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
192
193        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
194                               methodresponse=True)
195        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
196        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
197        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
198
199        methodname = 'method\u20ac\xa4'
200        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
201                               methodname=methodname)
202        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
203        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
204
205    def test_dump_bytes(self):
206        sample = b"my dog has fleas"
207        self.assertEqual(sample, xmlrpclib.Binary(sample))
208        for type_ in bytes, bytearray, xmlrpclib.Binary:
209            value = type_(sample)
210            s = xmlrpclib.dumps((value,))
211
212            result, m = xmlrpclib.loads(s, use_builtin_types=True)
213            (newvalue,) = result
214            self.assertEqual(newvalue, sample)
215            self.assertIs(type(newvalue), bytes)
216            self.assertIsNone(m)
217
218            result, m = xmlrpclib.loads(s, use_builtin_types=False)
219            (newvalue,) = result
220            self.assertEqual(newvalue, sample)
221            self.assertIs(type(newvalue), xmlrpclib.Binary)
222            self.assertIsNone(m)
223
224    def test_loads_unsupported(self):
225        ResponseError = xmlrpclib.ResponseError
226        data = '<params><param><value><spam/></value></param></params>'
227        self.assertRaises(ResponseError, xmlrpclib.loads, data)
228        data = ('<params><param><value><array>'
229                '<value><spam/></value>'
230                '</array></value></param></params>')
231        self.assertRaises(ResponseError, xmlrpclib.loads, data)
232        data = ('<params><param><value><struct>'
233                '<member><name>a</name><value><spam/></value></member>'
234                '<member><name>b</name><value><spam/></value></member>'
235                '</struct></value></param></params>')
236        self.assertRaises(ResponseError, xmlrpclib.loads, data)
237
238    def check_loads(self, s, value, **kwargs):
239        dump = '<params><param><value>%s</value></param></params>' % s
240        result, m = xmlrpclib.loads(dump, **kwargs)
241        (newvalue,) = result
242        self.assertEqual(newvalue, value)
243        self.assertIs(type(newvalue), type(value))
244        self.assertIsNone(m)
245
246    def test_load_standard_types(self):
247        check = self.check_loads
248        check('string', 'string')
249        check('<string>string</string>', 'string')
250        check('<string>�������������� string</string>', '�������������� string')
251        check('<int>2056183947</int>', 2056183947)
252        check('<int>-2056183947</int>', -2056183947)
253        check('<i4>2056183947</i4>', 2056183947)
254        check('<double>46093.78125</double>', 46093.78125)
255        check('<boolean>0</boolean>', False)
256        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
257              xmlrpclib.Binary(b'\x00byte string\xff'))
258        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
259              b'\x00byte string\xff', use_builtin_types=True)
260        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
261              xmlrpclib.DateTime('20050210T11:41:23'))
262        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
263              datetime.datetime(2005, 2, 10, 11, 41, 23),
264              use_builtin_types=True)
265        check('<array><data>'
266              '<value><int>1</int></value><value><int>2</int></value>'
267              '</data></array>', [1, 2])
268        check('<struct>'
269              '<member><name>b</name><value><int>2</int></value></member>'
270              '<member><name>a</name><value><int>1</int></value></member>'
271              '</struct>', {'a': 1, 'b': 2})
272
273    def test_load_extension_types(self):
274        check = self.check_loads
275        check('<nil/>', None)
276        check('<ex:nil/>', None)
277        check('<i1>205</i1>', 205)
278        check('<i2>20561</i2>', 20561)
279        check('<i8>9876543210</i8>', 9876543210)
280        check('<biginteger>98765432100123456789</biginteger>',
281              98765432100123456789)
282        check('<float>93.78125</float>', 93.78125)
283        check('<bigdecimal>9876543210.0123456789</bigdecimal>',
284              decimal.Decimal('9876543210.0123456789'))
285
286    def test_get_host_info(self):
287        # see bug #3613, this raised a TypeError
288        transp = xmlrpc.client.Transport()
289        self.assertEqual(transp.get_host_info("user@host.tld"),
290                          ('host.tld',
291                           [('Authorization', 'Basic dXNlcg==')], {}))
292
293    def test_ssl_presence(self):
294        try:
295            import ssl
296        except ImportError:
297            has_ssl = False
298        else:
299            has_ssl = True
300        try:
301            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
302        except NotImplementedError:
303            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
304        except OSError:
305            self.assertTrue(has_ssl)
306
307    def test_keepalive_disconnect(self):
308        class RequestHandler(http.server.BaseHTTPRequestHandler):
309            protocol_version = "HTTP/1.1"
310            handled = False
311
312            def do_POST(self):
313                length = int(self.headers.get("Content-Length"))
314                self.rfile.read(length)
315                if self.handled:
316                    self.close_connection = True
317                    return
318                response = xmlrpclib.dumps((5,), methodresponse=True)
319                response = response.encode()
320                self.send_response(http.HTTPStatus.OK)
321                self.send_header("Content-Length", len(response))
322                self.end_headers()
323                self.wfile.write(response)
324                self.handled = True
325                self.close_connection = False
326
327            def log_message(self, format, *args):
328                # don't clobber sys.stderr
329                pass
330
331        def run_server():
332            server.socket.settimeout(float(1))  # Don't hang if client fails
333            server.handle_request()  # First request and attempt at second
334            server.handle_request()  # Retried second request
335
336        server = http.server.HTTPServer((support.HOST, 0), RequestHandler)
337        self.addCleanup(server.server_close)
338        thread = threading.Thread(target=run_server)
339        thread.start()
340        self.addCleanup(thread.join)
341        url = "http://{}:{}/".format(*server.server_address)
342        with xmlrpclib.ServerProxy(url) as p:
343            self.assertEqual(p.method(), 5)
344            self.assertEqual(p.method(), 5)
345
346
347class SimpleXMLRPCDispatcherTestCase(unittest.TestCase):
348    class DispatchExc(Exception):
349        """Raised inside the dispatched functions when checking for
350        chained exceptions"""
351
352    def test_call_registered_func(self):
353        """Calls explicitly registered function"""
354        # Makes sure any exception raised inside the function has no other
355        # exception chained to it
356
357        exp_params = 1, 2, 3
358
359        def dispatched_func(*params):
360            raise self.DispatchExc(params)
361
362        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
363        dispatcher.register_function(dispatched_func)
364        with self.assertRaises(self.DispatchExc) as exc_ctx:
365            dispatcher._dispatch('dispatched_func', exp_params)
366        self.assertEqual(exc_ctx.exception.args, (exp_params,))
367        self.assertIsNone(exc_ctx.exception.__cause__)
368        self.assertIsNone(exc_ctx.exception.__context__)
369
370    def test_call_instance_func(self):
371        """Calls a registered instance attribute as a function"""
372        # Makes sure any exception raised inside the function has no other
373        # exception chained to it
374
375        exp_params = 1, 2, 3
376
377        class DispatchedClass:
378            def dispatched_func(self, *params):
379                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params)
380
381        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
382        dispatcher.register_instance(DispatchedClass())
383        with self.assertRaises(self.DispatchExc) as exc_ctx:
384            dispatcher._dispatch('dispatched_func', exp_params)
385        self.assertEqual(exc_ctx.exception.args, (exp_params,))
386        self.assertIsNone(exc_ctx.exception.__cause__)
387        self.assertIsNone(exc_ctx.exception.__context__)
388
389    def test_call_dispatch_func(self):
390        """Calls the registered instance's `_dispatch` function"""
391        # Makes sure any exception raised inside the function has no other
392        # exception chained to it
393
394        exp_method = 'method'
395        exp_params = 1, 2, 3
396
397        class TestInstance:
398            def _dispatch(self, method, params):
399                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(
400                    method, params)
401
402        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
403        dispatcher.register_instance(TestInstance())
404        with self.assertRaises(self.DispatchExc) as exc_ctx:
405            dispatcher._dispatch(exp_method, exp_params)
406        self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params))
407        self.assertIsNone(exc_ctx.exception.__cause__)
408        self.assertIsNone(exc_ctx.exception.__context__)
409
410    def test_registered_func_is_none(self):
411        """Calls explicitly registered function which is None"""
412
413        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
414        dispatcher.register_function(None, name='method')
415        with self.assertRaisesRegex(Exception, 'method'):
416            dispatcher._dispatch('method', ('param',))
417
418    def test_instance_has_no_func(self):
419        """Attempts to call nonexistent function on a registered instance"""
420
421        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
422        dispatcher.register_instance(object())
423        with self.assertRaisesRegex(Exception, 'method'):
424            dispatcher._dispatch('method', ('param',))
425
426    def test_cannot_locate_func(self):
427        """Calls a function that the dispatcher cannot locate"""
428
429        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
430        with self.assertRaisesRegex(Exception, 'method'):
431            dispatcher._dispatch('method', ('param',))
432
433
434class HelperTestCase(unittest.TestCase):
435    def test_escape(self):
436        self.assertEqual(xmlrpclib.escape("a&b"), "a&amp;b")
437        self.assertEqual(xmlrpclib.escape("a<b"), "a&lt;b")
438        self.assertEqual(xmlrpclib.escape("a>b"), "a&gt;b")
439
440class FaultTestCase(unittest.TestCase):
441    def test_repr(self):
442        f = xmlrpclib.Fault(42, 'Test Fault')
443        self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
444        self.assertEqual(repr(f), str(f))
445
446    def test_dump_fault(self):
447        f = xmlrpclib.Fault(42, 'Test Fault')
448        s = xmlrpclib.dumps((f,))
449        (newf,), m = xmlrpclib.loads(s)
450        self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
451        self.assertEqual(m, None)
452
453        s = xmlrpclib.Marshaller().dumps(f)
454        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
455
456    def test_dotted_attribute(self):
457        # this will raise AttributeError because code don't want us to use
458        # private methods
459        self.assertRaises(AttributeError,
460                          xmlrpc.server.resolve_dotted_attribute, str, '__add')
461        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
462
463class DateTimeTestCase(unittest.TestCase):
464    def test_default(self):
465        with mock.patch('time.localtime') as localtime_mock:
466            time_struct = time.struct_time(
467                [2013, 7, 15, 0, 24, 49, 0, 196, 0])
468            localtime_mock.return_value = time_struct
469            localtime = time.localtime()
470            t = xmlrpclib.DateTime()
471            self.assertEqual(str(t),
472                             time.strftime("%Y%m%dT%H:%M:%S", localtime))
473
474    def test_time(self):
475        d = 1181399930.036952
476        t = xmlrpclib.DateTime(d)
477        self.assertEqual(str(t),
478                         time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
479
480    def test_time_tuple(self):
481        d = (2007,6,9,10,38,50,5,160,0)
482        t = xmlrpclib.DateTime(d)
483        self.assertEqual(str(t), '20070609T10:38:50')
484
485    def test_time_struct(self):
486        d = time.localtime(1181399930.036952)
487        t = xmlrpclib.DateTime(d)
488        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
489
490    def test_datetime_datetime(self):
491        d = datetime.datetime(2007,1,2,3,4,5)
492        t = xmlrpclib.DateTime(d)
493        self.assertEqual(str(t), '20070102T03:04:05')
494
495    def test_repr(self):
496        d = datetime.datetime(2007,1,2,3,4,5)
497        t = xmlrpclib.DateTime(d)
498        val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
499        self.assertEqual(repr(t), val)
500
501    def test_decode(self):
502        d = ' 20070908T07:11:13  '
503        t1 = xmlrpclib.DateTime()
504        t1.decode(d)
505        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
506        self.assertEqual(t1, tref)
507
508        t2 = xmlrpclib._datetime(d)
509        self.assertEqual(t2, tref)
510
511    def test_comparison(self):
512        now = datetime.datetime.now()
513        dtime = xmlrpclib.DateTime(now.timetuple())
514
515        # datetime vs. DateTime
516        self.assertTrue(dtime == now)
517        self.assertTrue(now == dtime)
518        then = now + datetime.timedelta(seconds=4)
519        self.assertTrue(then >= dtime)
520        self.assertTrue(dtime < then)
521
522        # str vs. DateTime
523        dstr = now.strftime("%Y%m%dT%H:%M:%S")
524        self.assertTrue(dtime == dstr)
525        self.assertTrue(dstr == dtime)
526        dtime_then = xmlrpclib.DateTime(then.timetuple())
527        self.assertTrue(dtime_then >= dstr)
528        self.assertTrue(dstr < dtime_then)
529
530        # some other types
531        dbytes = dstr.encode('ascii')
532        dtuple = now.timetuple()
533        with self.assertRaises(TypeError):
534            dtime == 1970
535        with self.assertRaises(TypeError):
536            dtime != dbytes
537        with self.assertRaises(TypeError):
538            dtime == bytearray(dbytes)
539        with self.assertRaises(TypeError):
540            dtime != dtuple
541        with self.assertRaises(TypeError):
542            dtime < float(1970)
543        with self.assertRaises(TypeError):
544            dtime > dbytes
545        with self.assertRaises(TypeError):
546            dtime <= bytearray(dbytes)
547        with self.assertRaises(TypeError):
548            dtime >= dtuple
549
550class BinaryTestCase(unittest.TestCase):
551
552    # XXX What should str(Binary(b"\xff")) return?  I'm chosing "\xff"
553    # for now (i.e. interpreting the binary data as Latin-1-encoded
554    # text).  But this feels very unsatisfactory.  Perhaps we should
555    # only define repr(), and return r"Binary(b'\xff')" instead?
556
557    def test_default(self):
558        t = xmlrpclib.Binary()
559        self.assertEqual(str(t), '')
560
561    def test_string(self):
562        d = b'\x01\x02\x03abc123\xff\xfe'
563        t = xmlrpclib.Binary(d)
564        self.assertEqual(str(t), str(d, "latin-1"))
565
566    def test_decode(self):
567        d = b'\x01\x02\x03abc123\xff\xfe'
568        de = base64.encodebytes(d)
569        t1 = xmlrpclib.Binary()
570        t1.decode(de)
571        self.assertEqual(str(t1), str(d, "latin-1"))
572
573        t2 = xmlrpclib._binary(de)
574        self.assertEqual(str(t2), str(d, "latin-1"))
575
576
577ADDR = PORT = URL = None
578
579# The evt is set twice.  First when the server is ready to serve.
580# Second when the server has been shutdown.  The user must clear
581# the event after it has been set the first time to catch the second set.
582def http_server(evt, numrequests, requestHandler=None, encoding=None):
583    class TestInstanceClass:
584        def div(self, x, y):
585            return x // y
586
587        def _methodHelp(self, name):
588            if name == 'div':
589                return 'This is the div function'
590
591        class Fixture:
592            @staticmethod
593            def getData():
594                return '42'
595
596    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
597        def get_request(self):
598            # Ensure the socket is always non-blocking.  On Linux, socket
599            # attributes are not inherited like they are on *BSD and Windows.
600            s, port = self.socket.accept()
601            s.setblocking(True)
602            return s, port
603
604    if not requestHandler:
605        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
606    serv = MyXMLRPCServer(("localhost", 0), requestHandler,
607                          encoding=encoding,
608                          logRequests=False, bind_and_activate=False)
609    try:
610        serv.server_bind()
611        global ADDR, PORT, URL
612        ADDR, PORT = serv.socket.getsockname()
613        #connect to IP address directly.  This avoids socket.create_connection()
614        #trying to connect to "localhost" using all address families, which
615        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
616        #on AF_INET only.
617        URL = "http://%s:%d"%(ADDR, PORT)
618        serv.server_activate()
619        serv.register_introspection_functions()
620        serv.register_multicall_functions()
621        serv.register_function(pow)
622        serv.register_function(lambda x: x, 'têšt')
623        @serv.register_function
624        def my_function():
625            '''This is my function'''
626            return True
627        @serv.register_function(name='add')
628        def _(x, y):
629            return x + y
630        testInstance = TestInstanceClass()
631        serv.register_instance(testInstance, allow_dotted_names=True)
632        evt.set()
633
634        # handle up to 'numrequests' requests
635        while numrequests > 0:
636            serv.handle_request()
637            numrequests -= 1
638
639    except socket.timeout:
640        pass
641    finally:
642        serv.socket.close()
643        PORT = None
644        evt.set()
645
646def http_multi_server(evt, numrequests, requestHandler=None):
647    class TestInstanceClass:
648        def div(self, x, y):
649            return x // y
650
651        def _methodHelp(self, name):
652            if name == 'div':
653                return 'This is the div function'
654
655    def my_function():
656        '''This is my function'''
657        return True
658
659    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
660        def get_request(self):
661            # Ensure the socket is always non-blocking.  On Linux, socket
662            # attributes are not inherited like they are on *BSD and Windows.
663            s, port = self.socket.accept()
664            s.setblocking(True)
665            return s, port
666
667    if not requestHandler:
668        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
669    class MyRequestHandler(requestHandler):
670        rpc_paths = []
671
672    class BrokenDispatcher:
673        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
674            raise RuntimeError("broken dispatcher")
675
676    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
677                          logRequests=False, bind_and_activate=False)
678    serv.socket.settimeout(3)
679    serv.server_bind()
680    try:
681        global ADDR, PORT, URL
682        ADDR, PORT = serv.socket.getsockname()
683        #connect to IP address directly.  This avoids socket.create_connection()
684        #trying to connect to "localhost" using all address families, which
685        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
686        #on AF_INET only.
687        URL = "http://%s:%d"%(ADDR, PORT)
688        serv.server_activate()
689        paths = ["/foo", "/foo/bar"]
690        for path in paths:
691            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
692            d.register_introspection_functions()
693            d.register_multicall_functions()
694        serv.get_dispatcher(paths[0]).register_function(pow)
695        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
696        serv.add_dispatcher("/is/broken", BrokenDispatcher())
697        evt.set()
698
699        # handle up to 'numrequests' requests
700        while numrequests > 0:
701            serv.handle_request()
702            numrequests -= 1
703
704    except socket.timeout:
705        pass
706    finally:
707        serv.socket.close()
708        PORT = None
709        evt.set()
710
711# This function prevents errors like:
712#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
713def is_unavailable_exception(e):
714    '''Returns True if the given ProtocolError is the product of a server-side
715       exception caused by the 'temporarily unavailable' response sometimes
716       given by operations on non-blocking sockets.'''
717
718    # sometimes we get a -1 error code and/or empty headers
719    try:
720        if e.errcode == -1 or e.headers is None:
721            return True
722        exc_mess = e.headers.get('X-exception')
723    except AttributeError:
724        # Ignore OSErrors here.
725        exc_mess = str(e)
726
727    if exc_mess and 'temporarily unavailable' in exc_mess.lower():
728        return True
729
730def make_request_and_skipIf(condition, reason):
731    # If we skip the test, we have to make a request because
732    # the server created in setUp blocks expecting one to come in.
733    if not condition:
734        return lambda func: func
735    def decorator(func):
736        def make_request_and_skip(self):
737            try:
738                xmlrpclib.ServerProxy(URL).my_function()
739            except (xmlrpclib.ProtocolError, OSError) as e:
740                if not is_unavailable_exception(e):
741                    raise
742            raise unittest.SkipTest(reason)
743        return make_request_and_skip
744    return decorator
745
746class BaseServerTestCase(unittest.TestCase):
747    requestHandler = None
748    request_count = 1
749    threadFunc = staticmethod(http_server)
750
751    def setUp(self):
752        # enable traceback reporting
753        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
754
755        self.evt = threading.Event()
756        # start server thread to handle requests
757        serv_args = (self.evt, self.request_count, self.requestHandler)
758        thread = threading.Thread(target=self.threadFunc, args=serv_args)
759        thread.start()
760        self.addCleanup(thread.join)
761
762        # wait for the server to be ready
763        self.evt.wait()
764        self.evt.clear()
765
766    def tearDown(self):
767        # wait on the server thread to terminate
768        self.evt.wait()
769
770        # disable traceback reporting
771        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
772
773class SimpleServerTestCase(BaseServerTestCase):
774    def test_simple1(self):
775        try:
776            p = xmlrpclib.ServerProxy(URL)
777            self.assertEqual(p.pow(6,8), 6**8)
778        except (xmlrpclib.ProtocolError, OSError) as e:
779            # ignore failures due to non-blocking socket 'unavailable' errors
780            if not is_unavailable_exception(e):
781                # protocol error; provide additional information in test output
782                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
783
784    def test_nonascii(self):
785        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
786        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
787        try:
788            p = xmlrpclib.ServerProxy(URL)
789            self.assertEqual(p.add(start_string, end_string),
790                             start_string + end_string)
791        except (xmlrpclib.ProtocolError, OSError) as e:
792            # ignore failures due to non-blocking socket 'unavailable' errors
793            if not is_unavailable_exception(e):
794                # protocol error; provide additional information in test output
795                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
796
797    def test_client_encoding(self):
798        start_string = '\u20ac'
799        end_string = '\xa4'
800
801        try:
802            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
803            self.assertEqual(p.add(start_string, end_string),
804                             start_string + end_string)
805        except (xmlrpclib.ProtocolError, socket.error) as e:
806            # ignore failures due to non-blocking socket unavailable errors.
807            if not is_unavailable_exception(e):
808                # protocol error; provide additional information in test output
809                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
810
811    def test_nonascii_methodname(self):
812        try:
813            p = xmlrpclib.ServerProxy(URL, encoding='ascii')
814            self.assertEqual(p.têšt(42), 42)
815        except (xmlrpclib.ProtocolError, socket.error) as e:
816            # ignore failures due to non-blocking socket unavailable errors.
817            if not is_unavailable_exception(e):
818                # protocol error; provide additional information in test output
819                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
820
821    def test_404(self):
822        # send POST with http.client, it should return 404 header and
823        # 'Not Found' message.
824        conn = http.client.HTTPConnection(ADDR, PORT)
825        conn.request('POST', '/this-is-not-valid')
826        response = conn.getresponse()
827        conn.close()
828
829        self.assertEqual(response.status, 404)
830        self.assertEqual(response.reason, 'Not Found')
831
832    def test_introspection1(self):
833        expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt',
834                                'system.listMethods', 'system.methodHelp',
835                                'system.methodSignature', 'system.multicall',
836                                'Fixture'])
837        try:
838            p = xmlrpclib.ServerProxy(URL)
839            meth = p.system.listMethods()
840            self.assertEqual(set(meth), expected_methods)
841        except (xmlrpclib.ProtocolError, OSError) as e:
842            # ignore failures due to non-blocking socket 'unavailable' errors
843            if not is_unavailable_exception(e):
844                # protocol error; provide additional information in test output
845                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
846
847
848    def test_introspection2(self):
849        try:
850            # test _methodHelp()
851            p = xmlrpclib.ServerProxy(URL)
852            divhelp = p.system.methodHelp('div')
853            self.assertEqual(divhelp, 'This is the div function')
854        except (xmlrpclib.ProtocolError, OSError) as e:
855            # ignore failures due to non-blocking socket 'unavailable' errors
856            if not is_unavailable_exception(e):
857                # protocol error; provide additional information in test output
858                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
859
860    @make_request_and_skipIf(sys.flags.optimize >= 2,
861                     "Docstrings are omitted with -O2 and above")
862    def test_introspection3(self):
863        try:
864            # test native doc
865            p = xmlrpclib.ServerProxy(URL)
866            myfunction = p.system.methodHelp('my_function')
867            self.assertEqual(myfunction, 'This is my function')
868        except (xmlrpclib.ProtocolError, OSError) as e:
869            # ignore failures due to non-blocking socket 'unavailable' errors
870            if not is_unavailable_exception(e):
871                # protocol error; provide additional information in test output
872                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
873
874    def test_introspection4(self):
875        # the SimpleXMLRPCServer doesn't support signatures, but
876        # at least check that we can try making the call
877        try:
878            p = xmlrpclib.ServerProxy(URL)
879            divsig = p.system.methodSignature('div')
880            self.assertEqual(divsig, 'signatures not supported')
881        except (xmlrpclib.ProtocolError, OSError) as e:
882            # ignore failures due to non-blocking socket 'unavailable' errors
883            if not is_unavailable_exception(e):
884                # protocol error; provide additional information in test output
885                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
886
887    def test_multicall(self):
888        try:
889            p = xmlrpclib.ServerProxy(URL)
890            multicall = xmlrpclib.MultiCall(p)
891            multicall.add(2,3)
892            multicall.pow(6,8)
893            multicall.div(127,42)
894            add_result, pow_result, div_result = multicall()
895            self.assertEqual(add_result, 2+3)
896            self.assertEqual(pow_result, 6**8)
897            self.assertEqual(div_result, 127//42)
898        except (xmlrpclib.ProtocolError, OSError) as e:
899            # ignore failures due to non-blocking socket 'unavailable' errors
900            if not is_unavailable_exception(e):
901                # protocol error; provide additional information in test output
902                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
903
904    def test_non_existing_multicall(self):
905        try:
906            p = xmlrpclib.ServerProxy(URL)
907            multicall = xmlrpclib.MultiCall(p)
908            multicall.this_is_not_exists()
909            result = multicall()
910
911            # result.results contains;
912            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
913            #   'method "this_is_not_exists" is not supported'>}]
914
915            self.assertEqual(result.results[0]['faultCode'], 1)
916            self.assertEqual(result.results[0]['faultString'],
917                '<class \'Exception\'>:method "this_is_not_exists" '
918                'is not supported')
919        except (xmlrpclib.ProtocolError, OSError) as e:
920            # ignore failures due to non-blocking socket 'unavailable' errors
921            if not is_unavailable_exception(e):
922                # protocol error; provide additional information in test output
923                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
924
925    def test_dotted_attribute(self):
926        # Raises an AttributeError because private methods are not allowed.
927        self.assertRaises(AttributeError,
928                          xmlrpc.server.resolve_dotted_attribute, str, '__add')
929
930        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
931        # Get the test to run faster by sending a request with test_simple1.
932        # This avoids waiting for the socket timeout.
933        self.test_simple1()
934
935    def test_allow_dotted_names_true(self):
936        # XXX also need allow_dotted_names_false test.
937        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
938        data = server.Fixture.getData()
939        self.assertEqual(data, '42')
940
941    def test_unicode_host(self):
942        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
943        self.assertEqual(server.add("a", "\xe9"), "a\xe9")
944
945    def test_partial_post(self):
946        # Check that a partial POST doesn't make the server loop: issue #14001.
947        conn = http.client.HTTPConnection(ADDR, PORT)
948        conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
949        conn.close()
950
951    def test_context_manager(self):
952        with xmlrpclib.ServerProxy(URL) as server:
953            server.add(2, 3)
954            self.assertNotEqual(server('transport')._connection,
955                                (None, None))
956        self.assertEqual(server('transport')._connection,
957                         (None, None))
958
959    def test_context_manager_method_error(self):
960        try:
961            with xmlrpclib.ServerProxy(URL) as server:
962                server.add(2, "a")
963        except xmlrpclib.Fault:
964            pass
965        self.assertEqual(server('transport')._connection,
966                         (None, None))
967
968
969class SimpleServerEncodingTestCase(BaseServerTestCase):
970    @staticmethod
971    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
972        http_server(evt, numrequests, requestHandler, 'iso-8859-15')
973
974    def test_server_encoding(self):
975        start_string = '\u20ac'
976        end_string = '\xa4'
977
978        try:
979            p = xmlrpclib.ServerProxy(URL)
980            self.assertEqual(p.add(start_string, end_string),
981                             start_string + end_string)
982        except (xmlrpclib.ProtocolError, socket.error) as e:
983            # ignore failures due to non-blocking socket unavailable errors.
984            if not is_unavailable_exception(e):
985                # protocol error; provide additional information in test output
986                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
987
988
989class MultiPathServerTestCase(BaseServerTestCase):
990    threadFunc = staticmethod(http_multi_server)
991    request_count = 2
992    def test_path1(self):
993        p = xmlrpclib.ServerProxy(URL+"/foo")
994        self.assertEqual(p.pow(6,8), 6**8)
995        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
996
997    def test_path2(self):
998        p = xmlrpclib.ServerProxy(URL+"/foo/bar")
999        self.assertEqual(p.add(6,8), 6+8)
1000        self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
1001
1002    def test_path3(self):
1003        p = xmlrpclib.ServerProxy(URL+"/is/broken")
1004        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
1005
1006#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
1007#does indeed serve subsequent requests on the same connection
1008class BaseKeepaliveServerTestCase(BaseServerTestCase):
1009    #a request handler that supports keep-alive and logs requests into a
1010    #class variable
1011    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
1012        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
1013        protocol_version = 'HTTP/1.1'
1014        myRequests = []
1015        def handle(self):
1016            self.myRequests.append([])
1017            self.reqidx = len(self.myRequests)-1
1018            return self.parentClass.handle(self)
1019        def handle_one_request(self):
1020            result = self.parentClass.handle_one_request(self)
1021            self.myRequests[self.reqidx].append(self.raw_requestline)
1022            return result
1023
1024    requestHandler = RequestHandler
1025    def setUp(self):
1026        #clear request log
1027        self.RequestHandler.myRequests = []
1028        return BaseServerTestCase.setUp(self)
1029
1030#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
1031#does indeed serve subsequent requests on the same connection
1032class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
1033    def test_two(self):
1034        p = xmlrpclib.ServerProxy(URL)
1035        #do three requests.
1036        self.assertEqual(p.pow(6,8), 6**8)
1037        self.assertEqual(p.pow(6,8), 6**8)
1038        self.assertEqual(p.pow(6,8), 6**8)
1039        p("close")()
1040
1041        #they should have all been handled by a single request handler
1042        self.assertEqual(len(self.RequestHandler.myRequests), 1)
1043
1044        #check that we did at least two (the third may be pending append
1045        #due to thread scheduling)
1046        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
1047
1048
1049#test special attribute access on the serverproxy, through the __call__
1050#function.
1051class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
1052    #ask for two keepalive requests to be handled.
1053    request_count=2
1054
1055    def test_close(self):
1056        p = xmlrpclib.ServerProxy(URL)
1057        #do some requests with close.
1058        self.assertEqual(p.pow(6,8), 6**8)
1059        self.assertEqual(p.pow(6,8), 6**8)
1060        self.assertEqual(p.pow(6,8), 6**8)
1061        p("close")() #this should trigger a new keep-alive request
1062        self.assertEqual(p.pow(6,8), 6**8)
1063        self.assertEqual(p.pow(6,8), 6**8)
1064        self.assertEqual(p.pow(6,8), 6**8)
1065        p("close")()
1066
1067        #they should have all been two request handlers, each having logged at least
1068        #two complete requests
1069        self.assertEqual(len(self.RequestHandler.myRequests), 2)
1070        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
1071        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
1072
1073
1074    def test_transport(self):
1075        p = xmlrpclib.ServerProxy(URL)
1076        #do some requests with close.
1077        self.assertEqual(p.pow(6,8), 6**8)
1078        p("transport").close() #same as above, really.
1079        self.assertEqual(p.pow(6,8), 6**8)
1080        p("close")()
1081        self.assertEqual(len(self.RequestHandler.myRequests), 2)
1082
1083#A test case that verifies that gzip encoding works in both directions
1084#(for a request and the response)
1085@unittest.skipIf(gzip is None, 'requires gzip')
1086class GzipServerTestCase(BaseServerTestCase):
1087    #a request handler that supports keep-alive and logs requests into a
1088    #class variable
1089    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
1090        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
1091        protocol_version = 'HTTP/1.1'
1092
1093        def do_POST(self):
1094            #store content of last request in class
1095            self.__class__.content_length = int(self.headers["content-length"])
1096            return self.parentClass.do_POST(self)
1097    requestHandler = RequestHandler
1098
1099    class Transport(xmlrpclib.Transport):
1100        #custom transport, stores the response length for our perusal
1101        fake_gzip = False
1102        def parse_response(self, response):
1103            self.response_length=int(response.getheader("content-length", 0))
1104            return xmlrpclib.Transport.parse_response(self, response)
1105
1106        def send_content(self, connection, body):
1107            if self.fake_gzip:
1108                #add a lone gzip header to induce decode error remotely
1109                connection.putheader("Content-Encoding", "gzip")
1110            return xmlrpclib.Transport.send_content(self, connection, body)
1111
1112    def setUp(self):
1113        BaseServerTestCase.setUp(self)
1114
1115    def test_gzip_request(self):
1116        t = self.Transport()
1117        t.encode_threshold = None
1118        p = xmlrpclib.ServerProxy(URL, transport=t)
1119        self.assertEqual(p.pow(6,8), 6**8)
1120        a = self.RequestHandler.content_length
1121        t.encode_threshold = 0 #turn on request encoding
1122        self.assertEqual(p.pow(6,8), 6**8)
1123        b = self.RequestHandler.content_length
1124        self.assertTrue(a>b)
1125        p("close")()
1126
1127    def test_bad_gzip_request(self):
1128        t = self.Transport()
1129        t.encode_threshold = None
1130        t.fake_gzip = True
1131        p = xmlrpclib.ServerProxy(URL, transport=t)
1132        cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
1133                                    re.compile(r"\b400\b"))
1134        with cm:
1135            p.pow(6, 8)
1136        p("close")()
1137
1138    def test_gzip_response(self):
1139        t = self.Transport()
1140        p = xmlrpclib.ServerProxy(URL, transport=t)
1141        old = self.requestHandler.encode_threshold
1142        self.requestHandler.encode_threshold = None #no encoding
1143        self.assertEqual(p.pow(6,8), 6**8)
1144        a = t.response_length
1145        self.requestHandler.encode_threshold = 0 #always encode
1146        self.assertEqual(p.pow(6,8), 6**8)
1147        p("close")()
1148        b = t.response_length
1149        self.requestHandler.encode_threshold = old
1150        self.assertTrue(a>b)
1151
1152
1153@unittest.skipIf(gzip is None, 'requires gzip')
1154class GzipUtilTestCase(unittest.TestCase):
1155
1156    def test_gzip_decode_limit(self):
1157        max_gzip_decode = 20 * 1024 * 1024
1158        data = b'\0' * max_gzip_decode
1159        encoded = xmlrpclib.gzip_encode(data)
1160        decoded = xmlrpclib.gzip_decode(encoded)
1161        self.assertEqual(len(decoded), max_gzip_decode)
1162
1163        data = b'\0' * (max_gzip_decode + 1)
1164        encoded = xmlrpclib.gzip_encode(data)
1165
1166        with self.assertRaisesRegex(ValueError,
1167                                    "max gzipped payload length exceeded"):
1168            xmlrpclib.gzip_decode(encoded)
1169
1170        xmlrpclib.gzip_decode(encoded, max_decode=-1)
1171
1172
1173#Test special attributes of the ServerProxy object
1174class ServerProxyTestCase(unittest.TestCase):
1175    def setUp(self):
1176        unittest.TestCase.setUp(self)
1177        # Actual value of the URL doesn't matter if it is a string in
1178        # the correct format.
1179        self.url = 'http://fake.localhost'
1180
1181    def test_close(self):
1182        p = xmlrpclib.ServerProxy(self.url)
1183        self.assertEqual(p('close')(), None)
1184
1185    def test_transport(self):
1186        t = xmlrpclib.Transport()
1187        p = xmlrpclib.ServerProxy(self.url, transport=t)
1188        self.assertEqual(p('transport'), t)
1189
1190
1191# This is a contrived way to make a failure occur on the server side
1192# in order to test the _send_traceback_header flag on the server
1193class FailingMessageClass(http.client.HTTPMessage):
1194    def get(self, key, failobj=None):
1195        key = key.lower()
1196        if key == 'content-length':
1197            return 'I am broken'
1198        return super().get(key, failobj)
1199
1200
1201class FailingServerTestCase(unittest.TestCase):
1202    def setUp(self):
1203        self.evt = threading.Event()
1204        # start server thread to handle requests
1205        serv_args = (self.evt, 1)
1206        thread = threading.Thread(target=http_server, args=serv_args)
1207        thread.start()
1208        self.addCleanup(thread.join)
1209
1210        # wait for the server to be ready
1211        self.evt.wait()
1212        self.evt.clear()
1213
1214    def tearDown(self):
1215        # wait on the server thread to terminate
1216        self.evt.wait()
1217        # reset flag
1218        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
1219        # reset message class
1220        default_class = http.client.HTTPMessage
1221        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class
1222
1223    def test_basic(self):
1224        # check that flag is false by default
1225        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
1226        self.assertEqual(flagval, False)
1227
1228        # enable traceback reporting
1229        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
1230
1231        # test a call that shouldn't fail just as a smoke test
1232        try:
1233            p = xmlrpclib.ServerProxy(URL)
1234            self.assertEqual(p.pow(6,8), 6**8)
1235        except (xmlrpclib.ProtocolError, OSError) as e:
1236            # ignore failures due to non-blocking socket 'unavailable' errors
1237            if not is_unavailable_exception(e):
1238                # protocol error; provide additional information in test output
1239                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
1240
1241    def test_fail_no_info(self):
1242        # use the broken message class
1243        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
1244
1245        try:
1246            p = xmlrpclib.ServerProxy(URL)
1247            p.pow(6,8)
1248        except (xmlrpclib.ProtocolError, OSError) as e:
1249            # ignore failures due to non-blocking socket 'unavailable' errors
1250            if not is_unavailable_exception(e) and hasattr(e, "headers"):
1251                # The two server-side error headers shouldn't be sent back in this case
1252                self.assertTrue(e.headers.get("X-exception") is None)
1253                self.assertTrue(e.headers.get("X-traceback") is None)
1254        else:
1255            self.fail('ProtocolError not raised')
1256
1257    def test_fail_with_info(self):
1258        # use the broken message class
1259        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
1260
1261        # Check that errors in the server send back exception/traceback
1262        # info when flag is set
1263        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
1264
1265        try:
1266            p = xmlrpclib.ServerProxy(URL)
1267            p.pow(6,8)
1268        except (xmlrpclib.ProtocolError, OSError) as e:
1269            # ignore failures due to non-blocking socket 'unavailable' errors
1270            if not is_unavailable_exception(e) and hasattr(e, "headers"):
1271                # We should get error info in the response
1272                expected_err = "invalid literal for int() with base 10: 'I am broken'"
1273                self.assertEqual(e.headers.get("X-exception"), expected_err)
1274                self.assertTrue(e.headers.get("X-traceback") is not None)
1275        else:
1276            self.fail('ProtocolError not raised')
1277
1278
1279@contextlib.contextmanager
1280def captured_stdout(encoding='utf-8'):
1281    """A variation on support.captured_stdout() which gives a text stream
1282    having a `buffer` attribute.
1283    """
1284    orig_stdout = sys.stdout
1285    sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
1286    try:
1287        yield sys.stdout
1288    finally:
1289        sys.stdout = orig_stdout
1290
1291
1292class CGIHandlerTestCase(unittest.TestCase):
1293    def setUp(self):
1294        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()
1295
1296    def tearDown(self):
1297        self.cgi = None
1298
1299    def test_cgi_get(self):
1300        with support.EnvironmentVarGuard() as env:
1301            env['REQUEST_METHOD'] = 'GET'
1302            # if the method is GET and no request_text is given, it runs handle_get
1303            # get sysout output
1304            with captured_stdout(encoding=self.cgi.encoding) as data_out:
1305                self.cgi.handle_request()
1306
1307            # parse Status header
1308            data_out.seek(0)
1309            handle = data_out.read()
1310            status = handle.split()[1]
1311            message = ' '.join(handle.split()[2:4])
1312
1313            self.assertEqual(status, '400')
1314            self.assertEqual(message, 'Bad Request')
1315
1316
1317    def test_cgi_xmlrpc_response(self):
1318        data = """<?xml version='1.0'?>
1319        <methodCall>
1320            <methodName>test_method</methodName>
1321            <params>
1322                <param>
1323                    <value><string>foo</string></value>
1324                </param>
1325                <param>
1326                    <value><string>bar</string></value>
1327                </param>
1328            </params>
1329        </methodCall>
1330        """
1331
1332        with support.EnvironmentVarGuard() as env, \
1333             captured_stdout(encoding=self.cgi.encoding) as data_out, \
1334             support.captured_stdin() as data_in:
1335            data_in.write(data)
1336            data_in.seek(0)
1337            env['CONTENT_LENGTH'] = str(len(data))
1338            self.cgi.handle_request()
1339        data_out.seek(0)
1340
1341        # will respond exception, if so, our goal is achieved ;)
1342        handle = data_out.read()
1343
1344        # start with 44th char so as not to get http header, we just
1345        # need only xml
1346        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])
1347
1348        # Also test the content-length returned  by handle_request
1349        # Using the same test method inorder to avoid all the datapassing
1350        # boilerplate code.
1351        # Test for bug: http://bugs.python.org/issue5040
1352
1353        content = handle[handle.find("<?xml"):]
1354
1355        self.assertEqual(
1356            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
1357            len(content))
1358
1359
1360class UseBuiltinTypesTestCase(unittest.TestCase):
1361
1362    def test_use_builtin_types(self):
1363        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
1364        # makes all dispatch of binary data as bytes instances, and all
1365        # dispatch of datetime argument as datetime.datetime instances.
1366        self.log = []
1367        expected_bytes = b"my dog has fleas"
1368        expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
1369        marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
1370        def foobar(*args):
1371            self.log.extend(args)
1372        handler = xmlrpc.server.SimpleXMLRPCDispatcher(
1373            allow_none=True, encoding=None, use_builtin_types=True)
1374        handler.register_function(foobar)
1375        handler._marshaled_dispatch(marshaled)
1376        self.assertEqual(len(self.log), 2)
1377        mybytes, mydate = self.log
1378        self.assertEqual(self.log, [expected_bytes, expected_date])
1379        self.assertIs(type(mydate), datetime.datetime)
1380        self.assertIs(type(mybytes), bytes)
1381
1382    def test_cgihandler_has_use_builtin_types_flag(self):
1383        handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True)
1384        self.assertTrue(handler.use_builtin_types)
1385
1386    def test_xmlrpcserver_has_use_builtin_types_flag(self):
1387        server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0),
1388            use_builtin_types=True)
1389        server.server_close()
1390        self.assertTrue(server.use_builtin_types)
1391
1392
1393@support.reap_threads
1394def test_main():
1395    support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
1396            BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase,
1397            SimpleServerTestCase, SimpleServerEncodingTestCase,
1398            KeepaliveServerTestCase1, KeepaliveServerTestCase2,
1399            GzipServerTestCase, GzipUtilTestCase,
1400            MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase,
1401            CGIHandlerTestCase, SimpleXMLRPCDispatcherTestCase)
1402
1403
1404if __name__ == "__main__":
1405    test_main()
1406