• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import datetime
3import decimal
4import sys
5import time
6import unittest
7from unittest import mock
8import xmlrpc.client as xmlrpclib
9import xmlrpc.server
10import http.client
11import http, http.server
12import socket
13import threading
14import re
15import io
16import contextlib
17from test import support
18
19try:
20    import gzip
21except ImportError:
22    gzip = None
23
# Sample payload for the marshalling tests: a one-element list holding a
# struct with strings, numbers, a nested list, binary data in three
# flavours, a boolean, non-ASCII text (as value and as key) and the three
# supported ways of constructing a DateTime.
alist = [
    {
        'astring': 'foo@bar.baz.spam',
        'afloat': 7283.43,
        'anint': 2 ** 20,
        'ashortlong': 2,
        'anotherlist': ['.zyx.41'],
        'abase64': xmlrpclib.Binary(b"my dog has fleas"),
        'b64bytes': b"my dog has fleas",
        'b64bytearray': bytearray(b"my dog has fleas"),
        'boolean': False,
        'unicode': '\u4000\u6000\u8000',
        'ukey\u4000': 'regular value',
        # DateTime built from an ISO 8601 string ...
        'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
        # ... from a time tuple ...
        'datetime2': xmlrpclib.DateTime((2005, 2, 10, 11, 41, 23, 0, 1, -1)),
        # ... and from a datetime.datetime instance.
        'datetime3': xmlrpclib.DateTime(
            datetime.datetime(2005, 2, 10, 11, 41, 23)),
    },
]
41
class XMLRPCTestCase(unittest.TestCase):
    """Tests for xmlrpc.client marshalling, unmarshalling and transport helpers."""

    def test_dump_load(self):
        # A dumps()/loads() round trip must give back the sample payload.
        dump = xmlrpclib.dumps((alist,))
        load = xmlrpclib.loads(dump)
        self.assertEqual(alist, load[0][0])

    def _check_datetime_loads(self, dt, loads_options):
        # Dump *dt* once, then load it again with every set of keyword
        # arguments in *loads_options* (pairs of kwargs and expected type)
        # and check the value, the exact type and the absent method name.
        s = xmlrpclib.dumps((dt,))
        for kwargs, expected_type in loads_options:
            with self.subTest(**kwargs):
                result, m = xmlrpclib.loads(s, **kwargs)
                (newdt,) = result
                self.assertEqual(newdt, dt)
                self.assertIs(type(newdt), expected_type)
                self.assertIsNone(m)

    def test_dump_bare_datetime(self):
        # This checks that an unwrapped datetime.datetime object can be
        # handled by the marshalling code.  This can't be done via
        # test_dump_load() since with use_builtin_types set to 1 the
        # unmarshaller would create datetime objects for the
        # 'datetime[123]' keys as well.
        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
        self._check_datetime_loads(dt, [
            ({'use_builtin_types': True}, datetime.datetime),
            ({'use_builtin_types': False}, xmlrpclib.DateTime),
            ({'use_datetime': True}, datetime.datetime),
            ({'use_datetime': False}, xmlrpclib.DateTime),
        ])

    def test_datetime_before_1900(self):
        # Same as test_dump_bare_datetime() but with a date before 1900.
        dt = datetime.datetime(1, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
        self._check_datetime_loads(dt, [
            ({'use_builtin_types': True}, datetime.datetime),
            ({'use_builtin_types': False}, xmlrpclib.DateTime),
        ])

    def test_bug_1164912(self):
        d = xmlrpclib.DateTime()
        ((new_d,), dummy) = xmlrpclib.loads(
            xmlrpclib.dumps((d,), methodresponse=True))
        self.assertIsInstance(new_d.value, str)

        # Check that the output of dumps() is a str as well.
        s = xmlrpclib.dumps((new_d,), methodresponse=True)
        self.assertIsInstance(s, str)

    def test_newstyle_class(self):
        # An arbitrary instance is marshalled through its __dict__ and
        # therefore comes back as a plain dict.
        class T:
            pass
        t = T()
        t.x = 100
        t.y = "Hello"
        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
        self.assertEqual(t2, t.__dict__)

    def test_dump_big_long(self):
        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))

    def test_dump_bad_dict(self):
        # Struct keys must be strings; a tuple key is rejected.
        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1, 2, 3): 1},))

    def test_dump_recursive_seq(self):
        l = [1, 2, 3]
        t = [3, 4, 5, l]
        l.append(t)
        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))

    def test_dump_recursive_dict(self):
        d = {'1': 1, '2': 1}
        t = {'3': 3, 'd': d}
        d['t'] = t
        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))

    def test_dump_big_int(self):
        if sys.maxsize > 2**31 - 1:
            self.assertRaises(OverflowError, xmlrpclib.dumps,
                              (int(2**34),))

        # The limits themselves are accepted, one step beyond is not.
        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MAXINT + 1,))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MININT - 1,))

        def dummy_write(s):
            pass

        # Same checks when calling the Marshaller method directly.
        m = xmlrpclib.Marshaller()
        m.dump_int(xmlrpclib.MAXINT, dummy_write)
        m.dump_int(xmlrpclib.MININT, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MAXINT + 1, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MININT - 1, dummy_write)

    def test_dump_double(self):
        # Doubles are not subject to the integer range limits; none of
        # these calls may raise.
        xmlrpclib.dumps((float(2 ** 34),))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
                         float(xmlrpclib.MININT)))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
                         float(xmlrpclib.MININT - 42)))

        def dummy_write(s):
            pass

        m = xmlrpclib.Marshaller()
        m.dump_double(xmlrpclib.MAXINT, dummy_write)
        m.dump_double(xmlrpclib.MININT, dummy_write)
        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
        m.dump_double(xmlrpclib.MININT - 42, dummy_write)

    def test_dump_none(self):
        value = alist + [None]
        arg1 = (value,)
        strg = xmlrpclib.dumps(arg1, allow_none=True)
        self.assertEqual(value, xmlrpclib.loads(strg)[0][0])
        # Without allow_none, None cannot be marshalled.
        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))

    def test_dump_encoding(self):
        value = {'key\u20ac\xa4':
                 'value\u20ac\xa4'}
        # Request without a method name: the XML declaration has to be
        # prepended by hand before the document can be parsed.
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        # Method response, loaded both as str and as encoded bytes.
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodresponse=True)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        # Method call with a non-ASCII method name.
        methodname = 'method\u20ac\xa4'
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodname=methodname)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)

    def test_dump_bytes(self):
        sample = b"my dog has fleas"
        self.assertEqual(sample, xmlrpclib.Binary(sample))
        for type_ in bytes, bytearray, xmlrpclib.Binary:
            value = type_(sample)
            s = xmlrpclib.dumps((value,))

            result, m = xmlrpclib.loads(s, use_builtin_types=True)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), bytes)
            self.assertIsNone(m)

            result, m = xmlrpclib.loads(s, use_builtin_types=False)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), xmlrpclib.Binary)
            self.assertIsNone(m)

    def test_loads_unsupported(self):
        # An unknown element must be reported as ResponseError whether it
        # appears as a bare value, inside an array or inside a struct.
        ResponseError = xmlrpclib.ResponseError
        documents = [
            '<params><param><value><spam/></value></param></params>',
            ('<params><param><value><array>'
             '<value><spam/></value>'
             '</array></value></param></params>'),
            ('<params><param><value><struct>'
             '<member><name>a</name><value><spam/></value></member>'
             '<member><name>b</name><value><spam/></value></member>'
             '</struct></value></param></params>'),
        ]
        for data in documents:
            with self.subTest(data=data):
                self.assertRaises(ResponseError, xmlrpclib.loads, data)

    def check_loads(self, s, value, **kwargs):
        # Wrap the XML fragment *s* into a single-parameter document, load
        # it (passing **kwargs on to loads()) and check that the result is
        # equal to *value*, has exactly its type and carries no method name.
        dump = '<params><param><value>%s</value></param></params>' % s
        result, m = xmlrpclib.loads(dump, **kwargs)
        (newvalue,) = result
        self.assertEqual(newvalue, value)
        self.assertIs(type(newvalue), type(value))
        self.assertIsNone(m)

    def test_load_standard_types(self):
        check = self.check_loads
        check('string', 'string')
        check('<string>string</string>', 'string')
        # Non-BMP text: MATHEMATICAL FRAKTUR letters spelling "Unicode".
        # Written with escapes so that the sample cannot be damaged by a
        # lossy re-encoding of this file.
        astral = ('\U0001d518\U0001d52b\U0001d526\U0001d520'
                  '\U0001d52c\U0001d521\U0001d522 string')
        check('<string>%s</string>' % astral, astral)
        check('<int>2056183947</int>', 2056183947)
        check('<int>-2056183947</int>', -2056183947)
        check('<i4>2056183947</i4>', 2056183947)
        check('<double>46093.78125</double>', 46093.78125)
        check('<boolean>0</boolean>', False)
        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
              xmlrpclib.Binary(b'\x00byte string\xff'))
        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
              b'\x00byte string\xff', use_builtin_types=True)
        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
              xmlrpclib.DateTime('20050210T11:41:23'))
        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
              datetime.datetime(2005, 2, 10, 11, 41, 23),
              use_builtin_types=True)
        check('<array><data>'
              '<value><int>1</int></value><value><int>2</int></value>'
              '</data></array>', [1, 2])
        check('<struct>'
              '<member><name>b</name><value><int>2</int></value></member>'
              '<member><name>a</name><value><int>1</int></value></member>'
              '</struct>', {'a': 1, 'b': 2})

    def test_load_extension_types(self):
        check = self.check_loads
        check('<nil/>', None)
        check('<ex:nil/>', None)
        check('<i1>205</i1>', 205)
        check('<i2>20561</i2>', 20561)
        check('<i8>9876543210</i8>', 9876543210)
        check('<biginteger>98765432100123456789</biginteger>',
              98765432100123456789)
        check('<float>93.78125</float>', 93.78125)
        check('<bigdecimal>9876543210.0123456789</bigdecimal>',
              decimal.Decimal('9876543210.0123456789'))

    def test_limit_int(self):
        # Integers with more digits than the configured str->int limit
        # must be rejected with ValueError.
        check = self.check_loads
        maxdigits = 5000
        with support.adjust_int_max_str_digits(maxdigits):
            s = '1' * (maxdigits + 1)
            with self.assertRaises(ValueError):
                check(f'<int>{s}</int>', None)
            with self.assertRaises(ValueError):
                check(f'<biginteger>{s}</biginteger>', None)

    def test_get_host_info(self):
        # see bug #3613, this raised a TypeError
        transp = xmlrpc.client.Transport()
        self.assertEqual(transp.get_host_info("user@host.tld"),
                         ('host.tld',
                          [('Authorization', 'Basic dXNlcg==')], {}))

    def test_ssl_presence(self):
        try:
            import ssl
        except ImportError:
            has_ssl = False
        else:
            has_ssl = True
        # Nothing is expected to listen on this port: with SSL support the
        # call fails with OSError, without it with NotImplementedError.
        try:
            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
        except NotImplementedError:
            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
        except OSError:
            self.assertTrue(has_ssl)

    def test_keepalive_disconnect(self):
        class RequestHandler(http.server.BaseHTTPRequestHandler):
            protocol_version = "HTTP/1.1"
            # Set on the instance once the first request of a connection
            # has been answered.
            handled = False

            def do_POST(self):
                length = int(self.headers.get("Content-Length"))
                self.rfile.read(length)
                if self.handled:
                    # Second request on the same connection: drop the
                    # connection without sending any response.
                    self.close_connection = True
                    return
                response = xmlrpclib.dumps((5,), methodresponse=True)
                response = response.encode()
                self.send_response(http.HTTPStatus.OK)
                self.send_header("Content-Length", len(response))
                self.end_headers()
                self.wfile.write(response)
                self.handled = True
                self.close_connection = False

            def log_message(self, format, *args):
                # don't clobber sys.stderr
                pass

        def run_server():
            server.socket.settimeout(float(1))  # Don't hang if client fails
            server.handle_request()  # First request and attempt at second
            server.handle_request()  # Retried second request

        server = http.server.HTTPServer((support.HOST, 0), RequestHandler)
        self.addCleanup(server.server_close)
        thread = threading.Thread(target=run_server)
        thread.start()
        self.addCleanup(thread.join)
        url = "http://{}:{}/".format(*server.server_address)
        with xmlrpclib.ServerProxy(url) as p:
            self.assertEqual(p.method(), 5)
            self.assertEqual(p.method(), 5)
355
356
class SimpleXMLRPCDispatcherTestCase(unittest.TestCase):
    class DispatchExc(Exception):
        """Raised inside the dispatched functions when checking for
        chained exceptions"""

    def test_call_registered_func(self):
        """Calls explicitly registered function"""
        # The exception raised by the function has to reach the caller
        # without any other exception chained to it.
        expected = (1, 2, 3)

        def dispatched_func(*params):
            raise self.DispatchExc(params)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_function(dispatched_func)
        with self.assertRaises(self.DispatchExc) as caught:
            dispatcher._dispatch('dispatched_func', expected)
        raised = caught.exception
        self.assertEqual(raised.args, (expected,))
        self.assertIsNone(raised.__cause__)
        self.assertIsNone(raised.__context__)

    def test_call_instance_func(self):
        """Calls a registered instance attribute as a function"""
        # The exception raised by the method has to reach the caller
        # without any other exception chained to it.
        expected = (1, 2, 3)

        class DispatchedClass:
            def dispatched_func(self, *params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_instance(DispatchedClass())
        with self.assertRaises(self.DispatchExc) as caught:
            dispatcher._dispatch('dispatched_func', expected)
        raised = caught.exception
        self.assertEqual(raised.args, (expected,))
        self.assertIsNone(raised.__cause__)
        self.assertIsNone(raised.__context__)

    def test_call_dispatch_func(self):
        """Calls the registered instance's `_dispatch` function"""
        # The exception raised by the instance's own _dispatch() has to
        # reach the caller without any other exception chained to it.
        method_name = 'method'
        expected = (1, 2, 3)

        class TestInstance:
            def _dispatch(self, method, params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(
                    method, params)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_instance(TestInstance())
        with self.assertRaises(self.DispatchExc) as caught:
            dispatcher._dispatch(method_name, expected)
        raised = caught.exception
        self.assertEqual(raised.args, (method_name, expected))
        self.assertIsNone(raised.__cause__)
        self.assertIsNone(raised.__context__)

    def test_registered_func_is_none(self):
        """Calls explicitly registered function which is None"""
        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_function(None, name='method')
        # The error message is expected to mention the method name.
        with self.assertRaisesRegex(Exception, 'method'):
            dispatcher._dispatch('method', ('param',))

    def test_instance_has_no_func(self):
        """Attempts to call nonexistent function on a registered instance"""
        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_instance(object())
        # The error message is expected to mention the method name.
        with self.assertRaisesRegex(Exception, 'method'):
            dispatcher._dispatch('method', ('param',))

    def test_cannot_locate_func(self):
        """Calls a function that the dispatcher cannot locate"""
        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        # Nothing is registered at all; the message still names the method.
        with self.assertRaisesRegex(Exception, 'method'):
            dispatcher._dispatch('method', ('param',))
442
443
class HelperTestCase(unittest.TestCase):
    """Tests for the escape() helper of xmlrpc.client."""

    def test_escape(self):
        # '&', '<' and '>' must each be replaced by the matching entity.
        expectations = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in expectations:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
449
class FaultTestCase(unittest.TestCase):
    """Tests for xmlrpc.client.Fault and for dotted attribute resolution."""

    def test_repr(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')
        self.assertEqual(repr(fault), "<Fault 42: 'Test Fault'>")
        # str() and repr() give the same text.
        self.assertEqual(repr(fault), str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # Passed as a parameter, the fault comes back as a plain dict and
        # no method name is reported.
        params, method = xmlrpclib.loads(xmlrpclib.dumps((fault,)))
        (loaded,) = params
        self.assertEqual(loaded,
                         {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEqual(method, None)

        # Dumped on its own by a Marshaller, loading it raises the Fault.
        fault_document = xmlrpclib.Marshaller().dumps(fault)
        with self.assertRaises(xmlrpclib.Fault):
            xmlrpclib.loads(fault_document)

    def test_dotted_attribute(self):
        resolve = xmlrpc.server.resolve_dotted_attribute
        # An underscore-prefixed name is refused with AttributeError, so
        # private methods cannot be reached this way.
        with self.assertRaises(AttributeError):
            resolve(str, '__add')
        self.assertTrue(resolve(str, 'title'))
472
473class DateTimeTestCase(unittest.TestCase):
474    def test_default(self):
475        with mock.patch('time.localtime') as localtime_mock:
476            time_struct = time.struct_time(
477                [2013, 7, 15, 0, 24, 49, 0, 196, 0])
478            localtime_mock.return_value = time_struct
479            localtime = time.localtime()
480            t = xmlrpclib.DateTime()
481            self.assertEqual(str(t),
482                             time.strftime("%Y%m%dT%H:%M:%S", localtime))
483
484    def test_time(self):
485        d = 1181399930.036952
486        t = xmlrpclib.DateTime(d)
487        self.assertEqual(str(t),
488                         time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
489
490    def test_time_tuple(self):
491        d = (2007,6,9,10,38,50,5,160,0)
492        t = xmlrpclib.DateTime(d)
493        self.assertEqual(str(t), '20070609T10:38:50')
494
495    def test_time_struct(self):
496        d = time.localtime(1181399930.036952)
497        t = xmlrpclib.DateTime(d)
498        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
499
500    def test_datetime_datetime(self):
501        d = datetime.datetime(2007,1,2,3,4,5)
502        t = xmlrpclib.DateTime(d)
503        self.assertEqual(str(t), '20070102T03:04:05')
504
505    def test_repr(self):
506        d = datetime.datetime(2007,1,2,3,4,5)
507        t = xmlrpclib.DateTime(d)
508        val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
509        self.assertEqual(repr(t), val)
510
511    def test_decode(self):
512        d = ' 20070908T07:11:13  '
513        t1 = xmlrpclib.DateTime()
514        t1.decode(d)
515        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
516        self.assertEqual(t1, tref)
517
518        t2 = xmlrpclib._datetime(d)
519        self.assertEqual(t2, tref)
520
521    def test_comparison(self):
522        now = datetime.datetime.now()
523        dtime = xmlrpclib.DateTime(now.timetuple())
524
525        # datetime vs. DateTime
526        self.assertTrue(dtime == now)
527        self.assertTrue(now == dtime)
528        then = now + datetime.timedelta(seconds=4)
529        self.assertTrue(then >= dtime)
530        self.assertTrue(dtime < then)
531
532        # str vs. DateTime
533        dstr = now.strftime("%Y%m%dT%H:%M:%S")
534        self.assertTrue(dtime == dstr)
535        self.assertTrue(dstr == dtime)
536        dtime_then = xmlrpclib.DateTime(then.timetuple())
537        self.assertTrue(dtime_then >= dstr)
538        self.assertTrue(dstr < dtime_then)
539
540        # some other types
541        dbytes = dstr.encode('ascii')
542        dtuple = now.timetuple()
543        with self.assertRaises(TypeError):
544            dtime == 1970
545        with self.assertRaises(TypeError):
546            dtime != dbytes
547        with self.assertRaises(TypeError):
548            dtime == bytearray(dbytes)
549        with self.assertRaises(TypeError):
550            dtime != dtuple
551        with self.assertRaises(TypeError):
552            dtime < float(1970)
553        with self.assertRaises(TypeError):
554            dtime > dbytes
555        with self.assertRaises(TypeError):
556            dtime <= bytearray(dbytes)
557        with self.assertRaises(TypeError):
558            dtime >= dtuple
559
class BinaryTestCase(unittest.TestCase):
    """Tests for xmlrpc.client.Binary."""

    # XXX What should str(Binary(b"\xff")) return?  "\xff" has been chosen
    # for now (i.e. the binary data is interpreted as Latin-1-encoded
    # text), but this feels very unsatisfactory.  Perhaps only repr()
    # should be defined, returning r"Binary(b'\xff')" instead?

    def test_default(self):
        # A Binary created without data converts to the empty string.
        self.assertEqual(str(xmlrpclib.Binary()), '')

    def test_string(self):
        payload = b'\x01\x02\x03abc123\xff\xfe'
        wrapped = xmlrpclib.Binary(payload)
        self.assertEqual(str(wrapped), payload.decode("latin-1"))

    def test_decode(self):
        payload = b'\x01\x02\x03abc123\xff\xfe'
        encoded = base64.encodebytes(payload)
        expected = payload.decode("latin-1")

        # Decoding into an existing instance ...
        via_method = xmlrpclib.Binary()
        via_method.decode(encoded)
        self.assertEqual(str(via_method), expected)

        # ... and through the module-level helper.
        via_helper = xmlrpclib._binary(encoded)
        self.assertEqual(str(via_helper), expected)
585
586
# Address, port and URL of the running test server.  http_server() and
# http_multi_server() assign them through `global` once the listening
# socket is bound.
ADDR = None
PORT = None
URL = None
588
# The evt is set twice: first when the server is ready to serve, and a
# second time when the server has been shut down.  The user must clear
# the event after the first set in order to catch the second one.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Serve up to *numrequests* XML-RPC requests on a local port.

    evt            -- event set when the server is ready and again on exit
    numrequests    -- maximum number of requests to handle
    requestHandler -- request handler class; SimpleXMLRPCRequestHandler
                      is used when this is not given
    encoding       -- passed through to the server constructor

    The bound address is published in the module globals ADDR, PORT and
    URL; PORT is reset to None when the server stops.
    """
    global ADDR, PORT, URL

    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

        class Fixture:
            @staticmethod
            def getData():
                return '42'

    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
        def get_request(self):
            # Put every accepted socket into blocking mode explicitly.  On
            # Linux, socket attributes are not inherited like they are on
            # *BSD and Windows.
            conn, client_address = self.socket.accept()
            conn.setblocking(True)
            return conn, client_address

    handler_class = requestHandler or xmlrpc.server.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(("localhost", 0), handler_class,
                          encoding=encoding,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on Vista which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()

        # Functions offered to the client tests.
        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x: x, 'têšt')

        @serv.register_function
        def my_function():
            '''This is my function'''
            return True

        @serv.register_function(name='add')
        def _(x, y):
            return x + y

        serv.register_instance(TestInstanceClass(), allow_dotted_names=True)

        # First signal: the server is ready.
        evt.set()

        # handle up to 'numrequests' requests
        remaining = numrequests
        while remaining > 0:
            serv.handle_request()
            remaining -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        # Second signal: the server has shut down.
        evt.set()
655
def http_multi_server(evt, numrequests, requestHandler=None):
    """Serve up to *numrequests* requests with a multi-path XML-RPC server.

    evt            -- event set when the server is ready and again on exit
    numrequests    -- maximum number of requests to handle
    requestHandler -- base class for the request handler;
                      SimpleXMLRPCRequestHandler is used when not given

    Dispatchers are installed for "/foo" (offering pow), "/foo/bar"
    (offering add) and "/is/broken" (a dispatcher that always raises).
    The bound address is published in the module globals ADDR, PORT and
    URL; PORT is reset to None when the server stops.
    """
    global ADDR, PORT, URL

    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
        def get_request(self):
            # Put every accepted socket into blocking mode explicitly.  On
            # Linux, socket attributes are not inherited like they are on
            # *BSD and Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler

    class MyRequestHandler(requestHandler):
        # NOTE(review): presumably an empty list switches off the
        # handler's own path check so that the per-path dispatchers
        # decide -- confirm against SimpleXMLRPCRequestHandler.
        rpc_paths = []

    class BrokenDispatcher:
        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
            raise RuntimeError("broken dispatcher")

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    try:
        # Configure and bind inside the try block, like http_server()
        # does: if this fails, the finally clause still closes the socket
        # and sets evt, so a test waiting for the server does not block
        # forever.
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on Vista which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = ["/foo", "/foo/bar"]
        for path in paths:
            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        serv.add_dispatcher("/is/broken", BrokenDispatcher())
        # First signal: the server is ready.
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        # Second signal: the server has shut down.
        evt.set()
720
# This function prevents errors like:
#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Return True if the given exception is the product of a server-side
       exception caused by the 'temporarily unavailable' response sometimes
       given by operations on non-blocking sockets, False otherwise.

       e -- the exception to inspect.  Objects with `errcode` and `headers`
            attributes are judged by those; for anything else (such as an
            OSError) the text of str(e) is inspected instead.
    '''

    # sometimes we get a -1 error code and/or empty headers
    try:
        if e.errcode == -1 or e.headers is None:
            return True
        exc_mess = e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError-like object (e.g. an OSError): fall back to
        # the exception's own message.
        exc_mess = str(e)

    # Always answer with a real bool rather than falling off the end.
    return bool(exc_mess) and 'temporarily unavailable' in exc_mess.lower()
739
def make_request_and_skipIf(condition, reason):
    """Return a decorator that skips a server test when *condition* is true.

    condition -- when false, the decorated test is returned unchanged
    reason    -- message passed to unittest.SkipTest

    If we skip the test, we have to make a request because the server
    created in setUp blocks expecting one to come in.  The replacement
    test therefore calls my_function() on the server first and only then
    raises unittest.SkipTest.
    """
    if not condition:
        return lambda func: func

    # Imported locally so that this function does not depend on a
    # module-level import being present.
    import functools

    def decorator(func):
        # Keep the name and docstring of the test that is being replaced.
        @functools.wraps(func)
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, OSError) as e:
                # 'unavailable' errors of non-blocking sockets are ignored;
                # anything else is a real failure.
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
755
class BaseServerTestCase(unittest.TestCase):
    """Base class for tests that need an XML-RPC server in a background thread."""

    # Request handler class handed to the server function (None lets the
    # server function pick its default).
    requestHandler = None
    # Number of requests the server function is told to handle.
    request_count = 1
    # Server function run in the background thread.
    threadFunc = staticmethod(http_server)

    def setUp(self):
        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # start server thread to handle requests
        self.evt = threading.Event()
        server_thread = threading.Thread(
            target=self.threadFunc,
            args=(self.evt, self.request_count, self.requestHandler))
        server_thread.start()
        self.addCleanup(server_thread.join)

        # wait for the server to be ready, then re-arm the event so that
        # tearDown() can wait for the shutdown signal
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()

        # disable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
782
class SimpleServerTestCase(BaseServerTestCase):
    # Client round trips against the server started by the base class.

    def _fail_unless_unavailable(self, exc):
        # Common handling for a ProtocolError/OSError raised by a client
        # call.  Failures due to non-blocking socket 'unavailable' errors
        # are ignored; anything else fails the test, with the response
        # headers (when the exception carries any) added to the message.
        if not is_unavailable_exception(exc):
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_simple1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_nonascii(self):
        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_client_encoding(self):
        start_string = '\u20ac'
        end_string = '\xa4'

        try:
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_nonascii_methodname(self):
        try:
            p = xmlrpclib.ServerProxy(URL, encoding='ascii')
            self.assertEqual(p.têšt(42), 42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_404(self):
        # send POST with http.client, it should return 404 header and
        # 'Not Found' message.
        with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn:
            conn.request('POST', '/this-is-not-valid')
            response = conn.getresponse()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

    def test_introspection1(self):
        expected_methods = {'pow', 'div', 'my_function', 'add', 'têšt',
                            'system.listMethods', 'system.methodHelp',
                            'system.methodSignature', 'system.multicall',
                            'Fixture'}
        try:
            p = xmlrpclib.ServerProxy(URL)
            meth = p.system.listMethods()
            self.assertEqual(set(meth), expected_methods)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_introspection2(self):
        try:
            # test _methodHelp()
            p = xmlrpclib.ServerProxy(URL)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    @make_request_and_skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        try:
            # test native doc
            p = xmlrpclib.ServerProxy(URL)
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains a single fault struct whose
            # faultCode is 1 and whose faultString names the unsupported
            # method; both are checked below.
            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')

        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        data = server.Fixture.getData()
        self.assertEqual(data, '42')

    def test_unicode_host(self):
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(server.add("a", "\xe9"), "a\xe9")

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
            # sendall() rather than send(): send() may transmit only part
            # of the buffer, and the whole payload below must go out.
            conn.sendall('POST /RPC2 HTTP/1.0\r\n'
                         'Content-Length: 100\r\n\r\n'
                         'bye HTTP/1.1\r\n'
                         f'Host: {ADDR}:{PORT}\r\n'
                         'Accept-Encoding: identity\r\n'
                         'Content-Length: 0\r\n\r\n'.encode('ascii'))

    def test_context_manager(self):
        # The transport holds a connection inside the with block and has
        # dropped it once the block is left.
        with xmlrpclib.ServerProxy(URL) as server:
            server.add(2, 3)
            self.assertNotEqual(server('transport')._connection,
                                (None, None))
        self.assertEqual(server('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        # A Fault raised by the call is tolerated; the point of the test is
        # that the connection has been dropped after leaving the with block.
        try:
            with xmlrpclib.ServerProxy(URL) as server:
                server.add(2, "a")
        except xmlrpclib.Fault:
            pass
        self.assertEqual(server('transport')._connection,
                         (None, None))
980
981
class SimpleServerEncodingTestCase(BaseServerTestCase):
    # Same server as the base class, started with 'iso-8859-15' as its
    # fourth argument.
    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The encoding argument is accepted but ignored: the value passed
        # on to http_server() is always 'iso-8859-15'.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        euro_sign = '\u20ac'
        currency_sign = '\xa4'
        expected = euro_sign + currency_sign

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.add(euro_sign, currency_sign), expected)
        except (xmlrpclib.ProtocolError, OSError) as err:
            # Failures due to non-blocking socket unavailable errors are
            # ignored.
            if is_unavailable_exception(err):
                return
            # Protocol error: include the response headers, if any, in the
            # test output.
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
1000
1001
class MultiPathServerTestCase(BaseServerTestCase):
    # Uses the multi-path server; each test talks to one URL path and
    # checks which of the 'pow' and 'add' calls succeed there.
    threadFunc = staticmethod(http_multi_server)
    request_count = 2

    def test_path1(self):
        # Under /foo, pow() answers and add() faults.
        proxy = xmlrpclib.ServerProxy(URL + "/foo")
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_path2(self):
        # Under /foo/bar it is the other way round.
        proxy = xmlrpclib.ServerProxy(URL + "/foo/bar")
        self.assertEqual(proxy.add(6, 8), 6 + 8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.pow(6, 8)

    def test_path3(self):
        # Calling add() under /is/broken faults.
        proxy = xmlrpclib.ServerProxy(URL + "/is/broken")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)
1018
# Base class for the keep-alive test cases below: it installs an HTTP/1.1
# request handler that logs the request lines it serves in a class variable.
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    # Request handler speaking HTTP/1.1 that records what it serves in the
    # class variable myRequests: one sub-list per handle() call, holding
    # the raw request line of every request processed during that call.
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []

        def handle(self):
            # Open a new log entry and remember which one belongs to us.
            self.myRequests.append([])
            self.reqidx = len(self.myRequests) - 1
            return super().handle()

        def handle_one_request(self):
            outcome = super().handle_one_request()
            # Log the request line only after the request was processed.
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        # Start every test with an empty request log.
        self.RequestHandler.myRequests = []
        return super().setUp()
1042
1043#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
1044#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # Make three identical calls through the same proxy, then close it.
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        proxy("close")()

        request_log = self.RequestHandler.myRequests

        # All of them should have been handled by a single request handler.
        self.assertEqual(len(request_log), 1)

        # At least two must be logged (the third may still be pending
        # append due to thread scheduling).
        self.assertGreaterEqual(len(request_log[-1]), 2)
1060
1061
1062#test special attribute access on the serverproxy, through the __call__
1063#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    # Ask for two keep-alive requests to be handled.
    request_count = 2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)

        # First batch of calls.
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        proxy("close")()  # this should trigger a new keep-alive request

        # Second batch of calls.
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        proxy("close")()

        # Two request handlers should have been involved, each having
        # logged at least two complete requests.
        request_log = self.RequestHandler.myRequests
        self.assertEqual(len(request_log), 2)
        self.assertGreaterEqual(len(request_log[-1]), 2)
        self.assertGreaterEqual(len(request_log[-2]), 2)

    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        # Closing through the transport object: same as proxy("close")(),
        # really.
        proxy("transport").close()
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        proxy("close")()
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
1095
1096#A test case that verifies that gzip encoding works in both directions
1097#(for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    # Request handler speaking HTTP/1.1 that stores the Content-Length of
    # the last POST it received in the class attribute content_length.
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            # Stored on the class so the test can read it afterwards.
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        # Custom transport: stores the response length for our perusal and
        # can optionally announce a gzip body that is not actually gzipped.
        fake_gzip = False

        def parse_response(self, response):
            self.response_length = int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                # add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None  # request encoding off
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        a = self.RequestHandler.content_length
        t.encode_threshold = 0  # turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        b = self.RequestHandler.content_length
        # The request sent with encoding on must be the shorter one.
        self.assertGreater(a, b)
        p("close")()

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        # The bogus Content-Encoding header must be answered with a 400.
        with self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                    re.compile(r"\b400\b")):
            p.pow(6, 8)
        p("close")()

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        # encode_threshold is changed on the handler class, so it is put
        # back in a finally clause: a failing request must not leave the
        # override in place.
        try:
            self.requestHandler.encode_threshold = None  # no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            a = t.response_length
            self.requestHandler.encode_threshold = 0  # always encode
            self.assertEqual(p.pow(6,8), 6**8)
            p("close")()
            b = t.response_length
        finally:
            self.requestHandler.encode_threshold = old
        # The response sent with encoding on must be the shorter one.
        self.assertGreater(a, b)
1164
1165
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):

    def test_gzip_decode_limit(self):
        limit = 20 * 1024 * 1024

        # A payload of exactly `limit` bytes decodes in full.
        at_limit = xmlrpclib.gzip_encode(b'\0' * limit)
        self.assertEqual(len(xmlrpclib.gzip_decode(at_limit)), limit)

        # One byte more is rejected by default ...
        over_limit = xmlrpclib.gzip_encode(b'\0' * (limit + 1))
        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(over_limit)

        # ... but decodes without error when max_decode is -1.
        xmlrpclib.gzip_decode(over_limit, max_decode=-1)
1184
1185
class HeadersServerTestCase(BaseServerTestCase):
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        # Headers of the most recent POST, kept on the class so the tests
        # can look at them once the call has returned.
        test_headers = None

        def do_POST(self):
            type(self).test_headers = self.headers
            return super().do_POST()

    requestHandler = RequestHandler

    # Headers expected on every request, whether or not extra ones are
    # passed to ServerProxy.
    standard_headers = [
        'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
        'Content-Length']

    def setUp(self):
        # Forget the headers recorded by a previous test.
        self.RequestHandler.test_headers = None
        return super().setUp()

    def assertContainsAdditionalHeaders(self, headers, additional):
        # The header names must be exactly the standard ones plus the keys
        # of `additional`, and each additional header must carry the
        # expected value.
        wanted_names = sorted([*self.standard_headers, *additional])
        received_names = sorted(headers.keys())
        self.assertListEqual(received_names, wanted_names)

        for name in additional:
            self.assertEqual(headers.get(name), additional[name])

    def test_header(self):
        proxy = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')])
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers, {'X-Test': 'foo'})

    def test_header_many(self):
        extra = [('X-Test', 'foo'), ('X-Test-Second', 'bar')]
        proxy = xmlrpclib.ServerProxy(URL, headers=extra)
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers,
            {'X-Test': 'foo', 'X-Test-Second': 'bar'})

    def test_header_empty(self):
        proxy = xmlrpclib.ServerProxy(URL, headers=[])
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers, {})

    def test_header_tuple(self):
        # Headers given as a tuple of pairs instead of a list.
        proxy = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),))
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers, {'X-Test': 'foo'})

    def test_header_items(self):
        # Headers given as a dict items view.
        proxy = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items())
        self.assertEqual(proxy.pow(6, 8), 6 ** 8)

        self.assertContainsAdditionalHeaders(
            self.RequestHandler.test_headers, {'X-Test': 'foo'})
1245
1246
1247#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        super().setUp()
        # The actual value of the URL doesn't matter as long as it is a
        # string in the correct format.
        self.url = 'http://fake.localhost'

    def test_close(self):
        # Calling the proxy with 'close' gives a callable returning None.
        proxy = xmlrpclib.ServerProxy(self.url)
        close = proxy('close')
        self.assertEqual(close(), None)

    def test_transport(self):
        # Calling the proxy with 'transport' gives back the transport that
        # was passed to the constructor.
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        self.assertEqual(proxy('transport'), transport)
1263
1264
1265# This is a contrived way to make a failure occur on the server side
1266# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
    def get(self, key, failobj=None):
        """Look up header *key*, but report a bogus Content-Length.

        The name is lower-cased first.  'content-length' always yields the
        string 'I am broken'; any other name is looked up through the base
        class, with *failobj* as the fallback value.
        """
        lowered = key.lower()
        if lowered != 'content-length':
            return super().get(lowered, failobj)
        return 'I am broken'
1273
1274
class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        self.evt = threading.Event()
        # Handle requests from a separate server thread; joining it is
        # registered as a cleanup.
        server_thread = threading.Thread(target=http_server,
                                         args=(self.evt, 1))
        server_thread.start()
        self.addCleanup(server_thread.join)

        # Wait for the server to be ready, then re-arm the event for
        # tearDown().
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # Wait on the server thread to terminate.
        self.evt.wait()
        # Undo whatever the test changed: the traceback flag ...
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        # ... and the message class used by the request handler.
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = (
            http.client.HTTPMessage)

    def test_basic(self):
        # The flag is false by default.
        self.assertEqual(
            xmlrpc.server.SimpleXMLRPCServer._send_traceback_header, False)

        # Enable traceback reporting.
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # Smoke test: a call that shouldn't fail.
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6, 8), 6 ** 8)
        except (xmlrpclib.ProtocolError, OSError) as err:
            # Failures due to non-blocking socket 'unavailable' errors are
            # ignored.
            if is_unavailable_exception(err):
                return
            # Protocol error: include the response headers, if any, in the
            # test output.
            self.fail("%s\n%s" % (err, getattr(err, "headers", "")))

    def test_fail_no_info(self):
        # Use the broken message class.
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, OSError) as err:
            # Nothing to check for 'unavailable' socket errors or for
            # exceptions that carry no headers.
            if is_unavailable_exception(err) or not hasattr(err, "headers"):
                return
            # The two server-side error headers shouldn't be sent back in
            # this case.
            self.assertIsNone(err.headers.get("X-exception"))
            self.assertIsNone(err.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # Use the broken message class.
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # With the flag set, errors in the server send back
        # exception/traceback info.
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, OSError) as err:
            # Nothing to check for 'unavailable' socket errors or for
            # exceptions that carry no headers.
            if is_unavailable_exception(err) or not hasattr(err, "headers"):
                return
            # We should get error info in the response.
            expected_err = "invalid literal for int() with base 10: 'I am broken'"
            self.assertEqual(err.headers.get("X-exception"), expected_err)
            self.assertIsNotNone(err.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')
1351
1352
@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """Temporarily replace sys.stdout with an in-memory text stream.

    A variation on support.captured_stdout(): the stream yielded is an
    io.TextIOWrapper over an io.BytesIO using *encoding*, so it has a
    `buffer` attribute.  The previous sys.stdout is put back when the
    block is left, whether or not it raised.
    """
    saved_stdout = sys.stdout
    replacement = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    sys.stdout = replacement
    try:
        yield replacement
    finally:
        sys.stdout = saved_stdout
1364
1365
class CGIHandlerTestCase(unittest.TestCase):
    # Drives xmlrpc.server.CGIXMLRPCRequestHandler through environment
    # variables and replaced stdin/stdout streams.

    def setUp(self):
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with support.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs
            # handle_get; capture what it writes to stdout
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            fields = data_out.read().split()
            status = fields[1]
            message = ' '.join(fields[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')

    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with support.EnvironmentVarGuard() as env, \
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # will respond exception, if so, our goal is achieved ;)
        handle = data_out.read()

        # Skip the HTTP header by locating the start of the XML document
        # rather than relying on a fixed header length.
        xml_start = handle.find("<?xml")
        self.assertNotEqual(xml_start, -1, "no XML document in the response")
        content = handle[xml_start:]

        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, content)

        # Also test the content-length returned by handle_request.
        # Using the same test method in order to avoid all the data-passing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040
        self.assertEqual(
            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
            len(content))
1432
1433
class UseBuiltinTypesTestCase(unittest.TestCase):

    def test_use_builtin_types(self):
        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types; with
        # it set, binary data is dispatched as bytes instances and datetime
        # arguments as datetime.datetime instances.
        self.log = []
        sent_bytes = b"my dog has fleas"
        sent_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
        request = xmlrpclib.dumps((sent_bytes, sent_date), 'foobar')

        def foobar(*args):
            # Record the arguments exactly as the dispatcher delivers them.
            self.log.extend(args)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        dispatcher.register_function(foobar)
        dispatcher._marshaled_dispatch(request)

        self.assertEqual(len(self.log), 2)
        got_bytes, got_date = self.log
        self.assertEqual(self.log, [sent_bytes, sent_date])
        self.assertIs(type(got_date), datetime.datetime)
        self.assertIs(type(got_bytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        cgi_handler = xmlrpc.server.CGIXMLRPCRequestHandler(
            use_builtin_types=True)
        self.assertTrue(cgi_handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        # The server is closed again before the flag is checked.
        rpc_server = xmlrpc.server.SimpleXMLRPCServer(
            ("localhost", 0), use_builtin_types=True)
        rpc_server.server_close()
        self.assertTrue(rpc_server.use_builtin_types)
1465
1466
@support.reap_threads
def test_main():
    """Run the test case classes of this module via support.run_unittest()."""
    test_classes = (
        XMLRPCTestCase,
        HelperTestCase,
        DateTimeTestCase,
        BinaryTestCase,
        FaultTestCase,
        UseBuiltinTypesTestCase,
        SimpleServerTestCase,
        SimpleServerEncodingTestCase,
        KeepaliveServerTestCase1,
        KeepaliveServerTestCase2,
        GzipServerTestCase,
        GzipUtilTestCase,
        HeadersServerTestCase,
        MultiPathServerTestCase,
        ServerProxyTestCase,
        FailingServerTestCase,
        CGIHandlerTestCase,
        SimpleXMLRPCDispatcherTestCase,
    )
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()
1480