1import base64 2import datetime 3import decimal 4import sys 5import time 6import unittest 7from unittest import mock 8import xmlrpc.client as xmlrpclib 9import xmlrpc.server 10import http.client 11import http, http.server 12import socket 13import threading 14import re 15import io 16import contextlib 17from test import support 18from test.support import os_helper 19from test.support import socket_helper 20from test.support import threading_helper 21from test.support import ALWAYS_EQ, LARGEST, SMALLEST 22 23try: 24 import gzip 25except ImportError: 26 gzip = None 27 28alist = [{'astring': 'foo@bar.baz.spam', 29 'afloat': 7283.43, 30 'anint': 2**20, 31 'ashortlong': 2, 32 'anotherlist': ['.zyx.41'], 33 'abase64': xmlrpclib.Binary(b"my dog has fleas"), 34 'b64bytes': b"my dog has fleas", 35 'b64bytearray': bytearray(b"my dog has fleas"), 36 'boolean': False, 37 'unicode': '\u4000\u6000\u8000', 38 'ukey\u4000': 'regular value', 39 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'), 40 'datetime2': xmlrpclib.DateTime( 41 (2005, 2, 10, 11, 41, 23, 0, 1, -1)), 42 'datetime3': xmlrpclib.DateTime( 43 datetime.datetime(2005, 2, 10, 11, 41, 23)), 44 }] 45 46class XMLRPCTestCase(unittest.TestCase): 47 48 def test_dump_load(self): 49 dump = xmlrpclib.dumps((alist,)) 50 load = xmlrpclib.loads(dump) 51 self.assertEqual(alist, load[0][0]) 52 53 def test_dump_bare_datetime(self): 54 # This checks that an unwrapped datetime.date object can be handled 55 # by the marshalling code. This can't be done via test_dump_load() 56 # since with use_builtin_types set to 1 the unmarshaller would create 57 # datetime objects for the 'datetime[123]' keys as well 58 dt = datetime.datetime(2005, 2, 10, 11, 41, 23) 59 self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23')) 60 s = xmlrpclib.dumps((dt,)) 61 62 result, m = xmlrpclib.loads(s, use_builtin_types=True) 63 (newdt,) = result 64 self.assertEqual(newdt, dt) 65 self.assertIs(type(newdt), datetime.datetime) 66 self.assertIsNone(m) 67 68 result, m = xmlrpclib.loads(s, use_builtin_types=False) 69 (newdt,) = result 70 self.assertEqual(newdt, dt) 71 self.assertIs(type(newdt), xmlrpclib.DateTime) 72 self.assertIsNone(m) 73 74 result, m = xmlrpclib.loads(s, use_datetime=True) 75 (newdt,) = result 76 self.assertEqual(newdt, dt) 77 self.assertIs(type(newdt), datetime.datetime) 78 self.assertIsNone(m) 79 80 result, m = xmlrpclib.loads(s, use_datetime=False) 81 (newdt,) = result 82 self.assertEqual(newdt, dt) 83 self.assertIs(type(newdt), xmlrpclib.DateTime) 84 self.assertIsNone(m) 85 86 87 def test_datetime_before_1900(self): 88 # same as before but with a date before 1900 89 dt = datetime.datetime(1, 2, 10, 11, 41, 23) 90 self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23')) 91 s = xmlrpclib.dumps((dt,)) 92 93 result, m = xmlrpclib.loads(s, use_builtin_types=True) 94 (newdt,) = result 95 self.assertEqual(newdt, dt) 96 self.assertIs(type(newdt), datetime.datetime) 97 self.assertIsNone(m) 98 99 result, m = xmlrpclib.loads(s, use_builtin_types=False) 100 (newdt,) = result 101 self.assertEqual(newdt, dt) 102 self.assertIs(type(newdt), xmlrpclib.DateTime) 103 self.assertIsNone(m) 104 105 def test_bug_1164912 (self): 106 d = xmlrpclib.DateTime() 107 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), 108 methodresponse=True)) 109 self.assertIsInstance(new_d.value, str) 110 111 # Check that the output of dumps() is still an 8-bit string 112 s = xmlrpclib.dumps((new_d,), methodresponse=True) 113 self.assertIsInstance(s, str) 114 115 def test_newstyle_class(self): 116 class T(object): 117 pass 118 t = T() 119 t.x = 100 120 t.y = "Hello" 121 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) 122 self.assertEqual(t2, t.__dict__) 123 124 def test_dump_big_long(self): 125 self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,)) 126 127 def test_dump_bad_dict(self): 128 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) 129 130 def test_dump_recursive_seq(self): 131 l = [1,2,3] 132 t = [3,4,5,l] 133 l.append(t) 134 self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) 135 136 def test_dump_recursive_dict(self): 137 d = {'1':1, '2':1} 138 t = {'3':3, 'd':d} 139 d['t'] = t 140 self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) 141 142 def test_dump_big_int(self): 143 if sys.maxsize > 2**31-1: 144 self.assertRaises(OverflowError, xmlrpclib.dumps, 145 (int(2**34),)) 146 147 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) 148 self.assertRaises(OverflowError, xmlrpclib.dumps, 149 (xmlrpclib.MAXINT+1,)) 150 self.assertRaises(OverflowError, xmlrpclib.dumps, 151 (xmlrpclib.MININT-1,)) 152 153 def dummy_write(s): 154 pass 155 156 m = xmlrpclib.Marshaller() 157 m.dump_int(xmlrpclib.MAXINT, dummy_write) 158 m.dump_int(xmlrpclib.MININT, dummy_write) 159 self.assertRaises(OverflowError, m.dump_int, 160 xmlrpclib.MAXINT+1, dummy_write) 161 self.assertRaises(OverflowError, m.dump_int, 162 xmlrpclib.MININT-1, dummy_write) 163 164 def test_dump_double(self): 165 xmlrpclib.dumps((float(2 ** 34),)) 166 xmlrpclib.dumps((float(xmlrpclib.MAXINT), 167 float(xmlrpclib.MININT))) 168 xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42), 169 float(xmlrpclib.MININT - 42))) 170 171 def dummy_write(s): 172 pass 173 174 m = xmlrpclib.Marshaller() 175 m.dump_double(xmlrpclib.MAXINT, dummy_write) 176 m.dump_double(xmlrpclib.MININT, dummy_write) 177 m.dump_double(xmlrpclib.MAXINT + 42, dummy_write) 178 m.dump_double(xmlrpclib.MININT - 42, dummy_write) 179 180 def test_dump_none(self): 181 value = alist + [None] 182 arg1 = (alist + [None],) 183 strg = xmlrpclib.dumps(arg1, allow_none=True) 184 self.assertEqual(value, 185 xmlrpclib.loads(strg)[0][0]) 186 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,)) 187 188 def test_dump_encoding(self): 189 value = {'key\u20ac\xa4': 190 'value\u20ac\xa4'} 191 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15') 192 strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg 193 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 194 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 195 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 196 197 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 198 methodresponse=True) 199 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 200 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 201 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 202 203 methodname = 'method\u20ac\xa4' 204 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 205 methodname=methodname) 206 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 207 self.assertEqual(xmlrpclib.loads(strg)[1], methodname) 208 209 def test_dump_bytes(self): 210 sample = b"my dog has fleas" 211 self.assertEqual(sample, xmlrpclib.Binary(sample)) 212 for type_ in bytes, bytearray, xmlrpclib.Binary: 213 value = type_(sample) 214 s = xmlrpclib.dumps((value,)) 215 216 result, m = xmlrpclib.loads(s, use_builtin_types=True) 217 (newvalue,) = result 218 self.assertEqual(newvalue, sample) 219 self.assertIs(type(newvalue), bytes) 220 self.assertIsNone(m) 221 222 result, m = xmlrpclib.loads(s, use_builtin_types=False) 223 (newvalue,) = result 224 self.assertEqual(newvalue, sample) 225 self.assertIs(type(newvalue), xmlrpclib.Binary) 226 self.assertIsNone(m) 227 228 def test_loads_unsupported(self): 229 ResponseError = xmlrpclib.ResponseError 230 data = '<params><param><value><spam/></value></param></params>' 231 self.assertRaises(ResponseError, xmlrpclib.loads, data) 232 data = ('<params><param><value><array>' 233 '<value><spam/></value>' 234 '</array></value></param></params>') 235 self.assertRaises(ResponseError, xmlrpclib.loads, data) 236 data = ('<params><param><value><struct>' 237 '<member><name>a</name><value><spam/></value></member>' 238 '<member><name>b</name><value><spam/></value></member>' 239 '</struct></value></param></params>') 240 self.assertRaises(ResponseError, xmlrpclib.loads, data) 241 242 def check_loads(self, s, value, **kwargs): 243 dump = '<params><param><value>%s</value></param></params>' % s 244 result, m = xmlrpclib.loads(dump, **kwargs) 245 (newvalue,) = result 246 self.assertEqual(newvalue, value) 247 self.assertIs(type(newvalue), type(value)) 248 self.assertIsNone(m) 249 250 def test_load_standard_types(self): 251 check = self.check_loads 252 check('string', 'string') 253 check('<string>string</string>', 'string') 254 check('<string> string</string>', ' string') 255 check('<int>2056183947</int>', 2056183947) 256 check('<int>-2056183947</int>', -2056183947) 257 check('<i4>2056183947</i4>', 2056183947) 258 check('<double>46093.78125</double>', 46093.78125) 259 check('<boolean>0</boolean>', False) 260 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 261 xmlrpclib.Binary(b'\x00byte string\xff')) 262 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 263 b'\x00byte string\xff', use_builtin_types=True) 264 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 265 xmlrpclib.DateTime('20050210T11:41:23')) 266 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 267 datetime.datetime(2005, 2, 10, 11, 41, 23), 268 use_builtin_types=True) 269 check('<array><data>' 270 '<value><int>1</int></value><value><int>2</int></value>' 271 '</data></array>', [1, 2]) 272 check('<struct>' 273 '<member><name>b</name><value><int>2</int></value></member>' 274 '<member><name>a</name><value><int>1</int></value></member>' 275 '</struct>', {'a': 1, 'b': 2}) 276 277 def test_load_extension_types(self): 278 check = self.check_loads 279 check('<nil/>', None) 280 check('<ex:nil/>', None) 281 check('<i1>205</i1>', 205) 282 check('<i2>20561</i2>', 20561) 283 check('<i8>9876543210</i8>', 9876543210) 284 check('<biginteger>98765432100123456789</biginteger>', 285 98765432100123456789) 286 check('<float>93.78125</float>', 93.78125) 287 check('<bigdecimal>9876543210.0123456789</bigdecimal>', 288 decimal.Decimal('9876543210.0123456789')) 289 290 def test_limit_int(self): 291 check = self.check_loads 292 maxdigits = 5000 293 with support.adjust_int_max_str_digits(maxdigits): 294 s = '1' * (maxdigits + 1) 295 with self.assertRaises(ValueError): 296 check(f'<int>{s}</int>', None) 297 with self.assertRaises(ValueError): 298 check(f'<biginteger>{s}</biginteger>', None) 299 300 def test_get_host_info(self): 301 # see bug #3613, this raised a TypeError 302 transp = xmlrpc.client.Transport() 303 self.assertEqual(transp.get_host_info("user@host.tld"), 304 ('host.tld', 305 [('Authorization', 'Basic dXNlcg==')], {})) 306 307 def test_ssl_presence(self): 308 try: 309 import ssl 310 except ImportError: 311 has_ssl = False 312 else: 313 has_ssl = True 314 try: 315 xmlrpc.client.ServerProxy('https://localhost:9999').bad_function() 316 except NotImplementedError: 317 self.assertFalse(has_ssl, "xmlrpc client's error with SSL support") 318 except OSError: 319 self.assertTrue(has_ssl) 320 321 def test_keepalive_disconnect(self): 322 class RequestHandler(http.server.BaseHTTPRequestHandler): 323 protocol_version = "HTTP/1.1" 324 handled = False 325 326 def do_POST(self): 327 length = int(self.headers.get("Content-Length")) 328 self.rfile.read(length) 329 if self.handled: 330 self.close_connection = True 331 return 332 response = xmlrpclib.dumps((5,), methodresponse=True) 333 response = response.encode() 334 self.send_response(http.HTTPStatus.OK) 335 self.send_header("Content-Length", len(response)) 336 self.end_headers() 337 self.wfile.write(response) 338 self.handled = True 339 self.close_connection = False 340 341 def log_message(self, format, *args): 342 # don't clobber sys.stderr 343 pass 344 345 def run_server(): 346 server.socket.settimeout(float(1)) # Don't hang if client fails 347 server.handle_request() # First request and attempt at second 348 server.handle_request() # Retried second request 349 350 server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler) 351 self.addCleanup(server.server_close) 352 thread = threading.Thread(target=run_server) 353 thread.start() 354 self.addCleanup(thread.join) 355 url = "http://{}:{}/".format(*server.server_address) 356 with xmlrpclib.ServerProxy(url) as p: 357 self.assertEqual(p.method(), 5) 358 self.assertEqual(p.method(), 5) 359 360 361class SimpleXMLRPCDispatcherTestCase(unittest.TestCase): 362 class DispatchExc(Exception): 363 """Raised inside the dispatched functions when checking for 364 chained exceptions""" 365 366 def test_call_registered_func(self): 367 """Calls explicitly registered function""" 368 # Makes sure any exception raised inside the function has no other 369 # exception chained to it 370 371 exp_params = 1, 2, 3 372 373 def dispatched_func(*params): 374 raise self.DispatchExc(params) 375 376 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 377 dispatcher.register_function(dispatched_func) 378 with self.assertRaises(self.DispatchExc) as exc_ctx: 379 dispatcher._dispatch('dispatched_func', exp_params) 380 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 381 self.assertIsNone(exc_ctx.exception.__cause__) 382 self.assertIsNone(exc_ctx.exception.__context__) 383 384 def test_call_instance_func(self): 385 """Calls a registered instance attribute as a function""" 386 # Makes sure any exception raised inside the function has no other 387 # exception chained to it 388 389 exp_params = 1, 2, 3 390 391 class DispatchedClass: 392 def dispatched_func(self, *params): 393 raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params) 394 395 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 396 dispatcher.register_instance(DispatchedClass()) 397 with self.assertRaises(self.DispatchExc) as exc_ctx: 398 dispatcher._dispatch('dispatched_func', exp_params) 399 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 400 self.assertIsNone(exc_ctx.exception.__cause__) 401 self.assertIsNone(exc_ctx.exception.__context__) 402 403 def test_call_dispatch_func(self): 404 """Calls the registered instance's `_dispatch` function""" 405 # Makes sure any exception raised inside the function has no other 406 # exception chained to it 407 408 exp_method = 'method' 409 exp_params = 1, 2, 3 410 411 class TestInstance: 412 def _dispatch(self, method, params): 413 raise SimpleXMLRPCDispatcherTestCase.DispatchExc( 414 method, params) 415 416 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 417 dispatcher.register_instance(TestInstance()) 418 with self.assertRaises(self.DispatchExc) as exc_ctx: 419 dispatcher._dispatch(exp_method, exp_params) 420 self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params)) 421 self.assertIsNone(exc_ctx.exception.__cause__) 422 self.assertIsNone(exc_ctx.exception.__context__) 423 424 def test_registered_func_is_none(self): 425 """Calls explicitly registered function which is None""" 426 427 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 428 dispatcher.register_function(None, name='method') 429 with self.assertRaisesRegex(Exception, 'method'): 430 dispatcher._dispatch('method', ('param',)) 431 432 def test_instance_has_no_func(self): 433 """Attempts to call nonexistent function on a registered instance""" 434 435 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 436 dispatcher.register_instance(object()) 437 with self.assertRaisesRegex(Exception, 'method'): 438 dispatcher._dispatch('method', ('param',)) 439 440 def test_cannot_locate_func(self): 441 """Calls a function that the dispatcher cannot locate""" 442 443 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 444 with self.assertRaisesRegex(Exception, 'method'): 445 dispatcher._dispatch('method', ('param',)) 446 447 448class HelperTestCase(unittest.TestCase): 449 def test_escape(self): 450 self.assertEqual(xmlrpclib.escape("a&b"), "a&b") 451 self.assertEqual(xmlrpclib.escape("a<b"), "a<b") 452 self.assertEqual(xmlrpclib.escape("a>b"), "a>b") 453 454class FaultTestCase(unittest.TestCase): 455 def test_repr(self): 456 f = xmlrpclib.Fault(42, 'Test Fault') 457 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 458 self.assertEqual(repr(f), str(f)) 459 460 def test_dump_fault(self): 461 f = xmlrpclib.Fault(42, 'Test Fault') 462 s = xmlrpclib.dumps((f,)) 463 (newf,), m = xmlrpclib.loads(s) 464 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 465 self.assertEqual(m, None) 466 467 s = xmlrpclib.Marshaller().dumps(f) 468 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 469 470 def test_dotted_attribute(self): 471 # this will raise AttributeError because code don't want us to use 472 # private methods 473 self.assertRaises(AttributeError, 474 xmlrpc.server.resolve_dotted_attribute, str, '__add') 475 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 476 477class DateTimeTestCase(unittest.TestCase): 478 def test_default(self): 479 with mock.patch('time.localtime') as localtime_mock: 480 time_struct = time.struct_time( 481 [2013, 7, 15, 0, 24, 49, 0, 196, 0]) 482 localtime_mock.return_value = time_struct 483 localtime = time.localtime() 484 t = xmlrpclib.DateTime() 485 self.assertEqual(str(t), 486 time.strftime("%Y%m%dT%H:%M:%S", localtime)) 487 488 def test_time(self): 489 d = 1181399930.036952 490 t = xmlrpclib.DateTime(d) 491 self.assertEqual(str(t), 492 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 493 494 def test_time_tuple(self): 495 d = (2007,6,9,10,38,50,5,160,0) 496 t = xmlrpclib.DateTime(d) 497 self.assertEqual(str(t), '20070609T10:38:50') 498 499 def test_time_struct(self): 500 d = time.localtime(1181399930.036952) 501 t = xmlrpclib.DateTime(d) 502 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 503 504 def test_datetime_datetime(self): 505 d = datetime.datetime(2007,1,2,3,4,5) 506 t = xmlrpclib.DateTime(d) 507 self.assertEqual(str(t), '20070102T03:04:05') 508 509 def test_repr(self): 510 d = datetime.datetime(2007,1,2,3,4,5) 511 t = xmlrpclib.DateTime(d) 512 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t) 513 self.assertEqual(repr(t), val) 514 515 def test_decode(self): 516 d = ' 20070908T07:11:13 ' 517 t1 = xmlrpclib.DateTime() 518 t1.decode(d) 519 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 520 self.assertEqual(t1, tref) 521 522 t2 = xmlrpclib._datetime(d) 523 self.assertEqual(t2, tref) 524 525 def test_comparison(self): 526 now = datetime.datetime.now() 527 dtime = xmlrpclib.DateTime(now.timetuple()) 528 529 # datetime vs. DateTime 530 self.assertTrue(dtime == now) 531 self.assertTrue(now == dtime) 532 then = now + datetime.timedelta(seconds=4) 533 self.assertTrue(then >= dtime) 534 self.assertTrue(dtime < then) 535 536 # str vs. DateTime 537 dstr = now.strftime("%Y%m%dT%H:%M:%S") 538 self.assertTrue(dtime == dstr) 539 self.assertTrue(dstr == dtime) 540 dtime_then = xmlrpclib.DateTime(then.timetuple()) 541 self.assertTrue(dtime_then >= dstr) 542 self.assertTrue(dstr < dtime_then) 543 544 # some other types 545 dbytes = dstr.encode('ascii') 546 dtuple = now.timetuple() 547 self.assertFalse(dtime == 1970) 548 self.assertTrue(dtime != dbytes) 549 self.assertFalse(dtime == bytearray(dbytes)) 550 self.assertTrue(dtime != dtuple) 551 with self.assertRaises(TypeError): 552 dtime < float(1970) 553 with self.assertRaises(TypeError): 554 dtime > dbytes 555 with self.assertRaises(TypeError): 556 dtime <= bytearray(dbytes) 557 with self.assertRaises(TypeError): 558 dtime >= dtuple 559 560 self.assertTrue(dtime == ALWAYS_EQ) 561 self.assertFalse(dtime != ALWAYS_EQ) 562 self.assertTrue(dtime < LARGEST) 563 self.assertFalse(dtime > LARGEST) 564 self.assertTrue(dtime <= LARGEST) 565 self.assertFalse(dtime >= LARGEST) 566 self.assertFalse(dtime < SMALLEST) 567 self.assertTrue(dtime > SMALLEST) 568 self.assertFalse(dtime <= SMALLEST) 569 self.assertTrue(dtime >= SMALLEST) 570 571 572class BinaryTestCase(unittest.TestCase): 573 574 # XXX What should str(Binary(b"\xff")) return? I'm choosing "\xff" 575 # for now (i.e. interpreting the binary data as Latin-1-encoded 576 # text). But this feels very unsatisfactory. Perhaps we should 577 # only define repr(), and return r"Binary(b'\xff')" instead? 578 579 def test_default(self): 580 t = xmlrpclib.Binary() 581 self.assertEqual(str(t), '') 582 583 def test_string(self): 584 d = b'\x01\x02\x03abc123\xff\xfe' 585 t = xmlrpclib.Binary(d) 586 self.assertEqual(str(t), str(d, "latin-1")) 587 588 def test_decode(self): 589 d = b'\x01\x02\x03abc123\xff\xfe' 590 de = base64.encodebytes(d) 591 t1 = xmlrpclib.Binary() 592 t1.decode(de) 593 self.assertEqual(str(t1), str(d, "latin-1")) 594 595 t2 = xmlrpclib._binary(de) 596 self.assertEqual(str(t2), str(d, "latin-1")) 597 598 599ADDR = PORT = URL = None 600 601# The evt is set twice. First when the server is ready to serve. 602# Second when the server has been shutdown. The user must clear 603# the event after it has been set the first time to catch the second set. 604def http_server(evt, numrequests, requestHandler=None, encoding=None): 605 class TestInstanceClass: 606 def div(self, x, y): 607 return x // y 608 609 def _methodHelp(self, name): 610 if name == 'div': 611 return 'This is the div function' 612 613 class Fixture: 614 @staticmethod 615 def getData(): 616 return '42' 617 618 class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer): 619 def get_request(self): 620 # Ensure the socket is always non-blocking. On Linux, socket 621 # attributes are not inherited like they are on *BSD and Windows. 622 s, port = self.socket.accept() 623 s.setblocking(True) 624 return s, port 625 626 if not requestHandler: 627 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 628 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 629 encoding=encoding, 630 logRequests=False, bind_and_activate=False) 631 try: 632 serv.server_bind() 633 global ADDR, PORT, URL 634 ADDR, PORT = serv.socket.getsockname() 635 #connect to IP address directly. This avoids socket.create_connection() 636 #trying to connect to "localhost" using all address families, which 637 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 638 #on AF_INET only. 639 URL = "http://%s:%d"%(ADDR, PORT) 640 serv.server_activate() 641 serv.register_introspection_functions() 642 serv.register_multicall_functions() 643 serv.register_function(pow) 644 serv.register_function(lambda x: x, 'têšt') 645 @serv.register_function 646 def my_function(): 647 '''This is my function''' 648 return True 649 @serv.register_function(name='add') 650 def _(x, y): 651 return x + y 652 testInstance = TestInstanceClass() 653 serv.register_instance(testInstance, allow_dotted_names=True) 654 evt.set() 655 656 # handle up to 'numrequests' requests 657 while numrequests > 0: 658 serv.handle_request() 659 numrequests -= 1 660 661 except TimeoutError: 662 pass 663 finally: 664 serv.socket.close() 665 PORT = None 666 evt.set() 667 668def http_multi_server(evt, numrequests, requestHandler=None): 669 class TestInstanceClass: 670 def div(self, x, y): 671 return x // y 672 673 def _methodHelp(self, name): 674 if name == 'div': 675 return 'This is the div function' 676 677 def my_function(): 678 '''This is my function''' 679 return True 680 681 class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): 682 def get_request(self): 683 # Ensure the socket is always non-blocking. On Linux, socket 684 # attributes are not inherited like they are on *BSD and Windows. 685 s, port = self.socket.accept() 686 s.setblocking(True) 687 return s, port 688 689 if not requestHandler: 690 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 691 class MyRequestHandler(requestHandler): 692 rpc_paths = [] 693 694 class BrokenDispatcher: 695 def _marshaled_dispatch(self, data, dispatch_method=None, path=None): 696 raise RuntimeError("broken dispatcher") 697 698 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 699 logRequests=False, bind_and_activate=False) 700 serv.socket.settimeout(3) 701 serv.server_bind() 702 try: 703 global ADDR, PORT, URL 704 ADDR, PORT = serv.socket.getsockname() 705 #connect to IP address directly. This avoids socket.create_connection() 706 #trying to connect to "localhost" using all address families, which 707 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 708 #on AF_INET only. 709 URL = "http://%s:%d"%(ADDR, PORT) 710 serv.server_activate() 711 paths = [ 712 "/foo", "/foo/bar", 713 "/foo?k=v", "/foo#frag", "/foo?k=v#frag", 714 "", "/", "/RPC2", "?k=v", "#frag", 715 ] 716 for path in paths: 717 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) 718 d.register_introspection_functions() 719 d.register_multicall_functions() 720 d.register_function(lambda p=path: p, 'test') 721 serv.get_dispatcher(paths[0]).register_function(pow) 722 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 723 serv.add_dispatcher("/is/broken", BrokenDispatcher()) 724 evt.set() 725 726 # handle up to 'numrequests' requests 727 while numrequests > 0: 728 serv.handle_request() 729 numrequests -= 1 730 731 except TimeoutError: 732 pass 733 finally: 734 serv.socket.close() 735 PORT = None 736 evt.set() 737 738# This function prevents errors like: 739# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 740def is_unavailable_exception(e): 741 '''Returns True if the given ProtocolError is the product of a server-side 742 exception caused by the 'temporarily unavailable' response sometimes 743 given by operations on non-blocking sockets.''' 744 745 # sometimes we get a -1 error code and/or empty headers 746 try: 747 if e.errcode == -1 or e.headers is None: 748 return True 749 exc_mess = e.headers.get('X-exception') 750 except AttributeError: 751 # Ignore OSErrors here. 752 exc_mess = str(e) 753 754 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 755 return True 756 757def make_request_and_skipIf(condition, reason): 758 # If we skip the test, we have to make a request because 759 # the server created in setUp blocks expecting one to come in. 760 if not condition: 761 return lambda func: func 762 def decorator(func): 763 def make_request_and_skip(self): 764 try: 765 xmlrpclib.ServerProxy(URL).my_function() 766 except (xmlrpclib.ProtocolError, OSError) as e: 767 if not is_unavailable_exception(e): 768 raise 769 raise unittest.SkipTest(reason) 770 return make_request_and_skip 771 return decorator 772 773class BaseServerTestCase(unittest.TestCase): 774 requestHandler = None 775 request_count = 1 776 threadFunc = staticmethod(http_server) 777 778 def setUp(self): 779 # enable traceback reporting 780 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 781 782 self.evt = threading.Event() 783 # start server thread to handle requests 784 serv_args = (self.evt, self.request_count, self.requestHandler) 785 thread = threading.Thread(target=self.threadFunc, args=serv_args) 786 thread.start() 787 self.addCleanup(thread.join) 788 789 # wait for the server to be ready 790 self.evt.wait() 791 self.evt.clear() 792 793 def tearDown(self): 794 # wait on the server thread to terminate 795 self.evt.wait() 796 797 # disable traceback reporting 798 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 799 800class SimpleServerTestCase(BaseServerTestCase): 801 def test_simple1(self): 802 try: 803 p = xmlrpclib.ServerProxy(URL) 804 self.assertEqual(p.pow(6,8), 6**8) 805 except (xmlrpclib.ProtocolError, OSError) as e: 806 # ignore failures due to non-blocking socket 'unavailable' errors 807 if not is_unavailable_exception(e): 808 # protocol error; provide additional information in test output 809 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 810 811 def test_nonascii(self): 812 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 813 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 814 try: 815 p = xmlrpclib.ServerProxy(URL) 816 self.assertEqual(p.add(start_string, end_string), 817 start_string + end_string) 818 except (xmlrpclib.ProtocolError, OSError) as e: 819 # ignore failures due to non-blocking socket 'unavailable' errors 820 if not is_unavailable_exception(e): 821 # protocol error; provide additional information in test output 822 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 823 824 def test_client_encoding(self): 825 start_string = '\u20ac' 826 end_string = '\xa4' 827 828 try: 829 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15') 830 self.assertEqual(p.add(start_string, end_string), 831 start_string + end_string) 832 except (xmlrpclib.ProtocolError, socket.error) as e: 833 # ignore failures due to non-blocking socket unavailable errors. 834 if not is_unavailable_exception(e): 835 # protocol error; provide additional information in test output 836 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 837 838 def test_nonascii_methodname(self): 839 try: 840 p = xmlrpclib.ServerProxy(URL, encoding='ascii') 841 self.assertEqual(p.têšt(42), 42) 842 except (xmlrpclib.ProtocolError, socket.error) as e: 843 # ignore failures due to non-blocking socket unavailable errors. 844 if not is_unavailable_exception(e): 845 # protocol error; provide additional information in test output 846 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 847 848 def test_404(self): 849 # send POST with http.client, it should return 404 header and 850 # 'Not Found' message. 851 with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn: 852 conn.request('POST', '/this-is-not-valid') 853 response = conn.getresponse() 854 855 self.assertEqual(response.status, 404) 856 self.assertEqual(response.reason, 'Not Found') 857 858 def test_introspection1(self): 859 expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt', 860 'system.listMethods', 'system.methodHelp', 861 'system.methodSignature', 'system.multicall', 862 'Fixture']) 863 try: 864 p = xmlrpclib.ServerProxy(URL) 865 meth = p.system.listMethods() 866 self.assertEqual(set(meth), expected_methods) 867 except (xmlrpclib.ProtocolError, OSError) as e: 868 # ignore failures due to non-blocking socket 'unavailable' errors 869 if not is_unavailable_exception(e): 870 # protocol error; provide additional information in test output 871 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 872 873 874 def test_introspection2(self): 875 try: 876 # test _methodHelp() 877 p = xmlrpclib.ServerProxy(URL) 878 divhelp = p.system.methodHelp('div') 879 self.assertEqual(divhelp, 'This is the div function') 880 except (xmlrpclib.ProtocolError, OSError) as e: 881 # ignore failures due to non-blocking socket 'unavailable' errors 882 if not is_unavailable_exception(e): 883 # protocol error; provide additional information in test output 884 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 885 886 @make_request_and_skipIf(sys.flags.optimize >= 2, 887 "Docstrings are omitted with -O2 and above") 888 def test_introspection3(self): 889 try: 890 # test native doc 891 p = xmlrpclib.ServerProxy(URL) 892 myfunction = p.system.methodHelp('my_function') 893 self.assertEqual(myfunction, 'This is my function') 894 except (xmlrpclib.ProtocolError, OSError) as e: 895 # ignore failures due to non-blocking socket 'unavailable' errors 896 if not is_unavailable_exception(e): 897 # protocol error; provide additional information in test output 898 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 899 900 def test_introspection4(self): 901 # the SimpleXMLRPCServer doesn't support signatures, but 902 # at least check that we can try making the call 903 try: 904 p = xmlrpclib.ServerProxy(URL) 905 divsig = p.system.methodSignature('div') 906 self.assertEqual(divsig, 'signatures not supported') 907 except (xmlrpclib.ProtocolError, OSError) as e: 908 # ignore failures due to non-blocking socket 'unavailable' errors 909 if not is_unavailable_exception(e): 910 # protocol error; provide additional information in test output 911 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 912 913 def test_multicall(self): 914 try: 915 p = xmlrpclib.ServerProxy(URL) 916 multicall = xmlrpclib.MultiCall(p) 917 multicall.add(2,3) 918 multicall.pow(6,8) 919 multicall.div(127,42) 920 add_result, pow_result, div_result = multicall() 921 self.assertEqual(add_result, 2+3) 922 self.assertEqual(pow_result, 6**8) 923 self.assertEqual(div_result, 127//42) 924 except (xmlrpclib.ProtocolError, OSError) as e: 925 # ignore failures due to non-blocking socket 'unavailable' errors 926 if not is_unavailable_exception(e): 927 # protocol error; provide additional information in test output 928 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 929 930 def test_non_existing_multicall(self): 931 try: 932 p = xmlrpclib.ServerProxy(URL) 933 multicall = xmlrpclib.MultiCall(p) 934 multicall.this_is_not_exists() 935 result = multicall() 936 937 # result.results contains; 938 # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:' 939 # 'method "this_is_not_exists" is not supported'>}] 940 941 self.assertEqual(result.results[0]['faultCode'], 1) 942 self.assertEqual(result.results[0]['faultString'], 943 '<class \'Exception\'>:method "this_is_not_exists" ' 944 'is not supported') 945 except (xmlrpclib.ProtocolError, OSError) as e: 946 # ignore failures due to non-blocking socket 'unavailable' errors 947 if not is_unavailable_exception(e): 948 # protocol error; provide additional information in test output 949 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 950 951 def test_dotted_attribute(self): 952 # Raises an AttributeError because private methods are not allowed. 953 self.assertRaises(AttributeError, 954 xmlrpc.server.resolve_dotted_attribute, str, '__add') 955 956 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 957 # Get the test to run faster by sending a request with test_simple1. 958 # This avoids waiting for the socket timeout. 959 self.test_simple1() 960 961 def test_allow_dotted_names_true(self): 962 # XXX also need allow_dotted_names_false test. 963 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 964 data = server.Fixture.getData() 965 self.assertEqual(data, '42') 966 967 def test_unicode_host(self): 968 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 969 self.assertEqual(server.add("a", "\xe9"), "a\xe9") 970 971 def test_partial_post(self): 972 # Check that a partial POST doesn't make the server loop: issue #14001. 973 with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn: 974 conn.send('POST /RPC2 HTTP/1.0\r\n' 975 'Content-Length: 100\r\n\r\n' 976 'bye HTTP/1.1\r\n' 977 f'Host: {ADDR}:{PORT}\r\n' 978 'Accept-Encoding: identity\r\n' 979 'Content-Length: 0\r\n\r\n'.encode('ascii')) 980 981 def test_context_manager(self): 982 with xmlrpclib.ServerProxy(URL) as server: 983 server.add(2, 3) 984 self.assertNotEqual(server('transport')._connection, 985 (None, None)) 986 self.assertEqual(server('transport')._connection, 987 (None, None)) 988 989 def test_context_manager_method_error(self): 990 try: 991 with xmlrpclib.ServerProxy(URL) as server: 992 server.add(2, "a") 993 except xmlrpclib.Fault: 994 pass 995 self.assertEqual(server('transport')._connection, 996 (None, None)) 997 998 999class SimpleServerEncodingTestCase(BaseServerTestCase): 1000 @staticmethod 1001 def threadFunc(evt, numrequests, requestHandler=None, encoding=None): 1002 http_server(evt, numrequests, requestHandler, 'iso-8859-15') 1003 1004 def test_server_encoding(self): 1005 start_string = '\u20ac' 1006 end_string = '\xa4' 1007 1008 try: 1009 p = xmlrpclib.ServerProxy(URL) 1010 self.assertEqual(p.add(start_string, end_string), 1011 start_string + end_string) 1012 except (xmlrpclib.ProtocolError, socket.error) as e: 1013 # ignore failures due to non-blocking socket unavailable errors. 1014 if not is_unavailable_exception(e): 1015 # protocol error; provide additional information in test output 1016 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1017 1018 1019class MultiPathServerTestCase(BaseServerTestCase): 1020 threadFunc = staticmethod(http_multi_server) 1021 request_count = 2 1022 def test_path1(self): 1023 p = xmlrpclib.ServerProxy(URL+"/foo") 1024 self.assertEqual(p.pow(6,8), 6**8) 1025 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1026 1027 def test_path2(self): 1028 p = xmlrpclib.ServerProxy(URL+"/foo/bar") 1029 self.assertEqual(p.add(6,8), 6+8) 1030 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) 1031 1032 def test_path3(self): 1033 p = xmlrpclib.ServerProxy(URL+"/is/broken") 1034 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1035 1036 def test_invalid_path(self): 1037 p = xmlrpclib.ServerProxy(URL+"/invalid") 1038 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1039 1040 def test_path_query_fragment(self): 1041 p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag") 1042 self.assertEqual(p.test(), "/foo?k=v#frag") 1043 1044 def test_path_fragment(self): 1045 p = xmlrpclib.ServerProxy(URL+"/foo#frag") 1046 self.assertEqual(p.test(), "/foo#frag") 1047 1048 def test_path_query(self): 1049 p = xmlrpclib.ServerProxy(URL+"/foo?k=v") 1050 self.assertEqual(p.test(), "/foo?k=v") 1051 1052 def test_empty_path(self): 1053 p = xmlrpclib.ServerProxy(URL) 1054 self.assertEqual(p.test(), "/RPC2") 1055 1056 def test_root_path(self): 1057 p = xmlrpclib.ServerProxy(URL + "/") 1058 self.assertEqual(p.test(), "/") 1059 1060 def test_empty_path_query(self): 1061 p = xmlrpclib.ServerProxy(URL + "?k=v") 1062 self.assertEqual(p.test(), "?k=v") 1063 1064 def test_empty_path_fragment(self): 1065 p = xmlrpclib.ServerProxy(URL + "#frag") 1066 self.assertEqual(p.test(), "#frag") 1067 1068 1069#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1070#does indeed serve subsequent requests on the same connection 1071class BaseKeepaliveServerTestCase(BaseServerTestCase): 1072 #a request handler that supports keep-alive and logs requests into a 1073 #class variable 1074 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1075 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1076 protocol_version = 'HTTP/1.1' 1077 myRequests = [] 1078 def handle(self): 1079 self.myRequests.append([]) 1080 self.reqidx = len(self.myRequests)-1 1081 return self.parentClass.handle(self) 1082 def handle_one_request(self): 1083 result = self.parentClass.handle_one_request(self) 1084 self.myRequests[self.reqidx].append(self.raw_requestline) 1085 return result 1086 1087 requestHandler = RequestHandler 1088 def setUp(self): 1089 #clear request log 1090 self.RequestHandler.myRequests = [] 1091 return BaseServerTestCase.setUp(self) 1092 1093#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1094#does indeed serve subsequent requests on the same connection 1095class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): 1096 def test_two(self): 1097 p = xmlrpclib.ServerProxy(URL) 1098 #do three requests. 1099 self.assertEqual(p.pow(6,8), 6**8) 1100 self.assertEqual(p.pow(6,8), 6**8) 1101 self.assertEqual(p.pow(6,8), 6**8) 1102 p("close")() 1103 1104 #they should have all been handled by a single request handler 1105 self.assertEqual(len(self.RequestHandler.myRequests), 1) 1106 1107 #check that we did at least two (the third may be pending append 1108 #due to thread scheduling) 1109 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1110 1111 1112#test special attribute access on the serverproxy, through the __call__ 1113#function. 1114class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): 1115 #ask for two keepalive requests to be handled. 1116 request_count=2 1117 1118 def test_close(self): 1119 p = xmlrpclib.ServerProxy(URL) 1120 #do some requests with close. 1121 self.assertEqual(p.pow(6,8), 6**8) 1122 self.assertEqual(p.pow(6,8), 6**8) 1123 self.assertEqual(p.pow(6,8), 6**8) 1124 p("close")() #this should trigger a new keep-alive request 1125 self.assertEqual(p.pow(6,8), 6**8) 1126 self.assertEqual(p.pow(6,8), 6**8) 1127 self.assertEqual(p.pow(6,8), 6**8) 1128 p("close")() 1129 1130 #they should have all been two request handlers, each having logged at least 1131 #two complete requests 1132 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1133 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1134 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) 1135 1136 1137 def test_transport(self): 1138 p = xmlrpclib.ServerProxy(URL) 1139 #do some requests with close. 1140 self.assertEqual(p.pow(6,8), 6**8) 1141 p("transport").close() #same as above, really. 1142 self.assertEqual(p.pow(6,8), 6**8) 1143 p("close")() 1144 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1145 1146#A test case that verifies that gzip encoding works in both directions 1147#(for a request and the response) 1148@unittest.skipIf(gzip is None, 'requires gzip') 1149class GzipServerTestCase(BaseServerTestCase): 1150 #a request handler that supports keep-alive and logs requests into a 1151 #class variable 1152 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1153 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1154 protocol_version = 'HTTP/1.1' 1155 1156 def do_POST(self): 1157 #store content of last request in class 1158 self.__class__.content_length = int(self.headers["content-length"]) 1159 return self.parentClass.do_POST(self) 1160 requestHandler = RequestHandler 1161 1162 class Transport(xmlrpclib.Transport): 1163 #custom transport, stores the response length for our perusal 1164 fake_gzip = False 1165 def parse_response(self, response): 1166 self.response_length=int(response.getheader("content-length", 0)) 1167 return xmlrpclib.Transport.parse_response(self, response) 1168 1169 def send_content(self, connection, body): 1170 if self.fake_gzip: 1171 #add a lone gzip header to induce decode error remotely 1172 connection.putheader("Content-Encoding", "gzip") 1173 return xmlrpclib.Transport.send_content(self, connection, body) 1174 1175 def setUp(self): 1176 BaseServerTestCase.setUp(self) 1177 1178 def test_gzip_request(self): 1179 t = self.Transport() 1180 t.encode_threshold = None 1181 p = xmlrpclib.ServerProxy(URL, transport=t) 1182 self.assertEqual(p.pow(6,8), 6**8) 1183 a = self.RequestHandler.content_length 1184 t.encode_threshold = 0 #turn on request encoding 1185 self.assertEqual(p.pow(6,8), 6**8) 1186 b = self.RequestHandler.content_length 1187 self.assertTrue(a>b) 1188 p("close")() 1189 1190 def test_bad_gzip_request(self): 1191 t = self.Transport() 1192 t.encode_threshold = None 1193 t.fake_gzip = True 1194 p = xmlrpclib.ServerProxy(URL, transport=t) 1195 cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, 1196 re.compile(r"\b400\b")) 1197 with cm: 1198 p.pow(6, 8) 1199 p("close")() 1200 1201 def test_gzip_response(self): 1202 t = self.Transport() 1203 p = xmlrpclib.ServerProxy(URL, transport=t) 1204 old = self.requestHandler.encode_threshold 1205 self.requestHandler.encode_threshold = None #no encoding 1206 self.assertEqual(p.pow(6,8), 6**8) 1207 a = t.response_length 1208 self.requestHandler.encode_threshold = 0 #always encode 1209 self.assertEqual(p.pow(6,8), 6**8) 1210 p("close")() 1211 b = t.response_length 1212 self.requestHandler.encode_threshold = old 1213 self.assertTrue(a>b) 1214 1215 1216@unittest.skipIf(gzip is None, 'requires gzip') 1217class GzipUtilTestCase(unittest.TestCase): 1218 1219 def test_gzip_decode_limit(self): 1220 max_gzip_decode = 20 * 1024 * 1024 1221 data = b'\0' * max_gzip_decode 1222 encoded = xmlrpclib.gzip_encode(data) 1223 decoded = xmlrpclib.gzip_decode(encoded) 1224 self.assertEqual(len(decoded), max_gzip_decode) 1225 1226 data = b'\0' * (max_gzip_decode + 1) 1227 encoded = xmlrpclib.gzip_encode(data) 1228 1229 with self.assertRaisesRegex(ValueError, 1230 "max gzipped payload length exceeded"): 1231 xmlrpclib.gzip_decode(encoded) 1232 1233 xmlrpclib.gzip_decode(encoded, max_decode=-1) 1234 1235 1236class HeadersServerTestCase(BaseServerTestCase): 1237 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1238 test_headers = None 1239 1240 def do_POST(self): 1241 self.__class__.test_headers = self.headers 1242 return super().do_POST() 1243 requestHandler = RequestHandler 1244 standard_headers = [ 1245 'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent', 1246 'Content-Length'] 1247 1248 def setUp(self): 1249 self.RequestHandler.test_headers = None 1250 return super().setUp() 1251 1252 def assertContainsAdditionalHeaders(self, headers, additional): 1253 expected_keys = sorted(self.standard_headers + list(additional.keys())) 1254 self.assertListEqual(sorted(headers.keys()), expected_keys) 1255 1256 for key, value in additional.items(): 1257 self.assertEqual(headers.get(key), value) 1258 1259 def test_header(self): 1260 p = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')]) 1261 self.assertEqual(p.pow(6, 8), 6**8) 1262 1263 headers = self.RequestHandler.test_headers 1264 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1265 1266 def test_header_many(self): 1267 p = xmlrpclib.ServerProxy( 1268 URL, headers=[('X-Test', 'foo'), ('X-Test-Second', 'bar')]) 1269 self.assertEqual(p.pow(6, 8), 6**8) 1270 1271 headers = self.RequestHandler.test_headers 1272 self.assertContainsAdditionalHeaders( 1273 headers, {'X-Test': 'foo', 'X-Test-Second': 'bar'}) 1274 1275 def test_header_empty(self): 1276 p = xmlrpclib.ServerProxy(URL, headers=[]) 1277 self.assertEqual(p.pow(6, 8), 6**8) 1278 1279 headers = self.RequestHandler.test_headers 1280 self.assertContainsAdditionalHeaders(headers, {}) 1281 1282 def test_header_tuple(self): 1283 p = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),)) 1284 self.assertEqual(p.pow(6, 8), 6**8) 1285 1286 headers = self.RequestHandler.test_headers 1287 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1288 1289 def test_header_items(self): 1290 p = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items()) 1291 self.assertEqual(p.pow(6, 8), 6**8) 1292 1293 headers = self.RequestHandler.test_headers 1294 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1295 1296 1297#Test special attributes of the ServerProxy object 1298class ServerProxyTestCase(unittest.TestCase): 1299 def setUp(self): 1300 unittest.TestCase.setUp(self) 1301 # Actual value of the URL doesn't matter if it is a string in 1302 # the correct format. 1303 self.url = 'http://fake.localhost' 1304 1305 def test_close(self): 1306 p = xmlrpclib.ServerProxy(self.url) 1307 self.assertEqual(p('close')(), None) 1308 1309 def test_transport(self): 1310 t = xmlrpclib.Transport() 1311 p = xmlrpclib.ServerProxy(self.url, transport=t) 1312 self.assertEqual(p('transport'), t) 1313 1314 1315# This is a contrived way to make a failure occur on the server side 1316# in order to test the _send_traceback_header flag on the server 1317class FailingMessageClass(http.client.HTTPMessage): 1318 def get(self, key, failobj=None): 1319 key = key.lower() 1320 if key == 'content-length': 1321 return 'I am broken' 1322 return super().get(key, failobj) 1323 1324 1325class FailingServerTestCase(unittest.TestCase): 1326 def setUp(self): 1327 self.evt = threading.Event() 1328 # start server thread to handle requests 1329 serv_args = (self.evt, 1) 1330 thread = threading.Thread(target=http_server, args=serv_args) 1331 thread.start() 1332 self.addCleanup(thread.join) 1333 1334 # wait for the server to be ready 1335 self.evt.wait() 1336 self.evt.clear() 1337 1338 def tearDown(self): 1339 # wait on the server thread to terminate 1340 self.evt.wait() 1341 # reset flag 1342 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 1343 # reset message class 1344 default_class = http.client.HTTPMessage 1345 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class 1346 1347 def test_basic(self): 1348 # check that flag is false by default 1349 flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header 1350 self.assertEqual(flagval, False) 1351 1352 # enable traceback reporting 1353 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1354 1355 # test a call that shouldn't fail just as a smoke test 1356 try: 1357 p = xmlrpclib.ServerProxy(URL) 1358 self.assertEqual(p.pow(6,8), 6**8) 1359 except (xmlrpclib.ProtocolError, OSError) as e: 1360 # ignore failures due to non-blocking socket 'unavailable' errors 1361 if not is_unavailable_exception(e): 1362 # protocol error; provide additional information in test output 1363 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1364 1365 def test_fail_no_info(self): 1366 # use the broken message class 1367 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1368 1369 try: 1370 p = xmlrpclib.ServerProxy(URL) 1371 p.pow(6,8) 1372 except (xmlrpclib.ProtocolError, OSError) as e: 1373 # ignore failures due to non-blocking socket 'unavailable' errors 1374 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1375 # The two server-side error headers shouldn't be sent back in this case 1376 self.assertTrue(e.headers.get("X-exception") is None) 1377 self.assertTrue(e.headers.get("X-traceback") is None) 1378 else: 1379 self.fail('ProtocolError not raised') 1380 1381 def test_fail_with_info(self): 1382 # use the broken message class 1383 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1384 1385 # Check that errors in the server send back exception/traceback 1386 # info when flag is set 1387 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1388 1389 try: 1390 p = xmlrpclib.ServerProxy(URL) 1391 p.pow(6,8) 1392 except (xmlrpclib.ProtocolError, OSError) as e: 1393 # ignore failures due to non-blocking socket 'unavailable' errors 1394 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1395 # We should get error info in the response 1396 expected_err = "invalid literal for int() with base 10: 'I am broken'" 1397 self.assertEqual(e.headers.get("X-exception"), expected_err) 1398 self.assertTrue(e.headers.get("X-traceback") is not None) 1399 else: 1400 self.fail('ProtocolError not raised') 1401 1402 1403@contextlib.contextmanager 1404def captured_stdout(encoding='utf-8'): 1405 """A variation on support.captured_stdout() which gives a text stream 1406 having a `buffer` attribute. 1407 """ 1408 orig_stdout = sys.stdout 1409 sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding) 1410 try: 1411 yield sys.stdout 1412 finally: 1413 sys.stdout = orig_stdout 1414 1415 1416class CGIHandlerTestCase(unittest.TestCase): 1417 def setUp(self): 1418 self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler() 1419 1420 def tearDown(self): 1421 self.cgi = None 1422 1423 def test_cgi_get(self): 1424 with os_helper.EnvironmentVarGuard() as env: 1425 env['REQUEST_METHOD'] = 'GET' 1426 # if the method is GET and no request_text is given, it runs handle_get 1427 # get sysout output 1428 with captured_stdout(encoding=self.cgi.encoding) as data_out: 1429 self.cgi.handle_request() 1430 1431 # parse Status header 1432 data_out.seek(0) 1433 handle = data_out.read() 1434 status = handle.split()[1] 1435 message = ' '.join(handle.split()[2:4]) 1436 1437 self.assertEqual(status, '400') 1438 self.assertEqual(message, 'Bad Request') 1439 1440 1441 def test_cgi_xmlrpc_response(self): 1442 data = """<?xml version='1.0'?> 1443 <methodCall> 1444 <methodName>test_method</methodName> 1445 <params> 1446 <param> 1447 <value><string>foo</string></value> 1448 </param> 1449 <param> 1450 <value><string>bar</string></value> 1451 </param> 1452 </params> 1453 </methodCall> 1454 """ 1455 1456 with os_helper.EnvironmentVarGuard() as env, \ 1457 captured_stdout(encoding=self.cgi.encoding) as data_out, \ 1458 support.captured_stdin() as data_in: 1459 data_in.write(data) 1460 data_in.seek(0) 1461 env['CONTENT_LENGTH'] = str(len(data)) 1462 self.cgi.handle_request() 1463 data_out.seek(0) 1464 1465 # will respond exception, if so, our goal is achieved ;) 1466 handle = data_out.read() 1467 1468 # start with 44th char so as not to get http header, we just 1469 # need only xml 1470 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:]) 1471 1472 # Also test the content-length returned by handle_request 1473 # Using the same test method inorder to avoid all the datapassing 1474 # boilerplate code. 1475 # Test for bug: http://bugs.python.org/issue5040 1476 1477 content = handle[handle.find("<?xml"):] 1478 1479 self.assertEqual( 1480 int(re.search(r'Content-Length: (\d+)', handle).group(1)), 1481 len(content)) 1482 1483 1484class UseBuiltinTypesTestCase(unittest.TestCase): 1485 1486 def test_use_builtin_types(self): 1487 # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which 1488 # makes all dispatch of binary data as bytes instances, and all 1489 # dispatch of datetime argument as datetime.datetime instances. 1490 self.log = [] 1491 expected_bytes = b"my dog has fleas" 1492 expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12) 1493 marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar') 1494 def foobar(*args): 1495 self.log.extend(args) 1496 handler = xmlrpc.server.SimpleXMLRPCDispatcher( 1497 allow_none=True, encoding=None, use_builtin_types=True) 1498 handler.register_function(foobar) 1499 handler._marshaled_dispatch(marshaled) 1500 self.assertEqual(len(self.log), 2) 1501 mybytes, mydate = self.log 1502 self.assertEqual(self.log, [expected_bytes, expected_date]) 1503 self.assertIs(type(mydate), datetime.datetime) 1504 self.assertIs(type(mybytes), bytes) 1505 1506 def test_cgihandler_has_use_builtin_types_flag(self): 1507 handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True) 1508 self.assertTrue(handler.use_builtin_types) 1509 1510 def test_xmlrpcserver_has_use_builtin_types_flag(self): 1511 server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0), 1512 use_builtin_types=True) 1513 server.server_close() 1514 self.assertTrue(server.use_builtin_types) 1515 1516 1517def setUpModule(): 1518 thread_info = threading_helper.threading_setup() 1519 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 1520 1521 1522if __name__ == "__main__": 1523 unittest.main() 1524