• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import datetime
3import decimal
4import sys
5import time
6import unittest
7from unittest import mock
8import xmlrpc.client as xmlrpclib
9import xmlrpc.server
10import http.client
11import http, http.server
12import socket
13import threading
14import re
15import io
16import contextlib
17from test import support
18from test.support import socket_helper
19from test.support import ALWAYS_EQ, LARGEST, SMALLEST
20
21try:
22    import gzip
23except ImportError:
24    gzip = None
25
# Sample payload: a one-element list holding a struct with one value of each
# kind used by the marshalling tests (text, numbers, nested list, binary
# data in three spellings, boolean, non-ASCII text/key and three ways of
# building the same DateTime).
alist = [
    {
        'astring': 'foo@bar.baz.spam',
        'afloat': 7283.43,
        'anint': 2**20,
        'ashortlong': 2,
        'anotherlist': ['.zyx.41'],
        'abase64': xmlrpclib.Binary(b"my dog has fleas"),
        'b64bytes': b"my dog has fleas",
        'b64bytearray': bytearray(b"my dog has fleas"),
        'boolean': False,
        'unicode': '\u4000\u6000\u8000',
        'ukey\u4000': 'regular value',
        # The same instant expressed as ISO string, time tuple and datetime.
        'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
        'datetime2': xmlrpclib.DateTime((2005, 2, 10, 11, 41, 23, 0, 1, -1)),
        'datetime3': xmlrpclib.DateTime(
            datetime.datetime(2005, 2, 10, 11, 41, 23)
        ),
    },
]
43
class XMLRPCTestCase(unittest.TestCase):
    """Tests for xmlrpc.client marshalling, unmarshalling and client helpers."""

    def test_dump_load(self):
        # A dumps()/loads() round trip must reproduce the sample payload.
        dump = xmlrpclib.dumps((alist,))
        load = xmlrpclib.loads(dump)
        self.assertEqual(alist, load[0][0])

    def test_dump_bare_datetime(self):
        # This checks that an unwrapped datetime.date object can be handled
        # by the marshalling code.  This can't be done via test_dump_load()
        # since with use_builtin_types set to 1 the unmarshaller would create
        # datetime objects for the 'datetime[123]' keys as well
        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        # Each keyword/value pair selects the type of the decoded value.
        cases = (
            ({'use_builtin_types': True}, datetime.datetime),
            ({'use_builtin_types': False}, xmlrpclib.DateTime),
            ({'use_datetime': True}, datetime.datetime),
            ({'use_datetime': False}, xmlrpclib.DateTime),
        )
        for kwargs, expected_type in cases:
            with self.subTest(**kwargs):
                result, m = xmlrpclib.loads(s, **kwargs)
                (newdt,) = result
                self.assertEqual(newdt, dt)
                self.assertIs(type(newdt), expected_type)
                self.assertIsNone(m)

    def test_datetime_before_1900(self):
        # same as before but with a date before 1900
        dt = datetime.datetime(1, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        cases = (
            (True, datetime.datetime),
            (False, xmlrpclib.DateTime),
        )
        for use_builtin_types, expected_type in cases:
            with self.subTest(use_builtin_types=use_builtin_types):
                result, m = xmlrpclib.loads(
                    s, use_builtin_types=use_builtin_types)
                (newdt,) = result
                self.assertEqual(newdt, dt)
                self.assertIs(type(newdt), expected_type)
                self.assertIsNone(m)

    def test_bug_1164912(self):
        # A default DateTime must survive a method-response round trip with
        # its value still stored as str.
        d = xmlrpclib.DateTime()
        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
                                            methodresponse=True))
        self.assertIsInstance(new_d.value, str)

        # Check that the output of dumps() is still a str
        s = xmlrpclib.dumps((new_d,), methodresponse=True)
        self.assertIsInstance(s, str)

    def test_newstyle_class(self):
        # An arbitrary instance is marshalled as its __dict__.
        class T(object):
            pass
        t = T()
        t.x = 100
        t.y = "Hello"
        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
        self.assertEqual(t2, t.__dict__)

    def test_dump_big_long(self):
        # Integers far outside the marshallable range are rejected.
        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))

    def test_dump_bad_dict(self):
        # Struct keys that are not strings are rejected.
        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))

    def test_dump_recursive_seq(self):
        # A list that (indirectly) contains itself cannot be marshalled.
        l = [1,2,3]
        t = [3,4,5,l]
        l.append(t)
        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))

    def test_dump_recursive_dict(self):
        # A dict that (indirectly) contains itself cannot be marshalled.
        d = {'1':1, '2':1}
        t = {'3':3, 'd':d}
        d['t'] = t
        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))

    def test_dump_big_int(self):
        # Only values within [MININT, MAXINT] may be dumped as integers.
        if sys.maxsize > 2**31-1:
            self.assertRaises(OverflowError, xmlrpclib.dumps,
                              (int(2**34),))

        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MAXINT+1,))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MININT-1,))

        def dummy_write(s):
            pass

        # Same limits when calling the Marshaller's dump_int() directly.
        m = xmlrpclib.Marshaller()
        m.dump_int(xmlrpclib.MAXINT, dummy_write)
        m.dump_int(xmlrpclib.MININT, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MAXINT+1, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MININT-1, dummy_write)

    def test_dump_double(self):
        # Doubles are not subject to the integer range limits: none of the
        # calls below may raise.
        xmlrpclib.dumps((float(2 ** 34),))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
                         float(xmlrpclib.MININT)))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
                         float(xmlrpclib.MININT - 42)))

        def dummy_write(s):
            pass

        m = xmlrpclib.Marshaller()
        m.dump_double(xmlrpclib.MAXINT, dummy_write)
        m.dump_double(xmlrpclib.MININT, dummy_write)
        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
        m.dump_double(xmlrpclib.MININT - 42, dummy_write)

    def test_dump_none(self):
        # None round-trips only when allow_none is requested ...
        value = alist + [None]
        arg1 = (alist + [None],)
        strg = xmlrpclib.dumps(arg1, allow_none=True)
        self.assertEqual(value,
                          xmlrpclib.loads(strg)[0][0])
        # ... and is rejected otherwise.
        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))

    def test_dump_encoding(self):
        # Keys, values and method names containing characters inside and
        # outside iso-8859-15 must survive dumping with that encoding, both
        # as str and as encoded bytes.
        value = {'key\u20ac\xa4':
                 'value\u20ac\xa4'}
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodresponse=True)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        methodname = 'method\u20ac\xa4'
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodname=methodname)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)

    def test_dump_bytes(self):
        # bytes, bytearray and Binary all dump to the same payload; the
        # loaded type depends on use_builtin_types.
        sample = b"my dog has fleas"
        self.assertEqual(sample, xmlrpclib.Binary(sample))
        for type_ in bytes, bytearray, xmlrpclib.Binary:
            value = type_(sample)
            s = xmlrpclib.dumps((value,))

            result, m = xmlrpclib.loads(s, use_builtin_types=True)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), bytes)
            self.assertIsNone(m)

            result, m = xmlrpclib.loads(s, use_builtin_types=False)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), xmlrpclib.Binary)
            self.assertIsNone(m)

    def test_loads_unsupported(self):
        # An unknown value element raises ResponseError wherever it occurs:
        # at top level, inside an array, or inside a struct.
        ResponseError = xmlrpclib.ResponseError
        data = '<params><param><value><spam/></value></param></params>'
        self.assertRaises(ResponseError, xmlrpclib.loads, data)
        data = ('<params><param><value><array>'
                '<value><spam/></value>'
                '</array></value></param></params>')
        self.assertRaises(ResponseError, xmlrpclib.loads, data)
        data = ('<params><param><value><struct>'
                '<member><name>a</name><value><spam/></value></member>'
                '<member><name>b</name><value><spam/></value></member>'
                '</struct></value></param></params>')
        self.assertRaises(ResponseError, xmlrpclib.loads, data)

    def check_loads(self, s, value, **kwargs):
        # Helper: wrap the XML fragment *s* in a single-parameter document,
        # load it (forwarding **kwargs to loads()) and require a result that
        # equals *value*, has exactly its type, and carries no method name.
        dump = '<params><param><value>%s</value></param></params>' % s
        result, m = xmlrpclib.loads(dump, **kwargs)
        (newvalue,) = result
        self.assertEqual(newvalue, value)
        self.assertIs(type(newvalue), type(value))
        self.assertIsNone(m)

    def test_load_standard_types(self):
        check = self.check_loads
        check('string', 'string')
        check('<string>string</string>', 'string')
        # Non-BMP text (Mathematical Fraktur "Unicode"), written with escapes
        # so the literal cannot be damaged by a lossy re-encoding of this file.
        check('<string>\U0001d518\U0001d52b\U0001d526\U0001d520'
              '\U0001d52c\U0001d521\U0001d522 string</string>',
              '\U0001d518\U0001d52b\U0001d526\U0001d520'
              '\U0001d52c\U0001d521\U0001d522 string')
        check('<int>2056183947</int>', 2056183947)
        check('<int>-2056183947</int>', -2056183947)
        check('<i4>2056183947</i4>', 2056183947)
        check('<double>46093.78125</double>', 46093.78125)
        check('<boolean>0</boolean>', False)
        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
              xmlrpclib.Binary(b'\x00byte string\xff'))
        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
              b'\x00byte string\xff', use_builtin_types=True)
        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
              xmlrpclib.DateTime('20050210T11:41:23'))
        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
              datetime.datetime(2005, 2, 10, 11, 41, 23),
              use_builtin_types=True)
        check('<array><data>'
              '<value><int>1</int></value><value><int>2</int></value>'
              '</data></array>', [1, 2])
        check('<struct>'
              '<member><name>b</name><value><int>2</int></value></member>'
              '<member><name>a</name><value><int>1</int></value></member>'
              '</struct>', {'a': 1, 'b': 2})

    def test_load_extension_types(self):
        # Non-standard element names accepted by the unmarshaller.
        check = self.check_loads
        check('<nil/>', None)
        check('<ex:nil/>', None)
        check('<i1>205</i1>', 205)
        check('<i2>20561</i2>', 20561)
        check('<i8>9876543210</i8>', 9876543210)
        check('<biginteger>98765432100123456789</biginteger>',
              98765432100123456789)
        check('<float>93.78125</float>', 93.78125)
        check('<bigdecimal>9876543210.0123456789</bigdecimal>',
              decimal.Decimal('9876543210.0123456789'))

    def test_limit_int(self):
        # Integers with more digits than the configured int/str conversion
        # limit must raise ValueError while loading.
        check = self.check_loads
        maxdigits = 5000
        with support.adjust_int_max_str_digits(maxdigits):
            s = '1' * (maxdigits + 1)
            with self.assertRaises(ValueError):
                check(f'<int>{s}</int>', None)
            with self.assertRaises(ValueError):
                check(f'<biginteger>{s}</biginteger>', None)

    def test_get_host_info(self):
        # see bug #3613, this raised a TypeError
        transp = xmlrpc.client.Transport()
        self.assertEqual(transp.get_host_info("user@host.tld"),
                          ('host.tld',
                           [('Authorization', 'Basic dXNlcg==')], {}))

    def test_ssl_presence(self):
        # Calling through an https proxy must fail with NotImplementedError
        # only when the ssl module is unavailable, and with OSError (nothing
        # is expected to listen on the port) only when it is available.
        try:
            import ssl
        except ImportError:
            has_ssl = False
        else:
            has_ssl = True
        try:
            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
        except NotImplementedError:
            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
        except OSError:
            self.assertTrue(has_ssl)

    def test_keepalive_disconnect(self):
        # The handler answers the first request on a connection and keeps
        # the connection open, then closes it without replying to the next
        # request.  Both proxy calls below must still return 5.
        class RequestHandler(http.server.BaseHTTPRequestHandler):
            protocol_version = "HTTP/1.1"
            handled = False

            def do_POST(self):
                length = int(self.headers.get("Content-Length"))
                self.rfile.read(length)
                if self.handled:
                    # Already answered once on this connection: drop it.
                    self.close_connection = True
                    return
                response = xmlrpclib.dumps((5,), methodresponse=True)
                response = response.encode()
                self.send_response(http.HTTPStatus.OK)
                self.send_header("Content-Length", len(response))
                self.end_headers()
                self.wfile.write(response)
                self.handled = True
                self.close_connection = False

            def log_message(self, format, *args):
                # don't clobber sys.stderr
                pass

        def run_server():
            server.socket.settimeout(float(1))  # Don't hang if client fails
            server.handle_request()  # First request and attempt at second
            server.handle_request()  # Retried second request

        server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler)
        self.addCleanup(server.server_close)
        thread = threading.Thread(target=run_server)
        thread.start()
        self.addCleanup(thread.join)
        url = "http://{}:{}/".format(*server.server_address)
        with xmlrpclib.ServerProxy(url) as p:
            self.assertEqual(p.method(), 5)
            self.assertEqual(p.method(), 5)
357
358
class SimpleXMLRPCDispatcherTestCase(unittest.TestCase):
    class DispatchExc(Exception):
        """Raised inside the dispatched functions when checking for
        chained exceptions"""

    def _check_unchained(self, caught, expected_args):
        # Shared verification: the exception captured by assertRaises carries
        # the expected args and has neither a cause nor a context attached.
        exc = caught.exception
        self.assertEqual(exc.args, expected_args)
        self.assertIsNone(exc.__cause__)
        self.assertIsNone(exc.__context__)

    def test_call_registered_func(self):
        """Calls explicitly registered function"""
        # The exception raised by the function must come through without
        # any other exception chained to it.
        call_args = (1, 2, 3)

        def dispatched_func(*params):
            raise self.DispatchExc(params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_function(dispatched_func)
        with self.assertRaises(self.DispatchExc) as caught:
            disp._dispatch('dispatched_func', call_args)
        self._check_unchained(caught, (call_args,))

    def test_call_instance_func(self):
        """Calls a registered instance attribute as a function"""
        # The exception raised by the method must come through without
        # any other exception chained to it.
        call_args = (1, 2, 3)

        class DispatchedClass:
            def dispatched_func(self, *params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(DispatchedClass())
        with self.assertRaises(self.DispatchExc) as caught:
            disp._dispatch('dispatched_func', call_args)
        self._check_unchained(caught, (call_args,))

    def test_call_dispatch_func(self):
        """Calls the registered instance's `_dispatch` function"""
        # The exception raised by the instance's own _dispatch must come
        # through without any other exception chained to it.
        method_name = 'method'
        call_args = (1, 2, 3)

        class TestInstance:
            def _dispatch(self, method, params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(
                    method, params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(TestInstance())
        with self.assertRaises(self.DispatchExc) as caught:
            disp._dispatch(method_name, call_args)
        self._check_unchained(caught, (method_name, call_args))

    def test_registered_func_is_none(self):
        """Calls explicitly registered function which is None"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_function(None, name='method')
        with self.assertRaisesRegex(Exception, 'method'):
            disp._dispatch('method', ('param',))

    def test_instance_has_no_func(self):
        """Attempts to call nonexistent function on a registered instance"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(object())
        with self.assertRaisesRegex(Exception, 'method'):
            disp._dispatch('method', ('param',))

    def test_cannot_locate_func(self):
        """Calls a function that the dispatcher cannot locate"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        with self.assertRaisesRegex(Exception, 'method'):
            disp._dispatch('method', ('param',))
444
445
class HelperTestCase(unittest.TestCase):
    """Tests for the small helper functions exposed by xmlrpc.client."""

    def test_escape(self):
        # '&', '<' and '>' must each be replaced by their entity reference.
        expectations = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in expectations:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
451
class FaultTestCase(unittest.TestCase):
    """Tests for xmlrpc.client.Fault and for dotted-attribute resolution."""

    def test_repr(self):
        # repr() has a fixed format and str() returns the same text.
        fault = xmlrpclib.Fault(42, 'Test Fault')
        text = repr(fault)
        self.assertEqual(text, "<Fault 42: 'Test Fault'>")
        self.assertEqual(text, str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # Dumped as an ordinary parameter, a Fault loads back as a struct.
        as_param = xmlrpclib.dumps((fault,))
        (loaded,), method = xmlrpclib.loads(as_param)
        self.assertEqual(
            loaded, {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEqual(method, None)

        # Dumped directly through a Marshaller, loading it raises the Fault.
        as_fault = xmlrpclib.Marshaller().dumps(fault)
        with self.assertRaises(xmlrpclib.Fault):
            xmlrpclib.loads(as_fault)

    def test_dotted_attribute(self):
        # Attribute names starting with an underscore are refused with
        # AttributeError; ordinary public attributes resolve normally.
        resolve = xmlrpc.server.resolve_dotted_attribute
        with self.assertRaises(AttributeError):
            resolve(str, '__add')
        self.assertTrue(resolve(str, 'title'))
474
class DateTimeTestCase(unittest.TestCase):
    """Tests for construction, formatting and comparison of DateTime."""

    def test_default(self):
        # With no argument, DateTime formats whatever time.localtime() gives.
        frozen = time.struct_time([2013, 7, 15, 0, 24, 49, 0, 196, 0])
        with mock.patch('time.localtime', return_value=frozen):
            localtime = time.localtime()
            stamp = xmlrpclib.DateTime()
            expected = time.strftime("%Y%m%dT%H:%M:%S", localtime)
            self.assertEqual(str(stamp), expected)

    def test_time(self):
        # A float timestamp is rendered via its local time.
        seconds = 1181399930.036952
        stamp = xmlrpclib.DateTime(seconds)
        expected = time.strftime("%Y%m%dT%H:%M:%S", time.localtime(seconds))
        self.assertEqual(str(stamp), expected)

    def test_time_tuple(self):
        # A plain 9-tuple is formatted field by field.
        fields = (2007,6,9,10,38,50,5,160,0)
        stamp = xmlrpclib.DateTime(fields)
        self.assertEqual(str(stamp), '20070609T10:38:50')

    def test_time_struct(self):
        # A time.struct_time is accepted as well.
        local = time.localtime(1181399930.036952)
        stamp = xmlrpclib.DateTime(local)
        self.assertEqual(str(stamp), time.strftime("%Y%m%dT%H:%M:%S", local))

    def test_datetime_datetime(self):
        # A datetime.datetime is rendered in the compact ISO 8601 form.
        moment = datetime.datetime(2007,1,2,3,4,5)
        stamp = xmlrpclib.DateTime(moment)
        self.assertEqual(str(stamp), '20070102T03:04:05')

    def test_repr(self):
        # repr() shows the value together with the object's id in hex.
        moment = datetime.datetime(2007,1,2,3,4,5)
        stamp = xmlrpclib.DateTime(moment)
        expected = "<DateTime '20070102T03:04:05' at %#x>" % id(stamp)
        self.assertEqual(repr(stamp), expected)

    def test_decode(self):
        # Surrounding whitespace is ignored by decode() and by _datetime().
        padded = ' 20070908T07:11:13  '
        reference = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))

        decoded = xmlrpclib.DateTime()
        decoded.decode(padded)
        self.assertEqual(decoded, reference)

        built = xmlrpclib._datetime(padded)
        self.assertEqual(built, reference)

    def test_comparison(self):
        now = datetime.datetime.now()
        later = now + datetime.timedelta(seconds=4)
        dtime = xmlrpclib.DateTime(now.timetuple())

        # datetime vs. DateTime
        self.assertTrue(dtime == now)
        self.assertTrue(now == dtime)
        self.assertTrue(later >= dtime)
        self.assertTrue(dtime < later)

        # str vs. DateTime
        as_text = now.strftime("%Y%m%dT%H:%M:%S")
        self.assertTrue(dtime == as_text)
        self.assertTrue(as_text == dtime)
        dtime_later = xmlrpclib.DateTime(later.timetuple())
        self.assertTrue(dtime_later >= as_text)
        self.assertTrue(as_text < dtime_later)

        # some other types: never equal, and ordering raises TypeError
        as_bytes = as_text.encode('ascii')
        as_tuple = now.timetuple()
        self.assertFalse(dtime == 1970)
        self.assertTrue(dtime != as_bytes)
        self.assertFalse(dtime == bytearray(as_bytes))
        self.assertTrue(dtime != as_tuple)
        with self.assertRaises(TypeError):
            dtime < float(1970)
        with self.assertRaises(TypeError):
            dtime > as_bytes
        with self.assertRaises(TypeError):
            dtime <= bytearray(as_bytes)
        with self.assertRaises(TypeError):
            dtime >= as_tuple

        # objects implementing their own comparisons get the final say
        self.assertTrue(dtime == ALWAYS_EQ)
        self.assertFalse(dtime != ALWAYS_EQ)
        self.assertTrue(dtime < LARGEST)
        self.assertFalse(dtime > LARGEST)
        self.assertTrue(dtime <= LARGEST)
        self.assertFalse(dtime >= LARGEST)
        self.assertFalse(dtime < SMALLEST)
        self.assertTrue(dtime > SMALLEST)
        self.assertFalse(dtime <= SMALLEST)
        self.assertTrue(dtime >= SMALLEST)
568
569
class BinaryTestCase(unittest.TestCase):

    # XXX Open question: what should str(Binary(b"\xff")) return?  For now
    # the tests expect "\xff", i.e. the binary data interpreted as
    # Latin-1-encoded text, which feels very unsatisfactory.  Perhaps only
    # repr() should be defined, returning r"Binary(b'\xff')" instead?

    def test_default(self):
        # An empty Binary converts to the empty string.
        empty = xmlrpclib.Binary()
        self.assertEqual(str(empty), '')

    def test_string(self):
        # str() yields the payload decoded as Latin-1.
        payload = b'\x01\x02\x03abc123\xff\xfe'
        wrapped = xmlrpclib.Binary(payload)
        self.assertEqual(str(wrapped), payload.decode("latin-1"))

    def test_decode(self):
        # decode() and the _binary() factory both accept base64 input.
        payload = b'\x01\x02\x03abc123\xff\xfe'
        encoded = base64.encodebytes(payload)
        expected = payload.decode("latin-1")

        via_method = xmlrpclib.Binary()
        via_method.decode(encoded)
        self.assertEqual(str(via_method), expected)

        via_factory = xmlrpclib._binary(encoded)
        self.assertEqual(str(via_factory), expected)
595
596
# Host address, port and base URL of the test server; filled in by the
# server thread functions below and read by the client-side tests.
ADDR = None
PORT = None
URL = None
598
599# The evt is set twice.  First when the server is ready to serve.
600# Second when the server has been shutdown.  The user must clear
601# the event after it has been set the first time to catch the second set.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Serve up to *numrequests* XML-RPC requests on an ephemeral port.

    A SimpleXMLRPCServer subclass is bound to ("localhost", 0); the address
    it ends up with is published through the module globals ADDR, PORT and
    URL.  *evt* is set once the server is ready and once more, after the
    listening socket has been closed and PORT reset to None, on the way out.
    *requestHandler* falls back to SimpleXMLRPCRequestHandler when falsy and
    *encoding* is forwarded to the server constructor.
    """
    global ADDR, PORT, URL

    class TestInstanceClass:
        # Instance registered with the server; also reachable through
        # dotted names (see register_instance() below).
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

        class Fixture:
            @staticmethod
            def getData():
                return '42'

    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode explicitly.
            # On Linux, socket attributes are not inherited like they are
            # on *BSD and Windows.
            conn, peer = self.socket.accept()
            conn.setblocking(True)
            return conn, peer

    handler_class = requestHandler or xmlrpc.server.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(
        ("localhost", 0),
        handler_class,
        encoding=encoding,
        logRequests=False,
        bind_and_activate=False,
    )
    try:
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Clients connect to the IP address directly.  This avoids
        # socket.create_connection() trying "localhost" with every address
        # family, which is slow e.g. on Vista which supports AF_INET6; the
        # server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()

        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x: x, 'têšt')

        @serv.register_function
        def my_function():
            '''This is my function'''
            return True

        @serv.register_function(name='add')
        def _(x, y):
            return x + y

        serv.register_instance(TestInstanceClass(), allow_dotted_names=True)
        evt.set()

        # handle up to 'numrequests' requests
        remaining = numrequests
        while remaining > 0:
            serv.handle_request()
            remaining -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
665
def http_multi_server(evt, numrequests, requestHandler=None):
    """Serve up to *numrequests* requests from a MultiPathXMLRPCServer.

    The server is bound to an ephemeral port on "localhost" and the address
    is published through the module globals ADDR, PORT and URL.  Dispatchers
    are installed on "/foo" (offering pow) and "/foo/bar" (offering add),
    both with introspection and multicall support, plus a deliberately
    failing dispatcher on "/is/broken".

    *evt* is set when the server is ready and set again on exit, after the
    listening socket has been closed and PORT reset to None.  Binding now
    happens inside the try block (as in http_server), so a failure to bind
    still closes the socket and signals *evt*.
    *requestHandler* falls back to SimpleXMLRPCRequestHandler when falsy.
    """
    global ADDR, PORT, URL

    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode explicitly.
            # On Linux, socket attributes are not inherited like they are
            # on *BSD and Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler

    class MyRequestHandler(requestHandler):
        # NOTE(review): presumably an empty list turns off path filtering so
        # every dispatcher path is reachable -- confirm against
        # SimpleXMLRPCRequestHandler.
        rpc_paths = []

    class BrokenDispatcher:
        # Dispatcher whose marshalled dispatch always fails.
        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
            raise RuntimeError("broken dispatcher")

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Clients connect to the IP address directly.  This avoids
        # socket.create_connection() trying "localhost" with every address
        # family, which is slow e.g. on Vista which supports AF_INET6; the
        # server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = ["/foo", "/foo/bar"]
        for path in paths:
            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        serv.add_dispatcher("/is/broken", BrokenDispatcher())
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
730
731# This function prevents errors like:
732#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Return True if the given exception is the product of a server-side
       exception caused by the 'temporarily unavailable' response sometimes
       given by operations on non-blocking sockets, and False otherwise.

       *e* is expected to be an xmlrpc ProtocolError (inspected through its
       errcode and headers attributes) or any other exception, for which the
       string form of the exception is inspected instead.'''

    # sometimes we get a -1 error code and/or empty headers
    try:
        if e.errcode == -1 or e.headers is None:
            return True
        exc_mess = e.headers.get('X-exception')
    except AttributeError:
        # No errcode/headers attributes (e.g. a plain OSError): fall back
        # to the exception's own message.
        exc_mess = str(e)

    # Always answer with a real bool instead of falling off the end.
    return bool(exc_mess) and 'temporarily unavailable' in exc_mess.lower()
749
def make_request_and_skipIf(condition, reason):
    """Return a decorator that skips a test while still issuing one request.

    If we skip the test, we have to make a request because the server
    created in setUp blocks expecting one to come in.

    When *condition* is false the decorator returns the test unchanged.
    Otherwise the test is replaced by a function that calls my_function()
    on the server at URL, ignores "temporarily unavailable" failures, and
    then raises unittest.SkipTest(reason).  The replacement keeps the
    original test's name and docstring.
    """
    import functools  # local import: keeps this helper self-contained

    if not condition:
        return lambda func: func

    def decorator(func):
        @functools.wraps(func)
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, OSError) as e:
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
765
class BaseServerTestCase(unittest.TestCase):
    """Base class for tests that need a live server in a background thread.

    Subclasses may override the class attributes below to choose the
    request handler, the number of requests served and the server function.
    """

    requestHandler = None
    request_count = 1
    threadFunc = staticmethod(http_server)

    def setUp(self):
        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # start server thread to handle requests
        ready = self.evt = threading.Event()
        worker = threading.Thread(
            target=self.threadFunc,
            args=(ready, self.request_count, self.requestHandler),
        )
        worker.start()
        self.addCleanup(worker.join)

        # wait for the server to be ready, then re-arm the event so that
        # tearDown() can wait for the shutdown notification
        ready.wait()
        ready.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()

        # disable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
792
class SimpleServerTestCase(BaseServerTestCase):
    # Client-side tests; each one talks to the server thread started by
    # BaseServerTestCase.setUp.

    def _fail_unless_unavailable(self, exc):
        # Shared handler for a ProtocolError/OSError raised by a request:
        # ignore failures due to non-blocking socket 'unavailable' errors,
        # otherwise fail and provide additional information (the response
        # headers, when the exception has any) in the test output.
        if not is_unavailable_exception(exc):
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_simple1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_nonascii(self):
        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_client_encoding(self):
        start_string = '\u20ac'
        end_string = '\xa4'

        try:
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_nonascii_methodname(self):
        try:
            p = xmlrpclib.ServerProxy(URL, encoding='ascii')
            self.assertEqual(p.têšt(42), 42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_404(self):
        # send POST with http.client, it should return 404 header and
        # 'Not Found' message.
        with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn:
            conn.request('POST', '/this-is-not-valid')
            response = conn.getresponse()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

    def test_introspection1(self):
        expected_methods = {'pow', 'div', 'my_function', 'add', 'têšt',
                            'system.listMethods', 'system.methodHelp',
                            'system.methodSignature', 'system.multicall',
                            'Fixture'}
        try:
            p = xmlrpclib.ServerProxy(URL)
            meth = p.system.listMethods()
            self.assertEqual(set(meth), expected_methods)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_introspection2(self):
        try:
            # test _methodHelp()
            p = xmlrpclib.ServerProxy(URL)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    @make_request_and_skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        try:
            # test native doc
            p = xmlrpclib.ServerProxy(URL)
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')

        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        data = server.Fixture.getData()
        self.assertEqual(data, '42')

    def test_unicode_host(self):
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(server.add("a", "\xe9"), "a\xe9")

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        # The request announces 100 bytes of content but supplies fewer.
        request = ('POST /RPC2 HTTP/1.0\r\n'
                   'Content-Length: 100\r\n\r\n'
                   'bye HTTP/1.1\r\n'
                   f'Host: {ADDR}:{PORT}\r\n'
                   'Accept-Encoding: identity\r\n'
                   'Content-Length: 0\r\n\r\n')
        with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
            # sendall() guarantees the whole (deliberately short) request is
            # written; send() may transmit only part of it.
            conn.sendall(request.encode('ascii'))

    def test_context_manager(self):
        # The transport holds a connection inside the with block and has
        # dropped it once the block is left.
        with xmlrpclib.ServerProxy(URL) as server:
            server.add(2, 3)
            self.assertNotEqual(server('transport')._connection,
                                (None, None))
        self.assertEqual(server('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        # The connection must also be dropped when the call inside the with
        # block raises a Fault.
        try:
            with xmlrpclib.ServerProxy(URL) as server:
                server.add(2, "a")
        except xmlrpclib.Fault:
            pass
        self.assertEqual(server('transport')._connection,
                         (None, None))
990
991
class SimpleServerEncodingTestCase(BaseServerTestCase):
    # Same fixture as the base class, except that http_server always gets
    # 'iso-8859-15' as its fourth argument.
    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The encoding parameter is accepted but not used: the value handed
        # to http_server is fixed.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        euro_sign, currency_sign = '\u20ac', '\xa4'
        expected = euro_sign + currency_sign

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.add(euro_sign, currency_sign), expected)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # ignore failures due to non-blocking socket unavailable errors.
            if is_unavailable_exception(exc):
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))
1010
1011
class MultiPathServerTestCase(BaseServerTestCase):
    # Runs against http_multi_server; the tests below address it through
    # different URL paths and check which calls succeed on each.
    request_count = 2
    threadFunc = staticmethod(http_multi_server)

    def test_path1(self):
        # Under /foo, pow succeeds and add faults.
        proxy = xmlrpclib.ServerProxy(URL + "/foo")
        self.assertEqual(proxy.pow(6, 8), 6**8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_path2(self):
        # Under /foo/bar, add succeeds and pow faults.
        proxy = xmlrpclib.ServerProxy(URL + "/foo/bar")
        self.assertEqual(proxy.add(6, 8), 6 + 8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.pow(6, 8)

    def test_path3(self):
        # Under /is/broken, add faults.
        proxy = xmlrpclib.ServerProxy(URL + "/is/broken")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)
1028
#Shared fixture for the keep-alive test cases: it installs an HTTP/1.1
#request handler that records which request lines each handle() call saw
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    #a request handler that supports keep-alive and logs requests into a
    #class variable
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        #request log: one inner list per handle() call, each collecting the
        #raw request lines recorded during that call
        myRequests = []
        def handle(self):
            #open a new log entry and remember its index so that
            #handle_one_request appends to the right inner list
            self.myRequests.append([])
            self.reqidx = len(self.myRequests)-1
            return self.parentClass.handle(self)
        def handle_one_request(self):
            #let the parent process the request first, then log its raw
            #request line
            result = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return result

    requestHandler = RequestHandler
    def setUp(self):
        #clear request log
        self.RequestHandler.myRequests = []
        return BaseServerTestCase.setUp(self)
1052
#Verifies that a server using the HTTP/1.1 keep-alive mechanism does indeed
#serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #issue three requests through the same proxy
        for _ in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("close")()

        request_log = self.RequestHandler.myRequests

        #a single request handler should have served all of them
        self.assertEqual(len(request_log), 1)

        #at least two requests must be logged (the third may still be
        #pending its append due to thread scheduling)
        self.assertGreaterEqual(len(request_log[-1]), 2)
1070
1071
#Exercises the special attributes reachable through the ServerProxy's
#__call__ function ("close" and "transport").
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    #the server has to handle two keep-alive connections
    request_count = 2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #two rounds of three requests, each round ended by a close; the
        #request after the first close should trigger a new keep-alive
        #connection
        for _ in range(2):
            for _ in range(3):
                self.assertEqual(proxy.pow(6,8), 6**8)
            proxy("close")()

        #there should have been two request handlers, each having logged
        #at least two complete requests
        request_log = self.RequestHandler.myRequests
        self.assertEqual(len(request_log), 2)
        self.assertGreaterEqual(len(request_log[-1]), 2)
        self.assertGreaterEqual(len(request_log[-2]), 2)


    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #one request, then close via the transport object instead of
        #through "close"; the next request needs a second handler
        self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("transport").close()
        self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("close")()
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
1105
#A test case that verifies that gzip encoding works in both directions
#(for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    #a request handler that speaks HTTP/1.1 and stores the Content-Length
    #of the last request in a class variable
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store content of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False
        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    # setUp is inherited from BaseServerTestCase; the former override only
    # delegated to it.

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        a = self.RequestHandler.content_length
        t.encode_threshold = 0 #turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        b = self.RequestHandler.content_length
        #the encoded request must be shorter than the plain one
        self.assertGreater(a, b)
        p("close")()

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        #the body is not really gzipped, so a 400 response is expected
        cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                    re.compile(r"\b400\b"))
        with cm:
            p.pow(6, 8)
        p("close")()

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        #restore the handler's threshold even when an assertion below
        #fails, so a failure here cannot leak into later tests
        self.addCleanup(setattr, self.requestHandler, 'encode_threshold',
                        self.requestHandler.encode_threshold)
        self.requestHandler.encode_threshold = None #no encoding
        self.assertEqual(p.pow(6,8), 6**8)
        a = t.response_length
        self.requestHandler.encode_threshold = 0 #always encode
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()
        b = t.response_length
        #the encoded response must be shorter than the plain one
        self.assertGreater(a, b)
1174
1175
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):

    def test_gzip_decode_limit(self):
        # 20 MiB: the payload size this test treats as the decode limit
        limit = 20 * 1024 * 1024

        # a payload of exactly `limit` bytes decodes completely
        at_limit = xmlrpclib.gzip_encode(b'\0' * limit)
        self.assertEqual(len(xmlrpclib.gzip_decode(at_limit)), limit)

        # a single extra byte is rejected when no max_decode is given ...
        over_limit = xmlrpclib.gzip_encode(b'\0' * (limit + 1))
        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(over_limit)

        # ... but decodes without error when max_decode is -1
        xmlrpclib.gzip_decode(over_limit, max_decode=-1)
1194
1195
class HeadersServerTestCase(BaseServerTestCase):
    # Checks that extra headers given to ServerProxy reach the server.

    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        # headers of the most recent POST, kept on the class so the test
        # can inspect them afterwards
        test_headers = None

        def do_POST(self):
            type(self).test_headers = self.headers
            return super().do_POST()

    requestHandler = RequestHandler

    # header names expected on every request, before any additional ones
    standard_headers = [
        'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
        'Content-Length']

    def setUp(self):
        # forget the headers recorded by a previous test
        self.RequestHandler.test_headers = None
        return super().setUp()

    def assertContainsAdditionalHeaders(self, headers, additional):
        # The header names must be exactly the standard ones plus the
        # additional ones, and each additional header must carry its value.
        wanted_names = sorted([*self.standard_headers, *additional.keys()])
        self.assertListEqual(sorted(headers.keys()), wanted_names)

        for name in additional:
            self.assertEqual(headers.get(name), additional[name])

    def test_header(self):
        proxy = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')])
        self.assertEqual(proxy.pow(6, 8), 6**8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers, {'X-Test': 'foo'})

    def test_header_many(self):
        extra = [('X-Test', 'foo'), ('X-Test-Second', 'bar')]
        proxy = xmlrpclib.ServerProxy(URL, headers=extra)
        self.assertEqual(proxy.pow(6, 8), 6**8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers,
            {'X-Test': 'foo', 'X-Test-Second': 'bar'})

    def test_header_empty(self):
        proxy = xmlrpclib.ServerProxy(URL, headers=[])
        self.assertEqual(proxy.pow(6, 8), 6**8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers, {})

    def test_header_tuple(self):
        proxy = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),))
        self.assertEqual(proxy.pow(6, 8), 6**8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers, {'X-Test': 'foo'})

    def test_header_items(self):
        proxy = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items())
        self.assertEqual(proxy.pow(6, 8), 6**8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers, {'X-Test': 'foo'})
1255
1256
#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        super().setUp()
        # Actual value of the URL doesn't matter if it is a string in
        # the correct format.
        self.url = 'http://fake.localhost'

    def test_close(self):
        # Calling the object returned for 'close' yields None.
        p = xmlrpclib.ServerProxy(self.url)
        self.assertIsNone(p('close')())

    def test_transport(self):
        # 'transport' hands back the very transport object that was passed
        # to the constructor, hence the identity check.
        t = xmlrpclib.Transport()
        p = xmlrpclib.ServerProxy(self.url, transport=t)
        self.assertIs(p('transport'), t)
1273
1274
# A deliberately broken message class: it reports a non-numeric
# Content-Length so that a failure occurs on the server side, which is what
# the tests of the server's _send_traceback_header flag need.
class FailingMessageClass(http.client.HTTPMessage):
    def get(self, key, failobj=None):
        lowered = key.lower()
        if lowered != 'content-length':
            # every other header is looked up normally (by lowered name)
            return super().get(lowered, failobj)
        return 'I am broken'
1283
1284
class FailingServerTestCase(unittest.TestCase):
    # Tests how server-side errors are reported back to the client,
    # depending on SimpleXMLRPCServer._send_traceback_header.

    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        serv_args = (self.evt, 1)
        thread = threading.Thread(target=http_server, args=serv_args)
        thread.start()
        self.addCleanup(thread.join)

        # wait for the server to be ready
        self.evt.wait()
        # reset the event so that tearDown can wait on it again
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        default_class = http.client.HTTPMessage
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class

    def test_basic(self):
        # check that flag is false by default
        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
        self.assertIs(flagval, False)

        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            # (and exceptions that carry no headers to inspect)
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # The two server-side error headers shouldn't be sent back in this case
                self.assertIsNone(e.headers.get("X-exception"))
                self.assertIsNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            # (and exceptions that carry no headers to inspect)
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertIsNotNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')
1361
1362
@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """Temporarily replace sys.stdout with an in-memory text stream.

    A variation on support.captured_stdout(): the stream that is yielded
    (and installed as sys.stdout) is an io.TextIOWrapper around a BytesIO,
    so it has a `buffer` attribute and uses the given *encoding*.  The
    previous sys.stdout is put back when the block is left.
    """
    replacement = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    with contextlib.redirect_stdout(replacement):
        yield replacement
1374
1375
class CGIHandlerTestCase(unittest.TestCase):
    # Tests for xmlrpc.server.CGIXMLRPCRequestHandler; its output is
    # captured from stdout and inspected as text.

    def setUp(self):
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with support.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs handle_get
            # get sysout output
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            handle = data_out.read()
            # whitespace-separated words: word 1 is the status code and
            # words 2-3 are the reason phrase
            words = handle.split()
            status = words[1]
            message = ' '.join(words[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')


    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with support.EnvironmentVarGuard() as env, \
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # will respond exception, if so, our goal is achieved ;)
        handle = data_out.read()

        # Skip the HTTP header by locating the start of the XML document
        # rather than relying on a fixed offset, which would break as soon
        # as the header length changes (e.g. a Content-Length with a
        # different number of digits).
        self.assertIn("<?xml", handle)
        content = handle[handle.find("<?xml"):]
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, content)

        # Also test the content-length returned  by handle_request
        # Using the same test method inorder to avoid all the datapassing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040
        self.assertEqual(
            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
            len(content))
1442
1443
class UseBuiltinTypesTestCase(unittest.TestCase):

    def test_use_builtin_types(self):
        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types; with it
        # set, binary data must be dispatched as bytes instances and
        # datetime arguments as datetime.datetime instances.
        self.log = []
        sent_bytes = b"my dog has fleas"
        sent_date = datetime.datetime(2008, 5, 26, 18, 25, 12)

        def foobar(*args):
            # record whatever the dispatcher passes in
            self.log.extend(args)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        dispatcher.register_function(foobar)
        request = xmlrpclib.dumps((sent_bytes, sent_date), 'foobar')
        dispatcher._marshaled_dispatch(request)

        self.assertEqual(len(self.log), 2)
        got_bytes, got_date = self.log
        self.assertEqual(self.log, [sent_bytes, sent_date])
        self.assertIs(type(got_date), datetime.datetime)
        self.assertIs(type(got_bytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        cgi_handler = xmlrpc.server.CGIXMLRPCRequestHandler(
            use_builtin_types=True)
        self.assertTrue(cgi_handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        address = ("localhost", 0)
        rpc_server = xmlrpc.server.SimpleXMLRPCServer(
            address, use_builtin_types=True)
        # the flag is checked only after the server has been closed
        rpc_server.server_close()
        self.assertTrue(rpc_server.use_builtin_types)
1475
1476
@support.reap_threads
def test_main():
    """Run all test case classes of this module via support.run_unittest."""
    test_cases = (
        XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
        BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase,
        SimpleServerTestCase, SimpleServerEncodingTestCase,
        KeepaliveServerTestCase1, KeepaliveServerTestCase2,
        GzipServerTestCase, GzipUtilTestCase, HeadersServerTestCase,
        MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase,
        CGIHandlerTestCase, SimpleXMLRPCDispatcherTestCase,
    )
    support.run_unittest(*test_cases)
1486
1487
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1490