• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import datetime
3import decimal
4import sys
5import time
6import unittest
7from unittest import mock
8import xmlrpc.client as xmlrpclib
9import xmlrpc.server
10import http.client
11import http, http.server
12import socket
13import threading
14import re
15import io
16import contextlib
17from test import support
18from test.support import os_helper
19from test.support import socket_helper
20from test.support import threading_helper
21from test.support import ALWAYS_EQ, LARGEST, SMALLEST
22
23try:
24    import gzip
25except ImportError:
26    gzip = None
27
# Shared round-trip fixture: a one-element list whose dict holds one value of
# each kind the marshaller is exercised with -- str, float, int, a nested
# list, binary data (Binary, bytes, bytearray), bool, non-ASCII text (both as
# a value and as a key) and DateTime objects built from a string, from a time
# tuple and from a datetime.datetime.
alist = [{'astring': 'foo@bar.baz.spam',
          'afloat': 7283.43,
          'anint': 2**20,
          'ashortlong': 2,
          'anotherlist': ['.zyx.41'],
          'abase64': xmlrpclib.Binary(b"my dog has fleas"),
          'b64bytes': b"my dog has fleas",
          'b64bytearray': bytearray(b"my dog has fleas"),
          'boolean': False,
          'unicode': '\u4000\u6000\u8000',
          'ukey\u4000': 'regular value',
          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
          'datetime2': xmlrpclib.DateTime(
                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
          'datetime3': xmlrpclib.DateTime(
                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
          }]
45
46class XMLRPCTestCase(unittest.TestCase):
47
48    def test_dump_load(self):
49        dump = xmlrpclib.dumps((alist,))
50        load = xmlrpclib.loads(dump)
51        self.assertEqual(alist, load[0][0])
52
53    def test_dump_bare_datetime(self):
54        # This checks that an unwrapped datetime.date object can be handled
55        # by the marshalling code.  This can't be done via test_dump_load()
56        # since with use_builtin_types set to 1 the unmarshaller would create
57        # datetime objects for the 'datetime[123]' keys as well
58        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
59        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
60        s = xmlrpclib.dumps((dt,))
61
62        result, m = xmlrpclib.loads(s, use_builtin_types=True)
63        (newdt,) = result
64        self.assertEqual(newdt, dt)
65        self.assertIs(type(newdt), datetime.datetime)
66        self.assertIsNone(m)
67
68        result, m = xmlrpclib.loads(s, use_builtin_types=False)
69        (newdt,) = result
70        self.assertEqual(newdt, dt)
71        self.assertIs(type(newdt), xmlrpclib.DateTime)
72        self.assertIsNone(m)
73
74        result, m = xmlrpclib.loads(s, use_datetime=True)
75        (newdt,) = result
76        self.assertEqual(newdt, dt)
77        self.assertIs(type(newdt), datetime.datetime)
78        self.assertIsNone(m)
79
80        result, m = xmlrpclib.loads(s, use_datetime=False)
81        (newdt,) = result
82        self.assertEqual(newdt, dt)
83        self.assertIs(type(newdt), xmlrpclib.DateTime)
84        self.assertIsNone(m)
85
86
87    def test_datetime_before_1900(self):
88        # same as before but with a date before 1900
89        dt = datetime.datetime(1,  2, 10, 11, 41, 23)
90        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
91        s = xmlrpclib.dumps((dt,))
92
93        result, m = xmlrpclib.loads(s, use_builtin_types=True)
94        (newdt,) = result
95        self.assertEqual(newdt, dt)
96        self.assertIs(type(newdt), datetime.datetime)
97        self.assertIsNone(m)
98
99        result, m = xmlrpclib.loads(s, use_builtin_types=False)
100        (newdt,) = result
101        self.assertEqual(newdt, dt)
102        self.assertIs(type(newdt), xmlrpclib.DateTime)
103        self.assertIsNone(m)
104
105    def test_bug_1164912 (self):
106        d = xmlrpclib.DateTime()
107        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
108                                            methodresponse=True))
109        self.assertIsInstance(new_d.value, str)
110
111        # Check that the output of dumps() is still an 8-bit string
112        s = xmlrpclib.dumps((new_d,), methodresponse=True)
113        self.assertIsInstance(s, str)
114
115    def test_newstyle_class(self):
116        class T(object):
117            pass
118        t = T()
119        t.x = 100
120        t.y = "Hello"
121        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
122        self.assertEqual(t2, t.__dict__)
123
124    def test_dump_big_long(self):
125        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
126
127    def test_dump_bad_dict(self):
128        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
129
130    def test_dump_recursive_seq(self):
131        l = [1,2,3]
132        t = [3,4,5,l]
133        l.append(t)
134        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
135
136    def test_dump_recursive_dict(self):
137        d = {'1':1, '2':1}
138        t = {'3':3, 'd':d}
139        d['t'] = t
140        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
141
142    def test_dump_big_int(self):
143        if sys.maxsize > 2**31-1:
144            self.assertRaises(OverflowError, xmlrpclib.dumps,
145                              (int(2**34),))
146
147        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
148        self.assertRaises(OverflowError, xmlrpclib.dumps,
149                          (xmlrpclib.MAXINT+1,))
150        self.assertRaises(OverflowError, xmlrpclib.dumps,
151                          (xmlrpclib.MININT-1,))
152
153        def dummy_write(s):
154            pass
155
156        m = xmlrpclib.Marshaller()
157        m.dump_int(xmlrpclib.MAXINT, dummy_write)
158        m.dump_int(xmlrpclib.MININT, dummy_write)
159        self.assertRaises(OverflowError, m.dump_int,
160                          xmlrpclib.MAXINT+1, dummy_write)
161        self.assertRaises(OverflowError, m.dump_int,
162                          xmlrpclib.MININT-1, dummy_write)
163
164    def test_dump_double(self):
165        xmlrpclib.dumps((float(2 ** 34),))
166        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
167                         float(xmlrpclib.MININT)))
168        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
169                         float(xmlrpclib.MININT - 42)))
170
171        def dummy_write(s):
172            pass
173
174        m = xmlrpclib.Marshaller()
175        m.dump_double(xmlrpclib.MAXINT, dummy_write)
176        m.dump_double(xmlrpclib.MININT, dummy_write)
177        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
178        m.dump_double(xmlrpclib.MININT - 42, dummy_write)
179
180    def test_dump_none(self):
181        value = alist + [None]
182        arg1 = (alist + [None],)
183        strg = xmlrpclib.dumps(arg1, allow_none=True)
184        self.assertEqual(value,
185                          xmlrpclib.loads(strg)[0][0])
186        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
187
188    def test_dump_encoding(self):
189        value = {'key\u20ac\xa4':
190                 'value\u20ac\xa4'}
191        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
192        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
193        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
194        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
195        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
196
197        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
198                               methodresponse=True)
199        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
200        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
201        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
202
203        methodname = 'method\u20ac\xa4'
204        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
205                               methodname=methodname)
206        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
207        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
208
209    def test_dump_bytes(self):
210        sample = b"my dog has fleas"
211        self.assertEqual(sample, xmlrpclib.Binary(sample))
212        for type_ in bytes, bytearray, xmlrpclib.Binary:
213            value = type_(sample)
214            s = xmlrpclib.dumps((value,))
215
216            result, m = xmlrpclib.loads(s, use_builtin_types=True)
217            (newvalue,) = result
218            self.assertEqual(newvalue, sample)
219            self.assertIs(type(newvalue), bytes)
220            self.assertIsNone(m)
221
222            result, m = xmlrpclib.loads(s, use_builtin_types=False)
223            (newvalue,) = result
224            self.assertEqual(newvalue, sample)
225            self.assertIs(type(newvalue), xmlrpclib.Binary)
226            self.assertIsNone(m)
227
228    def test_loads_unsupported(self):
229        ResponseError = xmlrpclib.ResponseError
230        data = '<params><param><value><spam/></value></param></params>'
231        self.assertRaises(ResponseError, xmlrpclib.loads, data)
232        data = ('<params><param><value><array>'
233                '<value><spam/></value>'
234                '</array></value></param></params>')
235        self.assertRaises(ResponseError, xmlrpclib.loads, data)
236        data = ('<params><param><value><struct>'
237                '<member><name>a</name><value><spam/></value></member>'
238                '<member><name>b</name><value><spam/></value></member>'
239                '</struct></value></param></params>')
240        self.assertRaises(ResponseError, xmlrpclib.loads, data)
241
242    def check_loads(self, s, value, **kwargs):
243        dump = '<params><param><value>%s</value></param></params>' % s
244        result, m = xmlrpclib.loads(dump, **kwargs)
245        (newvalue,) = result
246        self.assertEqual(newvalue, value)
247        self.assertIs(type(newvalue), type(value))
248        self.assertIsNone(m)
249
250    def test_load_standard_types(self):
251        check = self.check_loads
252        check('string', 'string')
253        check('<string>string</string>', 'string')
254        check('<string>�������������� string</string>', '�������������� string')
255        check('<int>2056183947</int>', 2056183947)
256        check('<int>-2056183947</int>', -2056183947)
257        check('<i4>2056183947</i4>', 2056183947)
258        check('<double>46093.78125</double>', 46093.78125)
259        check('<boolean>0</boolean>', False)
260        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
261              xmlrpclib.Binary(b'\x00byte string\xff'))
262        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
263              b'\x00byte string\xff', use_builtin_types=True)
264        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
265              xmlrpclib.DateTime('20050210T11:41:23'))
266        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
267              datetime.datetime(2005, 2, 10, 11, 41, 23),
268              use_builtin_types=True)
269        check('<array><data>'
270              '<value><int>1</int></value><value><int>2</int></value>'
271              '</data></array>', [1, 2])
272        check('<struct>'
273              '<member><name>b</name><value><int>2</int></value></member>'
274              '<member><name>a</name><value><int>1</int></value></member>'
275              '</struct>', {'a': 1, 'b': 2})
276
277    def test_load_extension_types(self):
278        check = self.check_loads
279        check('<nil/>', None)
280        check('<ex:nil/>', None)
281        check('<i1>205</i1>', 205)
282        check('<i2>20561</i2>', 20561)
283        check('<i8>9876543210</i8>', 9876543210)
284        check('<biginteger>98765432100123456789</biginteger>',
285              98765432100123456789)
286        check('<float>93.78125</float>', 93.78125)
287        check('<bigdecimal>9876543210.0123456789</bigdecimal>',
288              decimal.Decimal('9876543210.0123456789'))
289
290    def test_limit_int(self):
291        check = self.check_loads
292        maxdigits = 5000
293        with support.adjust_int_max_str_digits(maxdigits):
294            s = '1' * (maxdigits + 1)
295            with self.assertRaises(ValueError):
296                check(f'<int>{s}</int>', None)
297            with self.assertRaises(ValueError):
298                check(f'<biginteger>{s}</biginteger>', None)
299
300    def test_get_host_info(self):
301        # see bug #3613, this raised a TypeError
302        transp = xmlrpc.client.Transport()
303        self.assertEqual(transp.get_host_info("user@host.tld"),
304                          ('host.tld',
305                           [('Authorization', 'Basic dXNlcg==')], {}))
306
307    def test_ssl_presence(self):
308        try:
309            import ssl
310        except ImportError:
311            has_ssl = False
312        else:
313            has_ssl = True
314        try:
315            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
316        except NotImplementedError:
317            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
318        except OSError:
319            self.assertTrue(has_ssl)
320
    def test_keepalive_disconnect(self):
        # Two calls through one ServerProxy must both succeed even though
        # the server below closes a connection without answering once it
        # has already served one request on it.
        class RequestHandler(http.server.BaseHTTPRequestHandler):
            protocol_version = "HTTP/1.1"
            # Class-level default; do_POST() sets it on the handler instance
            # after that instance has answered a request.
            handled = False

            def do_POST(self):
                # Always consume the request body first.
                length = int(self.headers.get("Content-Length"))
                self.rfile.read(length)
                if self.handled:
                    # This handler already answered once: close the
                    # connection without sending any response.
                    self.close_connection = True
                    return
                response = xmlrpclib.dumps((5,), methodresponse=True)
                response = response.encode()
                self.send_response(http.HTTPStatus.OK)
                self.send_header("Content-Length", len(response))
                self.end_headers()
                self.wfile.write(response)
                self.handled = True
                # Keep the connection open so the next request arrives on it.
                self.close_connection = False

            def log_message(self, format, *args):
                # don't clobber sys.stderr
                pass

        def run_server():
            server.socket.settimeout(float(1))  # Don't hang if client fails
            server.handle_request()  # First request and attempt at second
            server.handle_request()  # Retried second request

        server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler)
        self.addCleanup(server.server_close)
        thread = threading.Thread(target=run_server)
        thread.start()
        self.addCleanup(thread.join)
        url = "http://{}:{}/".format(*server.server_address)
        with xmlrpclib.ServerProxy(url) as p:
            self.assertEqual(p.method(), 5)
            # The first attempt at this call gets no response (see do_POST);
            # it is expected to succeed regardless.
            self.assertEqual(p.method(), 5)
359
360
class SimpleXMLRPCDispatcherTestCase(unittest.TestCase):
    class DispatchExc(Exception):
        """Raised inside the dispatched functions when checking for
        chained exceptions"""

    def _check_unchained(self, exc, expected_args):
        # The exception must carry exactly *expected_args* and must not be
        # chained to any other exception.
        self.assertEqual(exc.args, expected_args)
        self.assertIsNone(exc.__cause__)
        self.assertIsNone(exc.__context__)

    def test_call_registered_func(self):
        """Calls explicitly registered function"""
        # The exception raised by the function must reach the caller with
        # no other exception chained to it.
        call_args = 1, 2, 3

        def dispatched_func(*params):
            raise self.DispatchExc(params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_function(dispatched_func)
        with self.assertRaises(self.DispatchExc) as caught:
            disp._dispatch('dispatched_func', call_args)
        self._check_unchained(caught.exception, (call_args,))

    def test_call_instance_func(self):
        """Calls a registered instance attribute as a function"""
        # The exception raised by the method must reach the caller with
        # no other exception chained to it.
        call_args = 1, 2, 3

        class DispatchedClass:
            def dispatched_func(self, *params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(DispatchedClass())
        with self.assertRaises(self.DispatchExc) as caught:
            disp._dispatch('dispatched_func', call_args)
        self._check_unchained(caught.exception, (call_args,))

    def test_call_dispatch_func(self):
        """Calls the registered instance's `_dispatch` function"""
        # The exception raised by the instance's _dispatch must reach the
        # caller with no other exception chained to it.
        method_name = 'method'
        call_args = 1, 2, 3

        class TestInstance:
            def _dispatch(self, method, params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(
                    method, params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(TestInstance())
        with self.assertRaises(self.DispatchExc) as caught:
            disp._dispatch(method_name, call_args)
        self._check_unchained(caught.exception, (method_name, call_args))

    def test_registered_func_is_none(self):
        """Calls explicitly registered function which is None"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_function(None, name='method')
        with self.assertRaisesRegex(Exception, 'method'):
            disp._dispatch('method', ('param',))

    def test_instance_has_no_func(self):
        """Attempts to call nonexistent function on a registered instance"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(object())
        with self.assertRaisesRegex(Exception, 'method'):
            disp._dispatch('method', ('param',))

    def test_cannot_locate_func(self):
        """Calls a function that the dispatcher cannot locate"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        with self.assertRaisesRegex(Exception, 'method'):
            disp._dispatch('method', ('param',))
446
447
class HelperTestCase(unittest.TestCase):
    def test_escape(self):
        # escape() must replace each of &, < and > by its XML entity.
        cases = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in cases:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
453
class FaultTestCase(unittest.TestCase):
    def test_repr(self):
        # repr() and str() of a Fault give the same "<Fault code: 'msg'>".
        fault = xmlrpclib.Fault(42, 'Test Fault')
        text = repr(fault)
        self.assertEqual(text, "<Fault 42: 'Test Fault'>")
        self.assertEqual(text, str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # Passed as an ordinary parameter, a Fault loads back as a dict.
        params, methodname = xmlrpclib.loads(xmlrpclib.dumps((fault,)))
        (loaded,) = params
        self.assertEqual(loaded,
                         {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEqual(methodname, None)

        # Dumped directly by a Marshaller, loading it raises the Fault.
        document = xmlrpclib.Marshaller().dumps(fault)
        with self.assertRaises(xmlrpclib.Fault):
            xmlrpclib.loads(document)

    def test_dotted_attribute(self):
        # Resolving a name that starts with an underscore must raise
        # AttributeError: private attributes are not meant to be reachable.
        resolve = xmlrpc.server.resolve_dotted_attribute
        with self.assertRaises(AttributeError):
            resolve(str, '__add')
        self.assertTrue(resolve(str, 'title'))
476
class DateTimeTestCase(unittest.TestCase):
    def test_default(self):
        # Without arguments, DateTime() must format whatever time.localtime()
        # returns; localtime is patched so the expected value is fixed.
        fixed = time.struct_time([2013, 7, 15, 0, 24, 49, 0, 196, 0])
        with mock.patch('time.localtime') as localtime_mock:
            localtime_mock.return_value = fixed
            current = time.localtime()
            stamp = xmlrpclib.DateTime()
            expected = time.strftime("%Y%m%dT%H:%M:%S", current)
            self.assertEqual(str(stamp), expected)

    def test_time(self):
        # Built from a float timestamp.
        seconds = 1181399930.036952
        stamp = xmlrpclib.DateTime(seconds)
        expected = time.strftime("%Y%m%dT%H:%M:%S", time.localtime(seconds))
        self.assertEqual(str(stamp), expected)

    def test_time_tuple(self):
        # Built from a plain 9-item time tuple.
        fields = (2007, 6, 9, 10, 38, 50, 5, 160, 0)
        stamp = xmlrpclib.DateTime(fields)
        self.assertEqual(str(stamp), '20070609T10:38:50')

    def test_time_struct(self):
        # Built from a time.struct_time.
        struct = time.localtime(1181399930.036952)
        stamp = xmlrpclib.DateTime(struct)
        self.assertEqual(str(stamp), time.strftime("%Y%m%dT%H:%M:%S", struct))

    def test_datetime_datetime(self):
        # Built from a datetime.datetime.
        moment = datetime.datetime(2007, 1, 2, 3, 4, 5)
        stamp = xmlrpclib.DateTime(moment)
        self.assertEqual(str(stamp), '20070102T03:04:05')

    def test_repr(self):
        # repr() shows the ISO 8601 value and the object's id in hex.
        moment = datetime.datetime(2007, 1, 2, 3, 4, 5)
        stamp = xmlrpclib.DateTime(moment)
        expected = "<DateTime '20070102T03:04:05' at %#x>" % id(stamp)
        self.assertEqual(repr(stamp), expected)

    def test_decode(self):
        # decode() and the _datetime() helper both accept a value padded
        # with surrounding whitespace.
        padded = ' 20070908T07:11:13  '
        reference = xmlrpclib.DateTime(
            datetime.datetime(2007, 9, 8, 7, 11, 13))

        via_method = xmlrpclib.DateTime()
        via_method.decode(padded)
        self.assertEqual(via_method, reference)

        via_helper = xmlrpclib._datetime(padded)
        self.assertEqual(via_helper, reference)

    def test_comparison(self):
        now = datetime.datetime.now()
        stamp = xmlrpclib.DateTime(now.timetuple())

        # datetime vs. DateTime
        self.assertTrue(stamp == now)
        self.assertTrue(now == stamp)
        later = now + datetime.timedelta(seconds=4)
        self.assertTrue(later >= stamp)
        self.assertTrue(stamp < later)

        # str vs. DateTime
        iso_text = now.strftime("%Y%m%dT%H:%M:%S")
        self.assertTrue(stamp == iso_text)
        self.assertTrue(iso_text == stamp)
        stamp_later = xmlrpclib.DateTime(later.timetuple())
        self.assertTrue(stamp_later >= iso_text)
        self.assertTrue(iso_text < stamp_later)

        # Other types: never equal, and ordering raises TypeError.
        iso_bytes = iso_text.encode('ascii')
        time_tuple = now.timetuple()
        self.assertFalse(stamp == 1970)
        self.assertTrue(stamp != iso_bytes)
        self.assertFalse(stamp == bytearray(iso_bytes))
        self.assertTrue(stamp != time_tuple)
        with self.assertRaises(TypeError):
            stamp < float(1970)
        with self.assertRaises(TypeError):
            stamp > iso_bytes
        with self.assertRaises(TypeError):
            stamp <= bytearray(iso_bytes)
        with self.assertRaises(TypeError):
            stamp >= time_tuple

        # Comparisons against the test.support sentinel objects.
        self.assertTrue(stamp == ALWAYS_EQ)
        self.assertFalse(stamp != ALWAYS_EQ)
        self.assertTrue(stamp < LARGEST)
        self.assertFalse(stamp > LARGEST)
        self.assertTrue(stamp <= LARGEST)
        self.assertFalse(stamp >= LARGEST)
        self.assertFalse(stamp < SMALLEST)
        self.assertTrue(stamp > SMALLEST)
        self.assertFalse(stamp <= SMALLEST)
        self.assertTrue(stamp >= SMALLEST)
570
571
class BinaryTestCase(unittest.TestCase):

    # XXX What should str(Binary(b"\xff")) return?  I'm choosing "\xff"
    # for now (i.e. interpreting the binary data as Latin-1-encoded
    # text).  But this feels very unsatisfactory.  Perhaps we should
    # only define repr(), and return r"Binary(b'\xff')" instead?

    def test_default(self):
        # An empty Binary converts to the empty string.
        self.assertEqual(str(xmlrpclib.Binary()), '')

    def test_string(self):
        # str() of a Binary is its data decoded as Latin-1.
        raw = b'\x01\x02\x03abc123\xff\xfe'
        wrapped = xmlrpclib.Binary(raw)
        self.assertEqual(str(wrapped), str(raw, "latin-1"))

    def test_decode(self):
        # Base64 input is decoded both by Binary.decode() and by the
        # _binary() helper.
        raw = b'\x01\x02\x03abc123\xff\xfe'
        as_text = str(raw, "latin-1")
        encoded = base64.encodebytes(raw)

        via_method = xmlrpclib.Binary()
        via_method.decode(encoded)
        self.assertEqual(str(via_method), as_text)

        via_helper = xmlrpclib._binary(encoded)
        self.assertEqual(str(via_helper), as_text)
597
598
# Address, port and base URL of the test server that is currently running.
# The server functions below assign them once the listening socket is bound,
# and reset PORT to None on shutdown.
ADDR = PORT = URL = None
600
# The evt is set twice.  First when the server is ready to serve.
# Second when the server has been shut down.  The user must clear
# the event after it has been set the first time to catch the second set.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Run a SimpleXMLRPCServer that handles up to *numrequests* requests.

    evt -- event set once the server is ready and again when it has stopped
    numrequests -- maximum number of requests to handle before stopping
    requestHandler -- handler class; SimpleXMLRPCRequestHandler if not given
    encoding -- passed through to the server constructor

    Publishes the bound address through the module globals ADDR, PORT and
    URL, and resets PORT to None on shutdown.
    """
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

        class Fixture:
            @staticmethod
            def getData():
                return '42'

    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode.  On Linux,
            # socket attributes are not inherited like they are on *BSD and
            # Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(("localhost", 0), requestHandler,
                          encoding=encoding,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.server_bind()
        global ADDR, PORT, URL
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on Vista which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x: x, 'têšt')
        @serv.register_function
        def my_function():
            '''This is my function'''
            return True
        @serv.register_function(name='add')
        def _(x, y):
            return x + y
        testInstance = TestInstanceClass()
        serv.register_instance(testInstance, allow_dotted_names=True)
        # First signal: the server is ready to accept requests.
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except TimeoutError:
        pass
    finally:
        serv.socket.close()
        PORT = None
        # Second signal: the server has stopped.
        evt.set()
667
def http_multi_server(evt, numrequests, requestHandler=None):
    """Run a MultiPathXMLRPCServer that handles up to *numrequests* requests.

    evt -- event set once the server is ready and again when it has stopped;
           the caller must clear it after the first set to see the second
    numrequests -- maximum number of requests to handle before stopping
    requestHandler -- base handler class; SimpleXMLRPCRequestHandler if not
                      given

    One dispatcher is registered per entry in ``paths``; each answers 'test'
    with its own path.  paths[0] additionally offers 'pow', paths[1] offers
    'add', and "/is/broken" is served by a dispatcher that always raises.

    Publishes the bound address through the module globals ADDR, PORT and
    URL, and resets PORT to None on shutdown.
    """
    global ADDR, PORT, URL

    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode.  On Linux,
            # socket attributes are not inherited like they are on *BSD and
            # Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler

    class MyRequestHandler(requestHandler):
        # No path restriction: every request path reaches the server.
        rpc_paths = []

    class BrokenDispatcher:
        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
            raise RuntimeError("broken dispatcher")

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    try:
        # Configure and bind inside the try block so that a failure here
        # still closes the socket and sets the event in the finally clause;
        # otherwise a caller waiting on evt would block forever.
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on Vista which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = [
            "/foo", "/foo/bar",
            "/foo?k=v", "/foo#frag", "/foo?k=v#frag",
            "", "/", "/RPC2", "?k=v", "#frag",
        ]
        for path in paths:
            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
            # Bind the path as a default so each lambda returns its own path.
            d.register_function(lambda p=path: p, 'test')
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        serv.add_dispatcher("/is/broken", BrokenDispatcher())
        # First signal: the server is ready to accept requests.
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except TimeoutError:
        pass
    finally:
        serv.socket.close()
        PORT = None
        # Second signal: the server has stopped.
        evt.set()
737
738# This function prevents errors like:
739#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Returns True if the given ProtocolError is the product of a server-side
       exception caused by the 'temporarily unavailable' response sometimes
       given by operations on non-blocking sockets, and False otherwise.

       Exceptions without errcode/headers attributes (e.g. OSError) are
       judged by their string representation instead.'''

    # sometimes we get a -1 error code and/or empty headers
    try:
        if e.errcode == -1 or e.headers is None:
            return True
        exc_mess = e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError (e.g. an OSError): fall back to its message.
        exc_mess = str(e)

    # Always hand back a real bool rather than falling off the end with None.
    return bool(exc_mess) and 'temporarily unavailable' in exc_mess.lower()
756
def make_request_and_skipIf(condition, reason):
    """Return a decorator that skips a server test when *condition* is true.

    When *condition* is false the decorated test is returned unchanged.
    Otherwise it is replaced by a wrapper that first makes one request and
    then raises unittest.SkipTest(reason): the server created in setUp
    blocks until a request comes in, so skipping without a request would
    leave it waiting.
    """
    import functools

    if not condition:
        return lambda func: func

    def decorator(func):
        # Keep the original test's name and docstring on the replacement.
        @functools.wraps(func)
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, OSError) as e:
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
772
class BaseServerTestCase(unittest.TestCase):
    # Subclasses override these to choose the handler class, the number of
    # requests the server thread handles, and the server function itself.
    requestHandler = None
    request_count = 1
    threadFunc = staticmethod(http_server)

    def setUp(self):
        # Have the server report tracebacks in its response headers.
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        ready = threading.Event()
        self.evt = ready
        # Run the server in its own thread and join it on cleanup.
        server_thread = threading.Thread(
            target=self.threadFunc,
            args=(ready, self.request_count, self.requestHandler))
        server_thread.start()
        self.addCleanup(server_thread.join)

        # Block until the server signals readiness, then re-arm the event
        # so that tearDown can wait for the shutdown signal.
        ready.wait()
        ready.clear()

    def tearDown(self):
        # Wait until the server thread signals that it has stopped.
        self.evt.wait()

        # Turn traceback reporting off again.
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
799
class SimpleServerTestCase(BaseServerTestCase):
    """Client-side tests run against the server started by the base fixture."""

    @contextlib.contextmanager
    def _ignore_unavailable(self):
        # Context manager shared by the tests below.  A ProtocolError or
        # OSError raised inside the ``with`` body is swallowed when it is a
        # non-blocking socket 'unavailable' error; any other one fails the
        # test with the error and its headers (if present) in the message.
        try:
            yield
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_simple1(self):
        with self._ignore_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)

    def test_nonascii(self):
        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        with self._ignore_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)

    def test_client_encoding(self):
        start_string = '\u20ac'
        end_string = '\xa4'

        with self._ignore_unavailable():
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)

    def test_nonascii_methodname(self):
        with self._ignore_unavailable():
            p = xmlrpclib.ServerProxy(URL, encoding='ascii')
            self.assertEqual(p.têšt(42), 42)

    def test_404(self):
        # send POST with http.client, it should return 404 header and
        # 'Not Found' message.
        with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn:
            conn.request('POST', '/this-is-not-valid')
            response = conn.getresponse()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

    def test_introspection1(self):
        expected_methods = {'pow', 'div', 'my_function', 'add', 'têšt',
                            'system.listMethods', 'system.methodHelp',
                            'system.methodSignature', 'system.multicall',
                            'Fixture'}
        with self._ignore_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            meth = p.system.listMethods()
            self.assertEqual(set(meth), expected_methods)

    def test_introspection2(self):
        with self._ignore_unavailable():
            # test _methodHelp()
            p = xmlrpclib.ServerProxy(URL)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')

    @make_request_and_skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        with self._ignore_unavailable():
            # test native doc
            p = xmlrpclib.ServerProxy(URL)
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        with self._ignore_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')

    def test_multicall(self):
        with self._ignore_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)

    def test_non_existing_multicall(self):
        with self._ignore_unavailable():
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')

        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        data = server.Fixture.getData()
        self.assertEqual(data, '42')

    def test_unicode_host(self):
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(server.add("a", "\xe9"), "a\xe9")

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
            conn.send('POST /RPC2 HTTP/1.0\r\n'
                      'Content-Length: 100\r\n\r\n'
                      'bye HTTP/1.1\r\n'
                      f'Host: {ADDR}:{PORT}\r\n'
                      'Accept-Encoding: identity\r\n'
                      'Content-Length: 0\r\n\r\n'.encode('ascii'))

    def test_context_manager(self):
        # The transport holds a connection inside the with block and has
        # dropped it again once the block is left.
        with xmlrpclib.ServerProxy(URL) as server:
            server.add(2, 3)
            self.assertNotEqual(server('transport')._connection,
                                (None, None))
        self.assertEqual(server('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        # The connection must also be dropped when the call inside the
        # with block ends in a Fault.
        try:
            with xmlrpclib.ServerProxy(URL) as server:
                server.add(2, "a")
        except xmlrpclib.Fault:
            pass
        self.assertEqual(server('transport')._connection,
                         (None, None))
997
998
class SimpleServerEncodingTestCase(BaseServerTestCase):
    """Round-trip non-ASCII strings through a server using ISO-8859-15."""

    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The *encoding* argument is accepted but not used: the server is
        # always started with 'iso-8859-15'.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        start_string = '\u20ac'
        end_string = '\xa4'

        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket unavailable errors.
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
1017
1018
class MultiPathServerTestCase(BaseServerTestCase):
    """Tests for requests sent to different URL paths of the multi-path server."""
    threadFunc = staticmethod(http_multi_server)
    request_count = 2

    def test_path1(self):
        # Under /foo, pow() answers while add() ends in a Fault.
        proxy = xmlrpclib.ServerProxy(URL + "/foo")
        self.assertEqual(proxy.pow(6, 8), 6**8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_path2(self):
        # Under /foo/bar it is the other way round.
        proxy = xmlrpclib.ServerProxy(URL + "/foo/bar")
        self.assertEqual(proxy.add(6, 8), 6 + 8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.pow(6, 8)

    def test_path3(self):
        proxy = xmlrpclib.ServerProxy(URL + "/is/broken")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_invalid_path(self):
        proxy = xmlrpclib.ServerProxy(URL + "/invalid")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    # In the tests below, the remote test() call is expected to return the
    # path portion of the URL the proxy was created with.

    def test_path_query_fragment(self):
        proxy = xmlrpclib.ServerProxy(URL + "/foo?k=v#frag")
        self.assertEqual(proxy.test(), "/foo?k=v#frag")

    def test_path_fragment(self):
        proxy = xmlrpclib.ServerProxy(URL + "/foo#frag")
        self.assertEqual(proxy.test(), "/foo#frag")

    def test_path_query(self):
        proxy = xmlrpclib.ServerProxy(URL + "/foo?k=v")
        self.assertEqual(proxy.test(), "/foo?k=v")

    def test_empty_path(self):
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.test(), "/RPC2")

    def test_root_path(self):
        proxy = xmlrpclib.ServerProxy(URL + "/")
        self.assertEqual(proxy.test(), "/")

    def test_empty_path_query(self):
        proxy = xmlrpclib.ServerProxy(URL + "?k=v")
        self.assertEqual(proxy.test(), "?k=v")

    def test_empty_path_fragment(self):
        proxy = xmlrpclib.ServerProxy(URL + "#frag")
        self.assertEqual(proxy.test(), "#frag")
1067
1068
1069#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
1070#does indeed serve subsequent requests on the same connection
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    """Server fixture whose HTTP/1.1 request handler logs what it handles."""

    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        # A request handler that supports keep-alive and logs requests into
        # the class variable myRequests: every handle() call opens a new
        # sub-list, and every handle_one_request() call appends the raw
        # request line to the sub-list of the handle() call it belongs to.
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []

        def handle(self):
            log = self.myRequests
            log.append([])
            # Remember which sub-list belongs to this handler instance.
            self.reqidx = len(log) - 1
            return self.parentClass.handle(self)

        def handle_one_request(self):
            outcome = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        # Start every test with an empty request log.
        self.RequestHandler.myRequests = []
        return super().setUp()
1092
1093#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
1094#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    """Several requests through one proxy must share a single handler."""

    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # Do three requests.
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6**8)
        proxy("close")()

        logged = self.RequestHandler.myRequests

        # They should have all been handled by a single request handler.
        self.assertEqual(len(logged), 1)

        # Check that we did at least two (the third may be pending append
        # due to thread scheduling).
        self.assertGreaterEqual(len(logged[-1]), 2)
1110
1111
1112#test special attribute access on the serverproxy, through the __call__
1113#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    """Special attribute access on the ServerProxy through __call__."""

    # Ask for two keepalive requests to be handled.
    request_count = 2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # Do some requests, then close.
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6**8)
        proxy("close")()  # this should trigger a new keep-alive request
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6**8)
        proxy("close")()

        # There should have been two request handlers, each having logged
        # at least two complete requests.
        logged = self.RequestHandler.myRequests
        self.assertEqual(len(logged), 2)
        self.assertGreaterEqual(len(logged[-1]), 2)
        self.assertGreaterEqual(len(logged[-2]), 2)

    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # Do one request, then close through the transport object.
        self.assertEqual(proxy.pow(6, 8), 6**8)
        transport = proxy("transport")
        transport.close()  # same as above, really.
        self.assertEqual(proxy.pow(6, 8), 6**8)
        proxy("close")()
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
1139        #do some requests with close.
1140        self.assertEqual(p.pow(6,8), 6**8)
1141        p("transport").close() #same as above, really.
1142        self.assertEqual(p.pow(6,8), 6**8)
1143        p("close")()
1144        self.assertEqual(len(self.RequestHandler.myRequests), 2)
1145
1146#A test case that verifies that gzip encoding works in both directions
1147#(for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    """Verify that gzip encoding works for both the request and the response."""

    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        # An HTTP/1.1 request handler that stores the Content-Length of the
        # last POST it received in the class attribute content_length.
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store content of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False
        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def test_gzip_request(self):
        # The same call must produce a smaller request body once request
        # encoding is switched on.
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        a = self.RequestHandler.content_length
        t.encode_threshold = 0 #turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        b = self.RequestHandler.content_length
        self.assertGreater(a, b)
        p("close")()

    def test_bad_gzip_request(self):
        # A gzip Content-Encoding header on a plain body must be answered
        # with a ProtocolError mentioning status 400.
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        with self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                    re.compile(r"\b400\b")):
            p.pow(6, 8)
        p("close")()

    def test_gzip_response(self):
        # The same call must produce a smaller response once the handler
        # is told to always encode.
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        try:
            self.requestHandler.encode_threshold = None #no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            a = t.response_length
            self.requestHandler.encode_threshold = 0 #always encode
            self.assertEqual(p.pow(6,8), 6**8)
            p("close")()
            b = t.response_length
        finally:
            # encode_threshold is a class attribute: restore it even when an
            # assertion above fails, so the change cannot leak elsewhere.
            self.requestHandler.encode_threshold = old
        self.assertGreater(a, b)
1214
1215
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):
    """Tests for the gzip_encode()/gzip_decode() helpers."""

    def test_gzip_decode_limit(self):
        limit = 20 * 1024 * 1024

        # A payload of exactly `limit` bytes is decoded in full.
        at_limit = xmlrpclib.gzip_encode(b'\0' * limit)
        self.assertEqual(len(xmlrpclib.gzip_decode(at_limit)), limit)

        # A single byte more is rejected with a ValueError ...
        over_limit = xmlrpclib.gzip_encode(b'\0' * (limit + 1))
        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(over_limit)

        # ... but the same payload must decode without error when
        # max_decode=-1 is passed.
        xmlrpclib.gzip_decode(over_limit, max_decode=-1)
1234
1235
class HeadersServerTestCase(BaseServerTestCase):
    """Check that extra headers given to ServerProxy reach the server."""

    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        # Headers of the most recent POST, stored on the class.
        test_headers = None

        def do_POST(self):
            self.__class__.test_headers = self.headers
            return super().do_POST()
    requestHandler = RequestHandler
    standard_headers = [
        'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
        'Content-Length']

    def setUp(self):
        # Forget the headers recorded by any earlier test.
        self.RequestHandler.test_headers = None
        return super().setUp()

    def assertContainsAdditionalHeaders(self, headers, additional):
        # The received header names must be exactly the standard ones plus
        # the additional ones, and each additional header must carry the
        # expected value.
        wanted_names = sorted([*self.standard_headers, *additional.keys()])
        self.assertListEqual(sorted(headers.keys()), wanted_names)

        for name, expected in additional.items():
            self.assertEqual(headers.get(name), expected)

    def _received_headers(self, headers):
        # Make one call through a proxy created with *headers* and return
        # the headers the request handler recorded for it.
        proxy = xmlrpclib.ServerProxy(URL, headers=headers)
        self.assertEqual(proxy.pow(6, 8), 6**8)
        return self.RequestHandler.test_headers

    def test_header(self):
        received = self._received_headers([('X-Test', 'foo')])
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})

    def test_header_many(self):
        received = self._received_headers(
            [('X-Test', 'foo'), ('X-Test-Second', 'bar')])
        self.assertContainsAdditionalHeaders(
            received, {'X-Test': 'foo', 'X-Test-Second': 'bar'})

    def test_header_empty(self):
        received = self._received_headers([])
        self.assertContainsAdditionalHeaders(received, {})

    def test_header_tuple(self):
        received = self._received_headers((('X-Test', 'foo'),))
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})

    def test_header_items(self):
        received = self._received_headers({'X-Test': 'foo'}.items())
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})
1295
1296
1297#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    """Special attributes of ServerProxy reached by calling the proxy."""

    def setUp(self):
        super().setUp()
        # Actual value of the URL doesn't matter if it is a string in
        # the correct format.
        self.url = 'http://fake.localhost'

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(self.url)
        close = proxy('close')
        self.assertEqual(close(), None)

    def test_transport(self):
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        self.assertEqual(proxy('transport'), transport)
1313
1314
1315# This is a contrived way to make a failure occur on the server side
1316# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
    """HTTPMessage that reports a non-numeric Content-Length header."""

    def get(self, key, failobj=None):
        lowered = key.lower()
        if lowered != 'content-length':
            # Every other header is looked up as usual (by lowered name).
            return super().get(lowered, failobj)
        return 'I am broken'
1323
1324
class FailingServerTestCase(unittest.TestCase):
    """Tests for the server's _send_traceback_header flag.

    A server-side failure is provoked by installing FailingMessageClass as
    the request handler's MessageClass; the tests then inspect whether the
    X-exception / X-traceback headers come back with the error.
    """

    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        serv_args = (self.evt, 1)
        thread = threading.Thread(target=http_server, args=serv_args)
        thread.start()
        self.addCleanup(thread.join)

        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        default_class = http.client.HTTPMessage
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class

    def test_basic(self):
        # check that flag is false by default
        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # The two server-side error headers shouldn't be sent back in this case
                self.assertIsNone(e.headers.get("X-exception"))
                self.assertIsNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertIsNotNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')
1401
1402
@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """Temporarily replace sys.stdout with an in-memory text stream.

    A variation on support.captured_stdout() which gives a text stream
    having a `buffer` attribute: the stream is an io.TextIOWrapper (using
    *encoding*) around an io.BytesIO.  The stream is yielded to the caller
    and the previous sys.stdout is put back when the block is left.
    """
    stream = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    with contextlib.redirect_stdout(stream):
        yield stream
1414
1415
class CGIHandlerTestCase(unittest.TestCase):
    """Tests for CGIXMLRPCRequestHandler, driven through stdin/stdout."""

    def setUp(self):
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with os_helper.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs handle_get
            # get sysout output
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            handle = data_out.read()
            fields = handle.split()
            status = fields[1]
            message = ' '.join(fields[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')

    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with os_helper.EnvironmentVarGuard() as env, \
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # will respond exception, if so, our goal is achieved ;)
        handle = data_out.read()

        # Skip the HTTP header by locating the start of the XML document
        # rather than relying on a fixed character offset, which depends on
        # the number of digits in the Content-Length value.
        xml_start = handle.find("<?xml")
        self.assertNotEqual(xml_start, -1, "no XML document in CGI output")
        content = handle[xml_start:]

        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, content)

        # Also test the content-length returned by handle_request.
        # Using the same test method in order to avoid all the data-passing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040
        self.assertEqual(
            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
            len(content))
1482
1483
class UseBuiltinTypesTestCase(unittest.TestCase):
    """Tests for the use_builtin_types option of the server-side classes."""

    def test_use_builtin_types(self):
        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
        # makes all dispatch of binary data as bytes instances, and all
        # dispatch of datetime argument as datetime.datetime instances.
        expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
        expected_bytes = b"my dog has fleas"
        self.log = []

        def foobar(*args):
            # Record whatever arguments the dispatcher hands over.
            self.log.extend(args)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        dispatcher.register_function(foobar)
        request = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
        dispatcher._marshaled_dispatch(request)

        self.assertEqual(len(self.log), 2)
        received_bytes, received_date = self.log
        self.assertEqual(self.log, [expected_bytes, expected_date])
        self.assertIs(type(received_date), datetime.datetime)
        self.assertIs(type(received_bytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        cgi_handler = xmlrpc.server.CGIXMLRPCRequestHandler(
            use_builtin_types=True)
        self.assertTrue(cgi_handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        address = ("localhost", 0)
        rpc_server = xmlrpc.server.SimpleXMLRPCServer(
            address, use_builtin_types=True)
        # Close the server before checking the flag.
        rpc_server.server_close()
        self.assertTrue(rpc_server.use_builtin_types)
1515
1516
def setUpModule():
    """Module fixture: register threading_cleanup for the module's teardown.

    The values returned by threading_setup() are passed on as the
    arguments of threading_cleanup().
    """
    unittest.addModuleCleanup(threading_helper.threading_cleanup,
                              *threading_helper.threading_setup())
1520
1521
# Run the tests through unittest's command-line entry point when this file
# is executed as a script.
if __name__ == "__main__":
    unittest.main()
1524