• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import datetime
3import decimal
4import sys
5import time
6import unittest
7from unittest import mock
8import xmlrpc.client as xmlrpclib
9import xmlrpc.server
10import http.client
11import http, http.server
12import socket
13import threading
14import re
15import io
16import contextlib
17from test import support
18from test.support import os_helper
19from test.support import socket_helper
20from test.support import threading_helper
21from test.support import ALWAYS_EQ, LARGEST, SMALLEST
22
23try:
24    import gzip
25except ImportError:
26    gzip = None
27
# Shared round-trip fixture: a one-element list holding a struct that mixes
# text, numbers, a nested list, binary data in three spellings, a boolean,
# non-ASCII text (as value and as key) and DateTime values built three ways.
alist = [
    {
        'astring': 'foo@bar.baz.spam',
        'afloat': 7283.43,
        'anint': 2**20,
        'ashortlong': 2,
        'anotherlist': ['.zyx.41'],
        'abase64': xmlrpclib.Binary(b"my dog has fleas"),
        'b64bytes': b"my dog has fleas",
        'b64bytearray': bytearray(b"my dog has fleas"),
        'boolean': False,
        'unicode': '\u4000\u6000\u8000',
        'ukey\u4000': 'regular value',
        # DateTime built from an ISO 8601 string, a time tuple and a
        # datetime.datetime respectively.
        'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
        'datetime2': xmlrpclib.DateTime(
            (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
        'datetime3': xmlrpclib.DateTime(
            datetime.datetime(2005, 2, 10, 11, 41, 23)),
    },
]
45
class XMLRPCTestCase(unittest.TestCase):
    """Marshalling/unmarshalling round trips and basic client behaviour."""

    def _check_loads_datetime(self, dump, dt, expected_type, **kwargs):
        # Unmarshal *dump* with the given loads() options and verify that the
        # single parameter equals *dt*, has exactly *expected_type* and that
        # no method name was decoded.
        result, m = xmlrpclib.loads(dump, **kwargs)
        (newdt,) = result
        self.assertEqual(newdt, dt)
        self.assertIs(type(newdt), expected_type)
        self.assertIsNone(m)

    def test_dump_load(self):
        # A dumps()/loads() round trip must reproduce the shared fixture.
        dump = xmlrpclib.dumps((alist,))
        load = xmlrpclib.loads(dump)
        self.assertEqual(alist, load[0][0])

    def test_dump_bare_datetime(self):
        # This checks that an unwrapped datetime.datetime object can be
        # handled by the marshalling code.  This can't be done via
        # test_dump_load() since with use_builtin_types set to 1 the
        # unmarshaller would create datetime objects for the 'datetime[123]'
        # keys as well
        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        check = self._check_loads_datetime
        check(s, dt, datetime.datetime, use_builtin_types=True)
        check(s, dt, xmlrpclib.DateTime, use_builtin_types=False)
        check(s, dt, datetime.datetime, use_datetime=True)
        check(s, dt, xmlrpclib.DateTime, use_datetime=False)

    def test_datetime_before_1900(self):
        # same as before but with a date before 1900
        dt = datetime.datetime(1,  2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        check = self._check_loads_datetime
        check(s, dt, datetime.datetime, use_builtin_types=True)
        check(s, dt, xmlrpclib.DateTime, use_builtin_types=False)

    def test_bug_1164912(self):
        d = xmlrpclib.DateTime()
        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
                                            methodresponse=True))
        self.assertIsInstance(new_d.value, str)

        # Check that the output of dumps() is still a str
        s = xmlrpclib.dumps((new_d,), methodresponse=True)
        self.assertIsInstance(s, str)

    def test_newstyle_class(self):
        # An arbitrary instance is marshalled as a struct of its __dict__.
        class T(object):
            pass
        t = T()
        t.x = 100
        t.y = "Hello"
        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
        self.assertEqual(t2, t.__dict__)

    def test_dump_big_long(self):
        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))

    def test_dump_bad_dict(self):
        # Struct member names must be strings; a tuple key is rejected.
        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))

    def test_dump_recursive_seq(self):
        l = [1,2,3]
        t = [3,4,5,l]
        l.append(t)
        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))

    def test_dump_recursive_dict(self):
        d = {'1':1, '2':1}
        t = {'3':3, 'd':d}
        d['t'] = t
        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))

    def test_dump_big_int(self):
        if sys.maxsize > 2**31-1:
            self.assertRaises(OverflowError, xmlrpclib.dumps,
                              (int(2**34),))

        # The limits themselves are accepted; one step beyond is not.
        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MAXINT+1,))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MININT-1,))

        def dummy_write(s):
            pass

        # Same checks, driving Marshaller.dump_int() directly.
        m = xmlrpclib.Marshaller()
        m.dump_int(xmlrpclib.MAXINT, dummy_write)
        m.dump_int(xmlrpclib.MININT, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MAXINT+1, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MININT-1, dummy_write)

    def test_dump_double(self):
        # Unlike ints, doubles beyond MAXINT/MININT must marshal without
        # raising.
        xmlrpclib.dumps((float(2 ** 34),))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
                         float(xmlrpclib.MININT)))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
                         float(xmlrpclib.MININT - 42)))

        def dummy_write(s):
            pass

        m = xmlrpclib.Marshaller()
        m.dump_double(xmlrpclib.MAXINT, dummy_write)
        m.dump_double(xmlrpclib.MININT, dummy_write)
        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
        m.dump_double(xmlrpclib.MININT - 42, dummy_write)

    def test_dump_none(self):
        value = alist + [None]
        arg1 = (value,)
        strg = xmlrpclib.dumps(arg1, allow_none=True)
        self.assertEqual(value,
                          xmlrpclib.loads(strg)[0][0])
        # Without allow_none, None must be refused.
        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))

    def test_dump_encoding(self):
        value = {'key\u20ac\xa4':
                 'value\u20ac\xa4'}
        # Plain params document, as str and as encoded bytes.
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        # Method response, as str and as encoded bytes.
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodresponse=True)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        # Method call with a non-ASCII method name.
        methodname = 'method\u20ac\xa4'
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodname=methodname)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)

    def test_dump_bytes(self):
        sample = b"my dog has fleas"
        self.assertEqual(sample, xmlrpclib.Binary(sample))
        for type_ in bytes, bytearray, xmlrpclib.Binary:
            value = type_(sample)
            s = xmlrpclib.dumps((value,))

            result, m = xmlrpclib.loads(s, use_builtin_types=True)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), bytes)
            self.assertIsNone(m)

            result, m = xmlrpclib.loads(s, use_builtin_types=False)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), xmlrpclib.Binary)
            self.assertIsNone(m)

    def test_loads_unsupported(self):
        # An unknown element inside <value> must be reported as a
        # ResponseError at the top level, inside an array and inside a struct.
        ResponseError = xmlrpclib.ResponseError
        data = '<params><param><value><spam/></value></param></params>'
        self.assertRaises(ResponseError, xmlrpclib.loads, data)
        data = ('<params><param><value><array>'
                '<value><spam/></value>'
                '</array></value></param></params>')
        self.assertRaises(ResponseError, xmlrpclib.loads, data)
        data = ('<params><param><value><struct>'
                '<member><name>a</name><value><spam/></value></member>'
                '<member><name>b</name><value><spam/></value></member>'
                '</struct></value></param></params>')
        self.assertRaises(ResponseError, xmlrpclib.loads, data)

    def check_loads(self, s, value, **kwargs):
        # Wrap the XML fragment *s* in a single-parameter document, load it
        # with the given loads() options and verify that the result equals
        # *value*, has exactly the same type and carries no method name.
        dump = '<params><param><value>%s</value></param></params>' % s
        result, m = xmlrpclib.loads(dump, **kwargs)
        (newvalue,) = result
        self.assertEqual(newvalue, value)
        self.assertIs(type(newvalue), type(value))
        self.assertIsNone(m)

    def test_load_standard_types(self):
        check = self.check_loads
        check('string', 'string')
        check('<string>string</string>', 'string')
        # Text outside the Basic Multilingual Plane ("Unicode" written in
        # mathematical Fraktur letters).  Spelled with escapes so the sample
        # cannot be degraded to U+FFFD replacement characters by a lossy
        # re-encoding of this file.
        astral = ('\U0001d518\U0001d52b\U0001d526\U0001d520'
                  '\U0001d52c\U0001d521\U0001d522 string')
        check('<string>%s</string>' % astral, astral)
        check('<int>2056183947</int>', 2056183947)
        check('<int>-2056183947</int>', -2056183947)
        check('<i4>2056183947</i4>', 2056183947)
        check('<double>46093.78125</double>', 46093.78125)
        check('<boolean>0</boolean>', False)
        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
              xmlrpclib.Binary(b'\x00byte string\xff'))
        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
              b'\x00byte string\xff', use_builtin_types=True)
        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
              xmlrpclib.DateTime('20050210T11:41:23'))
        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
              datetime.datetime(2005, 2, 10, 11, 41, 23),
              use_builtin_types=True)
        check('<array><data>'
              '<value><int>1</int></value><value><int>2</int></value>'
              '</data></array>', [1, 2])
        check('<struct>'
              '<member><name>b</name><value><int>2</int></value></member>'
              '<member><name>a</name><value><int>1</int></value></member>'
              '</struct>', {'a': 1, 'b': 2})

    def test_load_extension_types(self):
        check = self.check_loads
        check('<nil/>', None)
        check('<ex:nil/>', None)
        check('<i1>205</i1>', 205)
        check('<i2>20561</i2>', 20561)
        check('<i8>9876543210</i8>', 9876543210)
        check('<biginteger>98765432100123456789</biginteger>',
              98765432100123456789)
        check('<float>93.78125</float>', 93.78125)
        check('<bigdecimal>9876543210.0123456789</bigdecimal>',
              decimal.Decimal('9876543210.0123456789'))

    def test_get_host_info(self):
        # see bug #3613, this raised a TypeError
        transp = xmlrpc.client.Transport()
        self.assertEqual(transp.get_host_info("user@host.tld"),
                          ('host.tld',
                           [('Authorization', 'Basic dXNlcg==')], {}))

    def test_ssl_presence(self):
        # NotImplementedError from an https proxy is only acceptable when the
        # ssl module is missing; OSError is only acceptable when it is
        # available.
        try:
            import ssl
        except ImportError:
            has_ssl = False
        else:
            has_ssl = True
        try:
            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
        except NotImplementedError:
            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
        except OSError:
            self.assertTrue(has_ssl)

    def test_keepalive_disconnect(self):
        # The handler answers the first POST on a connection and keeps the
        # connection open, then drops the next POST on that same connection
        # without replying.  Both client calls must nevertheless return 5.
        class RequestHandler(http.server.BaseHTTPRequestHandler):
            protocol_version = "HTTP/1.1"
            handled = False

            def do_POST(self):
                length = int(self.headers.get("Content-Length"))
                self.rfile.read(length)
                if self.handled:
                    # Second request on this connection: hang up silently.
                    self.close_connection = True
                    return
                response = xmlrpclib.dumps((5,), methodresponse=True)
                response = response.encode()
                self.send_response(http.HTTPStatus.OK)
                self.send_header("Content-Length", len(response))
                self.end_headers()
                self.wfile.write(response)
                self.handled = True
                self.close_connection = False

            def log_message(self, format, *args):
                # don't clobber sys.stderr
                pass

        def run_server():
            server.socket.settimeout(float(1))  # Don't hang if client fails
            server.handle_request()  # First request and attempt at second
            server.handle_request()  # Retried second request

        server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler)
        self.addCleanup(server.server_close)
        thread = threading.Thread(target=run_server)
        thread.start()
        self.addCleanup(thread.join)
        url = "http://{}:{}/".format(*server.server_address)
        with xmlrpclib.ServerProxy(url) as p:
            self.assertEqual(p.method(), 5)
            self.assertEqual(p.method(), 5)
349
350
class SimpleXMLRPCDispatcherTestCase(unittest.TestCase):
    class DispatchExc(Exception):
        """Raised inside the dispatched functions when checking for
        chained exceptions"""

    def _dispatch_expecting_exc(self, dispatcher, method, params):
        # Run the dispatch, require DispatchExc to escape, and hand the
        # caught exception back for inspection.
        with self.assertRaises(self.DispatchExc) as caught:
            dispatcher._dispatch(method, params)
        return caught.exception

    def _assert_unchained(self, exc):
        # The dispatcher must not attach any other exception to *exc*.
        self.assertIsNone(exc.__cause__)
        self.assertIsNone(exc.__context__)

    def _assert_unsupported(self, dispatcher):
        # Dispatching 'method' must fail with an error naming the method.
        with self.assertRaisesRegex(Exception, 'method'):
            dispatcher._dispatch('method', ('param',))

    def test_call_registered_func(self):
        """Calls explicitly registered function"""
        # The exception raised by the function must come through with its
        # arguments intact and nothing chained onto it.
        args = 1, 2, 3

        def dispatched_func(*params):
            raise self.DispatchExc(params)

        rpc = xmlrpc.server.SimpleXMLRPCDispatcher()
        rpc.register_function(dispatched_func)
        exc = self._dispatch_expecting_exc(rpc, 'dispatched_func', args)
        self.assertEqual(exc.args, (args,))
        self._assert_unchained(exc)

    def test_call_instance_func(self):
        """Calls a registered instance attribute as a function"""
        # The exception raised by the method must come through with its
        # arguments intact and nothing chained onto it.
        args = 1, 2, 3

        class DispatchedClass:
            def dispatched_func(self, *params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params)

        rpc = xmlrpc.server.SimpleXMLRPCDispatcher()
        rpc.register_instance(DispatchedClass())
        exc = self._dispatch_expecting_exc(rpc, 'dispatched_func', args)
        self.assertEqual(exc.args, (args,))
        self._assert_unchained(exc)

    def test_call_dispatch_func(self):
        """Calls the registered instance's `_dispatch` function"""
        # The exception raised by the instance's own _dispatch must come
        # through with its arguments intact and nothing chained onto it.
        name = 'method'
        args = 1, 2, 3

        class TestInstance:
            def _dispatch(self, method, params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(
                    method, params)

        rpc = xmlrpc.server.SimpleXMLRPCDispatcher()
        rpc.register_instance(TestInstance())
        exc = self._dispatch_expecting_exc(rpc, name, args)
        self.assertEqual(exc.args, (name, args))
        self._assert_unchained(exc)

    def test_registered_func_is_none(self):
        """Calls explicitly registered function which is None"""
        rpc = xmlrpc.server.SimpleXMLRPCDispatcher()
        rpc.register_function(None, name='method')
        self._assert_unsupported(rpc)

    def test_instance_has_no_func(self):
        """Attempts to call nonexistent function on a registered instance"""
        rpc = xmlrpc.server.SimpleXMLRPCDispatcher()
        rpc.register_instance(object())
        self._assert_unsupported(rpc)

    def test_cannot_locate_func(self):
        """Calls a function that the dispatcher cannot locate"""
        rpc = xmlrpc.server.SimpleXMLRPCDispatcher()
        self._assert_unsupported(rpc)
436
437
class HelperTestCase(unittest.TestCase):
    def test_escape(self):
        # escape() must replace each of '&', '<' and '>' with its entity
        # reference.
        expectations = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in expectations:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
443
class FaultTestCase(unittest.TestCase):
    """Representation and marshalling of xmlrpc.client.Fault."""

    def test_repr(self):
        # repr() and str() of a Fault are the same text.
        f = xmlrpclib.Fault(42, 'Test Fault')
        self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
        self.assertEqual(repr(f), str(f))

    def test_dump_fault(self):
        f = xmlrpclib.Fault(42, 'Test Fault')
        # Passed as an ordinary parameter, a Fault comes back as a plain
        # struct and no method name is decoded.
        s = xmlrpclib.dumps((f,))
        (newf,), m = xmlrpclib.loads(s)
        self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertIsNone(m)

        # Dumped on its own by the Marshaller, loading it raises the Fault.
        s = xmlrpclib.Marshaller().dumps(f)
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)

    def test_dotted_attribute(self):
        # This raises AttributeError because the code does not want us to
        # use private methods.
        self.assertRaises(AttributeError,
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')
        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
466
class DateTimeTestCase(unittest.TestCase):
    def test_default(self):
        # With no argument, DateTime must format whatever time.localtime()
        # reports; localtime is patched so the value is fixed.
        frozen = time.struct_time([2013, 7, 15, 0, 24, 49, 0, 196, 0])
        with mock.patch('time.localtime') as localtime_mock:
            localtime_mock.return_value = frozen
            localtime = time.localtime()
            stamp = xmlrpclib.DateTime()
            expected = time.strftime("%Y%m%dT%H:%M:%S", localtime)
            self.assertEqual(str(stamp), expected)

    def test_time(self):
        # Built from a float timestamp.
        seconds = 1181399930.036952
        stamp = xmlrpclib.DateTime(seconds)
        expected = time.strftime("%Y%m%dT%H:%M:%S", time.localtime(seconds))
        self.assertEqual(str(stamp), expected)

    def test_time_tuple(self):
        # Built from a plain 9-item time tuple.
        fields = (2007,6,9,10,38,50,5,160,0)
        stamp = xmlrpclib.DateTime(fields)
        self.assertEqual(str(stamp), '20070609T10:38:50')

    def test_time_struct(self):
        # Built from a time.struct_time.
        local = time.localtime(1181399930.036952)
        stamp = xmlrpclib.DateTime(local)
        self.assertEqual(str(stamp), time.strftime("%Y%m%dT%H:%M:%S", local))

    def test_datetime_datetime(self):
        # Built from a datetime.datetime.
        moment = datetime.datetime(2007,1,2,3,4,5)
        stamp = xmlrpclib.DateTime(moment)
        self.assertEqual(str(stamp), '20070102T03:04:05')

    def test_repr(self):
        moment = datetime.datetime(2007,1,2,3,4,5)
        stamp = xmlrpclib.DateTime(moment)
        expected = "<DateTime '20070102T03:04:05' at %#x>" % id(stamp)
        self.assertEqual(repr(stamp), expected)

    def test_decode(self):
        # Surrounding whitespace in the encoded value must be ignored, both
        # by DateTime.decode() and by the _datetime() helper.
        padded = ' 20070908T07:11:13  '
        reference = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))

        via_method = xmlrpclib.DateTime()
        via_method.decode(padded)
        self.assertEqual(via_method, reference)

        via_helper = xmlrpclib._datetime(padded)
        self.assertEqual(via_helper, reference)

    def test_comparison(self):
        now = datetime.datetime.now()
        dtime = xmlrpclib.DateTime(now.timetuple())

        # DateTime against datetime.datetime, in both operand orders
        self.assertTrue(dtime == now)
        self.assertTrue(now == dtime)
        later = now + datetime.timedelta(seconds=4)
        self.assertTrue(later >= dtime)
        self.assertTrue(dtime < later)

        # DateTime against its string form, in both operand orders
        iso = now.strftime("%Y%m%dT%H:%M:%S")
        self.assertTrue(dtime == iso)
        self.assertTrue(iso == dtime)
        dtime_later = xmlrpclib.DateTime(later.timetuple())
        self.assertTrue(dtime_later >= iso)
        self.assertTrue(iso < dtime_later)

        # Other types: never equal, and ordering raises TypeError
        raw = iso.encode('ascii')
        fields = now.timetuple()
        self.assertFalse(dtime == 1970)
        self.assertTrue(dtime != raw)
        self.assertFalse(dtime == bytearray(raw))
        self.assertTrue(dtime != fields)
        with self.assertRaises(TypeError):
            dtime < float(1970)
        with self.assertRaises(TypeError):
            dtime > raw
        with self.assertRaises(TypeError):
            dtime <= bytearray(raw)
        with self.assertRaises(TypeError):
            dtime >= fields

        # The comparison helpers from test.support must get their way
        self.assertTrue(dtime == ALWAYS_EQ)
        self.assertFalse(dtime != ALWAYS_EQ)
        self.assertTrue(dtime < LARGEST)
        self.assertFalse(dtime > LARGEST)
        self.assertTrue(dtime <= LARGEST)
        self.assertFalse(dtime >= LARGEST)
        self.assertFalse(dtime < SMALLEST)
        self.assertTrue(dtime > SMALLEST)
        self.assertFalse(dtime <= SMALLEST)
        self.assertTrue(dtime >= SMALLEST)
560
561
class BinaryTestCase(unittest.TestCase):

    # XXX What should str(Binary(b"\xff")) return?  I'm choosing "\xff"
    # for now (i.e. interpreting the binary data as Latin-1-encoded
    # text).  But this feels very unsatisfactory.  Perhaps we should
    # only define repr(), and return r"Binary(b'\xff')" instead?

    def test_default(self):
        # A Binary built without data stringifies to the empty string.
        empty = xmlrpclib.Binary()
        self.assertEqual(str(empty), '')

    def test_string(self):
        # str() exposes the payload decoded as Latin-1.
        payload = b'\x01\x02\x03abc123\xff\xfe'
        wrapped = xmlrpclib.Binary(payload)
        self.assertEqual(str(wrapped), payload.decode("latin-1"))

    def test_decode(self):
        # Base64 input must be decoded both by Binary.decode() and by the
        # _binary() helper.
        payload = b'\x01\x02\x03abc123\xff\xfe'
        encoded = base64.encodebytes(payload)
        expected = payload.decode("latin-1")

        via_method = xmlrpclib.Binary()
        via_method.decode(encoded)
        self.assertEqual(str(via_method), expected)

        via_helper = xmlrpclib._binary(encoded)
        self.assertEqual(str(via_helper), expected)
587
588
# Address, port and URL of the test server; unset until a server thread
# function assigns them.
ADDR = None
PORT = None
URL = None
590
# The evt is set twice.  First when the server is ready to serve.
# Second when the server has been shut down.  The caller must clear
# the event after the first set in order to observe the second one.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Run an XML-RPC test server for up to *numrequests* requests.

    evt -- event set once the server is ready and again when it has stopped
    numrequests -- number of requests to handle before returning
    requestHandler -- handler class; SimpleXMLRPCRequestHandler when falsy
    encoding -- passed through to the server constructor

    Publishes the bound address through the module globals ADDR, PORT and
    URL; PORT is reset to None on the way out.
    """
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

        class Fixture:
            @staticmethod
            def getData():
                return '42'

    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode explicitly.
            # On Linux, socket attributes are not inherited like they are
            # on *BSD and Windows.
            conn, peer = self.socket.accept()
            conn.setblocking(True)
            return conn, peer

    handler_class = (requestHandler
                     or xmlrpc.server.SimpleXMLRPCRequestHandler)
    server = MyXMLRPCServer(("localhost", 0), handler_class,
                            encoding=encoding,
                            logRequests=False, bind_and_activate=False)
    global ADDR, PORT, URL
    try:
        server.server_bind()
        ADDR, PORT = server.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on vista which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        server.server_activate()

        # Everything the client-side tests call.
        server.register_introspection_functions()
        server.register_multicall_functions()
        server.register_function(pow)
        server.register_function(lambda x: x, 'têšt')

        @server.register_function
        def my_function():
            '''This is my function'''
            return True

        @server.register_function(name='add')
        def _(x, y):
            return x + y

        server.register_instance(TestInstanceClass(),
                                 allow_dotted_names=True)
        evt.set()

        # handle up to 'numrequests' requests
        for _served in range(numrequests):
            server.handle_request()

    except TimeoutError:
        pass
    finally:
        server.socket.close()
        PORT = None
        evt.set()
657
def http_multi_server(evt, numrequests, requestHandler=None):
    """Run a multi-path XML-RPC test server for up to *numrequests* requests.

    evt -- event set once the server is ready and again when it has stopped
    numrequests -- number of requests to handle before returning
    requestHandler -- base handler class; SimpleXMLRPCRequestHandler when
                      falsy

    One dispatcher is registered per entry in ``paths``; each exposes a
    'test' function returning its own path.  pow() is added to the first
    path, 'add' to the second, and "/is/broken" gets a dispatcher that
    always raises.  The bound address is published through the module
    globals ADDR, PORT and URL; PORT is reset to None on the way out.
    """
    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
        def get_request(self):
            # Ensure the accepted socket is always blocking.  On Linux,
            # socket attributes are not inherited like they are on *BSD and
            # Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    class MyRequestHandler(requestHandler):
        # An empty list disables the handler's own path filtering so that
        # every path reaches the server's dispatcher lookup.
        # NOTE(review): inferred from SimpleXMLRPCRequestHandler -- confirm.
        rpc_paths = []

    class BrokenDispatcher:
        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
            raise RuntimeError("broken dispatcher")

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    global ADDR, PORT, URL
    try:
        # Bind inside the try block so that a failure here still closes the
        # socket and sets evt; otherwise the thread waiting on evt would
        # block forever.
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        #connect to IP address directly.  This avoids socket.create_connection()
        #trying to connect to "localhost" using all address families, which
        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
        #on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = [
            "/foo", "/foo/bar",
            "/foo?k=v", "/foo#frag", "/foo?k=v#frag",
            "", "/", "/RPC2", "?k=v", "#frag",
        ]
        for path in paths:
            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
            # Bind the path as a default so each lambda returns its own path.
            d.register_function(lambda p=path: p, 'test')
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        serv.add_dispatcher("/is/broken", BrokenDispatcher())
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except TimeoutError:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
727
# This function prevents errors like:
#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Returns True if the given ProtocolError is the product of a server-side
       exception caused by the 'temporarily unavailable' response sometimes
       given by operations on non-blocking sockets, and False otherwise.

       Objects without errcode/headers attributes (e.g. an OSError) are
       judged by their string form instead.'''

    # sometimes we get a -1 error code and/or empty headers
    try:
        if e.errcode == -1 or e.headers is None:
            return True
        exc_mess = e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError-like object (e.g. an OSError): there are no
        # errcode/headers to look at, so fall back on the message text.
        exc_mess = str(e)

    # Always return a real bool rather than falling off the end with None.
    return bool(exc_mess) and 'temporarily unavailable' in exc_mess.lower()
746
def make_request_and_skipIf(condition, reason):
    """Return a decorator that skips a server test when *condition* is true.

    When *condition* is false the decorated function is returned untouched.
    Otherwise the test is replaced by a wrapper that first issues one
    request to the server at URL and then raises unittest.SkipTest(reason).
    The wrapper keeps the decorated function's name and docstring.
    """
    # If we skip the test, we have to make a request because
    # the server created in setUp blocks expecting one to come in.
    if not condition:
        return lambda func: func

    # Local import so this function does not depend on a module-level import.
    import functools

    def decorator(func):
        @functools.wraps(func)
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, OSError) as e:
                # Spurious 'unavailable' failures are tolerated; anything
                # else is a real error.
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
762
class BaseServerTestCase(unittest.TestCase):
    """Base class for tests that need a server thread running.

    Subclasses may override requestHandler, request_count and threadFunc;
    all three are handed to the server thread started in setUp().
    """
    requestHandler = None
    request_count = 1
    threadFunc = staticmethod(http_server)

    def setUp(self):
        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        signal = threading.Event()
        self.evt = signal

        # start server thread to handle requests
        server_thread = threading.Thread(
            target=self.threadFunc,
            args=(signal, self.request_count, self.requestHandler))
        server_thread.start()
        self.addCleanup(server_thread.join)

        # Block until the server reports it is ready, then re-arm the event
        # so tearDown() can wait for it to be set again.
        signal.wait()
        signal.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()

        # disable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
789
class SimpleServerTestCase(BaseServerTestCase):
    """Client/server round trips against the server started by setUp."""

    def _fail_unless_unavailable(self, exc):
        """Fail the test unless *exc* is an 'unavailable' error.

        Failures due to non-blocking socket 'unavailable' errors are
        ignored.  Any other error fails the test, and the exception's
        headers (when it has any) are added to the failure message.
        Must be called from inside the ``except`` block handling *exc*.
        """
        if not is_unavailable_exception(exc):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_simple1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_nonascii(self):
        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_client_encoding(self):
        start_string = '\u20ac'
        end_string = '\xa4'

        try:
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_nonascii_methodname(self):
        try:
            p = xmlrpclib.ServerProxy(URL, encoding='ascii')
            self.assertEqual(p.têšt(42), 42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_404(self):
        # send POST with http.client, it should return 404 header and
        # 'Not Found' message.
        with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn:
            conn.request('POST', '/this-is-not-valid')
            response = conn.getresponse()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

    def test_introspection1(self):
        expected_methods = {'pow', 'div', 'my_function', 'add', 'têšt',
                            'system.listMethods', 'system.methodHelp',
                            'system.methodSignature', 'system.multicall',
                            'Fixture'}
        try:
            p = xmlrpclib.ServerProxy(URL)
            meth = p.system.listMethods()
            self.assertEqual(set(meth), expected_methods)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_introspection2(self):
        try:
            # test _methodHelp()
            p = xmlrpclib.ServerProxy(URL)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    @make_request_and_skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        try:
            # test native doc
            p = xmlrpclib.ServerProxy(URL)
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')

        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        data = server.Fixture.getData()
        self.assertEqual(data, '42')

    def test_unicode_host(self):
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(server.add("a", "\xe9"), "a\xe9")

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        request = ('POST /RPC2 HTTP/1.0\r\n'
                   'Content-Length: 100\r\n\r\n'
                   'bye HTTP/1.1\r\n'
                   f'Host: {ADDR}:{PORT}\r\n'
                   'Accept-Encoding: identity\r\n'
                   'Content-Length: 0\r\n\r\n')
        with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
            # sendall() keeps writing until the whole request has gone out,
            # whereas send() may transmit only part of it.
            conn.sendall(request.encode('ascii'))

    def test_context_manager(self):
        with xmlrpclib.ServerProxy(URL) as server:
            server.add(2, 3)
            self.assertNotEqual(server('transport')._connection,
                                (None, None))
        self.assertEqual(server('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        # A Fault raised by the call is deliberately tolerated; the point
        # of the test is the connection state after leaving the block.
        with contextlib.suppress(xmlrpclib.Fault):
            with xmlrpclib.ServerProxy(URL) as server:
                server.add(2, "a")
        self.assertEqual(server('transport')._connection,
                         (None, None))
987
988
class SimpleServerEncodingTestCase(BaseServerTestCase):
    """Round-trip non-ASCII text through a server started with
    'iso-8859-15' as its encoding argument."""

    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The encoding parameter is not used: 'iso-8859-15' is always
        # passed on to http_server.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        euro_sign = '\u20ac'
        currency_sign = '\xa4'

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            reply = proxy.add(euro_sign, currency_sign)
            self.assertEqual(reply, euro_sign + currency_sign)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            if is_unavailable_exception(exc):
                # non-blocking socket unavailable errors are not failures
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))
1007
1008
class MultiPathServerTestCase(BaseServerTestCase):
    """Requests sent to different URL paths of the multi-path server."""
    request_count = 2
    threadFunc = staticmethod(http_multi_server)

    def _assert_remote_test_returns(self, url_suffix, expected):
        # Call the remote test() method through URL + url_suffix and
        # compare what it returns with *expected*.
        proxy = xmlrpclib.ServerProxy(URL + url_suffix)
        self.assertEqual(proxy.test(), expected)

    def test_path1(self):
        proxy = xmlrpclib.ServerProxy(URL + "/foo")
        self.assertEqual(proxy.pow(6, 8), 6**8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_path2(self):
        proxy = xmlrpclib.ServerProxy(URL + "/foo/bar")
        self.assertEqual(proxy.add(6, 8), 6 + 8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.pow(6, 8)

    def test_path3(self):
        proxy = xmlrpclib.ServerProxy(URL + "/is/broken")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_invalid_path(self):
        proxy = xmlrpclib.ServerProxy(URL + "/invalid")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_path_query_fragment(self):
        self._assert_remote_test_returns("/foo?k=v#frag", "/foo?k=v#frag")

    def test_path_fragment(self):
        self._assert_remote_test_returns("/foo#frag", "/foo#frag")

    def test_path_query(self):
        self._assert_remote_test_returns("/foo?k=v", "/foo?k=v")

    def test_empty_path(self):
        self._assert_remote_test_returns("", "/RPC2")

    def test_root_path(self):
        self._assert_remote_test_returns("/", "/")

    def test_empty_path_query(self):
        self._assert_remote_test_returns("?k=v", "?k=v")

    def test_empty_path_fragment(self):
        self._assert_remote_test_returns("#frag", "#frag")
1057
1058
#Shared base for the keep-alive test cases: the server is run with an
#HTTP/1.1 request handler that records the requests it handles
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    #a request handler that supports keep-alive and logs requests into a
    #class variable: myRequests gets one list per handle() call, and each
    #handle_one_request() appends its raw request line to that list
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []

        def handle(self):
            #open a fresh log entry and remember its index
            self.myRequests.append([])
            self.reqidx = len(self.myRequests) - 1
            return self.parentClass.handle(self)

        def handle_one_request(self):
            outcome = self.parentClass.handle_one_request(self)
            current_entry = self.myRequests[self.reqidx]
            current_entry.append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        #clear request log
        self.RequestHandler.myRequests = []
        return super().setUp()
1082
#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #do three requests.
        for _ in range(3):
            self.assertEqual(proxy.pow(6, 8), 6**8)
        proxy("close")()

        request_log = self.RequestHandler.myRequests

        #they should have all been handled by a single request handler
        self.assertEqual(len(request_log), 1)

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(request_log[-1]), 2)
1100
1101
#test special attribute access on the serverproxy, through the __call__
#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    #ask for two keepalive requests to be handled.
    request_count = 2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #two batches of three requests, each batch ended by a close; the
        #first close should trigger a new keep-alive request
        for _batch in range(2):
            for _call in range(3):
                self.assertEqual(proxy.pow(6, 8), 6**8)
            proxy("close")()

        #they should have all been two request handlers, each having logged at least
        #two complete requests
        request_log = self.RequestHandler.myRequests
        self.assertEqual(len(request_log), 2)
        self.assertGreaterEqual(len(request_log[-1]), 2)
        self.assertGreaterEqual(len(request_log[-2]), 2)

    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #one request, then close through the transport object instead
        #(same as above, really), then one more request.
        self.assertEqual(proxy.pow(6, 8), 6**8)
        transport = proxy("transport")
        transport.close()
        self.assertEqual(proxy.pow(6, 8), 6**8)
        proxy("close")()
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
1135
#A test case that verifies that gzip encoding works in both directions
#(for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    #a request handler that speaks HTTP/1.1 and records the Content-Length
    #header of the most recent POST in a class variable
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store content length of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False
        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        plain_length = self.RequestHandler.content_length
        t.encode_threshold = 0 #turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        gzipped_length = self.RequestHandler.content_length
        #the same call must produce a shorter request body once encoded
        self.assertGreater(plain_length, gzipped_length)
        p("close")()

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                    re.compile(r"\b400\b"))
        with cm:
            p.pow(6, 8)
        p("close")()

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        try:
            self.requestHandler.encode_threshold = None #no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            plain_length = t.response_length
            self.requestHandler.encode_threshold = 0 #always encode
            self.assertEqual(p.pow(6,8), 6**8)
            p("close")()
            gzipped_length = t.response_length
        finally:
            #the threshold is a class attribute, so put it back even when
            #an assertion above fails
            self.requestHandler.encode_threshold = old
        self.assertGreater(plain_length, gzipped_length)
1204
1205
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):
    """Check the payload size limit applied by xmlrpclib.gzip_decode()."""

    def test_gzip_decode_limit(self):
        limit = 20 * 1024 * 1024

        # A payload of exactly `limit` bytes must decode in full.
        at_limit = xmlrpclib.gzip_encode(b'\0' * limit)
        self.assertEqual(len(xmlrpclib.gzip_decode(at_limit)), limit)

        # One byte more must be rejected by default ...
        over_limit = xmlrpclib.gzip_encode(b'\0' * (limit + 1))
        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(over_limit)

        # ... but must decode without error when max_decode=-1 is passed.
        xmlrpclib.gzip_decode(over_limit, max_decode=-1)
1224
1225
class HeadersServerTestCase(BaseServerTestCase):
    """Check that headers given to ServerProxy are sent with the request."""

    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        # Headers of the most recent POST, stored on the class.
        test_headers = None

        def do_POST(self):
            type(self).test_headers = self.headers
            return super().do_POST()

    requestHandler = RequestHandler
    standard_headers = [
        'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
        'Content-Length']

    def setUp(self):
        self.RequestHandler.test_headers = None
        return super().setUp()

    def assertContainsAdditionalHeaders(self, headers, additional):
        # The request must carry exactly the standard headers plus the
        # additional ones, and every additional header must have the
        # expected value.
        expected_keys = sorted([*self.standard_headers, *additional.keys()])
        self.assertListEqual(sorted(headers.keys()), expected_keys)

        for name, expected_value in additional.items():
            self.assertEqual(headers.get(name), expected_value)

    def _headers_received_for(self, client_headers):
        # Make one call through a proxy configured with *client_headers*
        # and return the headers the request handler recorded for it.
        proxy = xmlrpclib.ServerProxy(URL, headers=client_headers)
        self.assertEqual(proxy.pow(6, 8), 6**8)
        return self.RequestHandler.test_headers

    def test_header(self):
        received = self._headers_received_for([('X-Test', 'foo')])
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})

    def test_header_many(self):
        received = self._headers_received_for(
            [('X-Test', 'foo'), ('X-Test-Second', 'bar')])
        self.assertContainsAdditionalHeaders(
            received, {'X-Test': 'foo', 'X-Test-Second': 'bar'})

    def test_header_empty(self):
        received = self._headers_received_for([])
        self.assertContainsAdditionalHeaders(received, {})

    def test_header_tuple(self):
        received = self._headers_received_for((('X-Test', 'foo'),))
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})

    def test_header_items(self):
        received = self._headers_received_for({'X-Test': 'foo'}.items())
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})
1285
1286
#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        super().setUp()
        # Actual value of the URL doesn't matter if it is a string in
        # the correct format.
        self.url = 'http://fake.localhost'

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(self.url)
        close = proxy('close')
        self.assertEqual(close(), None)

    def test_transport(self):
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        self.assertEqual(proxy('transport'), transport)
1303
1304
# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
    def get(self, key, failobj=None):
        # Every lookup of Content-Length (in any letter case) yields a
        # value that is not a number; other headers are looked up normally
        # under their lower-cased name.
        lowered = key.lower()
        if lowered != 'content-length':
            return super().get(lowered, failobj)
        return 'I am broken'
1313
1314
class FailingServerTestCase(unittest.TestCase):
    """Server-side failures with and without traceback headers enabled."""

    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        server_thread = threading.Thread(target=http_server,
                                         args=(self.evt, 1))
        server_thread.start()
        self.addCleanup(server_thread.join)

        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        handler_class = xmlrpc.server.SimpleXMLRPCRequestHandler
        handler_class.MessageClass = http.client.HTTPMessage

    def test_basic(self):
        server_class = xmlrpc.server.SimpleXMLRPCServer

        # check that flag is false by default
        self.assertEqual(server_class._send_traceback_header, False)

        # enable traceback reporting
        server_class._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            proxy = xmlrpclib.ServerProxy(URL)
            self.assertEqual(proxy.pow(6, 8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            if is_unavailable_exception(exc):
                # non-blocking socket 'unavailable' errors are not failures
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # ignore failures due to non-blocking socket 'unavailable'
            # errors, and errors that carry no headers to inspect
            if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
                return
            # The two server-side error headers shouldn't be sent back in this case
            self.assertIsNone(exc.headers.get("X-exception"))
            self.assertIsNone(exc.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            proxy.pow(6, 8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # ignore failures due to non-blocking socket 'unavailable'
            # errors, and errors that carry no headers to inspect
            if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
                return
            # We should get error info in the response
            expected_err = "invalid literal for int() with base 10: 'I am broken'"
            self.assertEqual(exc.headers.get("X-exception"), expected_err)
            self.assertIsNotNone(exc.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')
1391
1392
@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """Temporarily replace sys.stdout with an in-memory text stream.

    A variation on support.captured_stdout(): the stream yielded here is
    an io.TextIOWrapper over an io.BytesIO, so it has a `buffer`
    attribute.  The previous sys.stdout is put back on exit, including
    when the body raises.
    """
    saved = sys.stdout
    replacement = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    sys.stdout = replacement
    try:
        yield replacement
    finally:
        sys.stdout = saved
1404
1405
class CGIHandlerTestCase(unittest.TestCase):
    """Drive CGIXMLRPCRequestHandler through stdin, stdout and os.environ."""

    def setUp(self):
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with os_helper.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs
            # handle_get; capture what it writes to stdout
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            handle = data_out.read()
            fields = handle.split()
            status = fields[1]
            message = ' '.join(fields[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')

    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with os_helper.EnvironmentVarGuard() as env, \
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # no method is registered on the handler, so the response is
        # expected to be a fault; if so, our goal is achieved ;)
        handle = data_out.read()

        # Skip the HTTP header by locating the start of the XML document
        # instead of assuming a fixed header length: the header size
        # depends on the number of digits in Content-Length.
        xml_start = handle.find("<?xml")
        self.assertNotEqual(xml_start, -1, "no XML document in the response")
        content = handle[xml_start:]

        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, content)

        # Also test the content-length returned by handle_request
        # Using the same test method in order to avoid all the data-passing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040
        self.assertEqual(
            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
            len(content))
1472
1473
class UseBuiltinTypesTestCase(unittest.TestCase):
    """Tests for the use_builtin_types flag of the server classes."""

    def test_use_builtin_types(self):
        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
        # makes all dispatch of binary data as bytes instances, and all
        # dispatch of datetime argument as datetime.datetime instances.
        self.log = []
        expected_bytes = b"my dog has fleas"
        expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)

        def foobar(*args):
            # record whatever arguments the dispatcher passes in
            self.log.extend(args)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        dispatcher.register_function(foobar)
        request = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
        dispatcher._marshaled_dispatch(request)

        self.assertEqual(len(self.log), 2)
        received_bytes, received_date = self.log
        self.assertEqual(self.log, [expected_bytes, expected_date])
        self.assertIs(type(received_date), datetime.datetime)
        self.assertIs(type(received_bytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        cgi_handler = xmlrpc.server.CGIXMLRPCRequestHandler(
            use_builtin_types=True)
        self.assertTrue(cgi_handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        address = ("localhost", 0)
        rpc_server = xmlrpc.server.SimpleXMLRPCServer(
            address, use_builtin_types=True)
        rpc_server.server_close()
        self.assertTrue(rpc_server.use_builtin_types)
1505
1506
def setUpModule():
    """Record the threading state and register its cleanup for module exit."""
    saved_state = threading_helper.threading_setup()
    unittest.addModuleCleanup(
        threading_helper.threading_cleanup, *saved_state)
1510
1511
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1514