1import base64 2import datetime 3import decimal 4import sys 5import time 6import unittest 7from unittest import mock 8import xmlrpc.client as xmlrpclib 9import xmlrpc.server 10import http.client 11import http, http.server 12import socket 13import threading 14import re 15import io 16import contextlib 17from test import support 18 19try: 20 import gzip 21except ImportError: 22 gzip = None 23 24alist = [{'astring': 'foo@bar.baz.spam', 25 'afloat': 7283.43, 26 'anint': 2**20, 27 'ashortlong': 2, 28 'anotherlist': ['.zyx.41'], 29 'abase64': xmlrpclib.Binary(b"my dog has fleas"), 30 'b64bytes': b"my dog has fleas", 31 'b64bytearray': bytearray(b"my dog has fleas"), 32 'boolean': False, 33 'unicode': '\u4000\u6000\u8000', 34 'ukey\u4000': 'regular value', 35 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'), 36 'datetime2': xmlrpclib.DateTime( 37 (2005, 2, 10, 11, 41, 23, 0, 1, -1)), 38 'datetime3': xmlrpclib.DateTime( 39 datetime.datetime(2005, 2, 10, 11, 41, 23)), 40 }] 41 42class XMLRPCTestCase(unittest.TestCase): 43 44 def test_dump_load(self): 45 dump = xmlrpclib.dumps((alist,)) 46 load = xmlrpclib.loads(dump) 47 self.assertEqual(alist, load[0][0]) 48 49 def test_dump_bare_datetime(self): 50 # This checks that an unwrapped datetime.date object can be handled 51 # by the marshalling code. This can't be done via test_dump_load() 52 # since with use_builtin_types set to 1 the unmarshaller would create 53 # datetime objects for the 'datetime[123]' keys as well 54 dt = datetime.datetime(2005, 2, 10, 11, 41, 23) 55 self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23')) 56 s = xmlrpclib.dumps((dt,)) 57 58 result, m = xmlrpclib.loads(s, use_builtin_types=True) 59 (newdt,) = result 60 self.assertEqual(newdt, dt) 61 self.assertIs(type(newdt), datetime.datetime) 62 self.assertIsNone(m) 63 64 result, m = xmlrpclib.loads(s, use_builtin_types=False) 65 (newdt,) = result 66 self.assertEqual(newdt, dt) 67 self.assertIs(type(newdt), xmlrpclib.DateTime) 68 self.assertIsNone(m) 69 70 result, m = xmlrpclib.loads(s, use_datetime=True) 71 (newdt,) = result 72 self.assertEqual(newdt, dt) 73 self.assertIs(type(newdt), datetime.datetime) 74 self.assertIsNone(m) 75 76 result, m = xmlrpclib.loads(s, use_datetime=False) 77 (newdt,) = result 78 self.assertEqual(newdt, dt) 79 self.assertIs(type(newdt), xmlrpclib.DateTime) 80 self.assertIsNone(m) 81 82 83 def test_datetime_before_1900(self): 84 # same as before but with a date before 1900 85 dt = datetime.datetime(1, 2, 10, 11, 41, 23) 86 self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23')) 87 s = xmlrpclib.dumps((dt,)) 88 89 result, m = xmlrpclib.loads(s, use_builtin_types=True) 90 (newdt,) = result 91 self.assertEqual(newdt, dt) 92 self.assertIs(type(newdt), datetime.datetime) 93 self.assertIsNone(m) 94 95 result, m = xmlrpclib.loads(s, use_builtin_types=False) 96 (newdt,) = result 97 self.assertEqual(newdt, dt) 98 self.assertIs(type(newdt), xmlrpclib.DateTime) 99 self.assertIsNone(m) 100 101 def test_bug_1164912 (self): 102 d = xmlrpclib.DateTime() 103 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), 104 methodresponse=True)) 105 self.assertIsInstance(new_d.value, str) 106 107 # Check that the output of dumps() is still an 8-bit string 108 s = xmlrpclib.dumps((new_d,), methodresponse=True) 109 self.assertIsInstance(s, str) 110 111 def test_newstyle_class(self): 112 class T(object): 113 pass 114 t = T() 115 t.x = 100 116 t.y = "Hello" 117 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) 118 self.assertEqual(t2, t.__dict__) 119 120 def test_dump_big_long(self): 121 self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,)) 122 123 def test_dump_bad_dict(self): 124 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) 125 126 def test_dump_recursive_seq(self): 127 l = [1,2,3] 128 t = [3,4,5,l] 129 l.append(t) 130 self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) 131 132 def test_dump_recursive_dict(self): 133 d = {'1':1, '2':1} 134 t = {'3':3, 'd':d} 135 d['t'] = t 136 self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) 137 138 def test_dump_big_int(self): 139 if sys.maxsize > 2**31-1: 140 self.assertRaises(OverflowError, xmlrpclib.dumps, 141 (int(2**34),)) 142 143 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) 144 self.assertRaises(OverflowError, xmlrpclib.dumps, 145 (xmlrpclib.MAXINT+1,)) 146 self.assertRaises(OverflowError, xmlrpclib.dumps, 147 (xmlrpclib.MININT-1,)) 148 149 def dummy_write(s): 150 pass 151 152 m = xmlrpclib.Marshaller() 153 m.dump_int(xmlrpclib.MAXINT, dummy_write) 154 m.dump_int(xmlrpclib.MININT, dummy_write) 155 self.assertRaises(OverflowError, m.dump_int, 156 xmlrpclib.MAXINT+1, dummy_write) 157 self.assertRaises(OverflowError, m.dump_int, 158 xmlrpclib.MININT-1, dummy_write) 159 160 def test_dump_double(self): 161 xmlrpclib.dumps((float(2 ** 34),)) 162 xmlrpclib.dumps((float(xmlrpclib.MAXINT), 163 float(xmlrpclib.MININT))) 164 xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42), 165 float(xmlrpclib.MININT - 42))) 166 167 def dummy_write(s): 168 pass 169 170 m = xmlrpclib.Marshaller() 171 m.dump_double(xmlrpclib.MAXINT, dummy_write) 172 m.dump_double(xmlrpclib.MININT, dummy_write) 173 m.dump_double(xmlrpclib.MAXINT + 42, dummy_write) 174 m.dump_double(xmlrpclib.MININT - 42, dummy_write) 175 176 def test_dump_none(self): 177 value = alist + [None] 178 arg1 = (alist + [None],) 179 strg = xmlrpclib.dumps(arg1, allow_none=True) 180 self.assertEqual(value, 181 xmlrpclib.loads(strg)[0][0]) 182 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,)) 183 184 def test_dump_encoding(self): 185 value = {'key\u20ac\xa4': 186 'value\u20ac\xa4'} 187 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15') 188 strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg 189 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 190 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 191 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 192 193 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 194 methodresponse=True) 195 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 196 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 197 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 198 199 methodname = 'method\u20ac\xa4' 200 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 201 methodname=methodname) 202 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 203 self.assertEqual(xmlrpclib.loads(strg)[1], methodname) 204 205 def test_dump_bytes(self): 206 sample = b"my dog has fleas" 207 self.assertEqual(sample, xmlrpclib.Binary(sample)) 208 for type_ in bytes, bytearray, xmlrpclib.Binary: 209 value = type_(sample) 210 s = xmlrpclib.dumps((value,)) 211 212 result, m = xmlrpclib.loads(s, use_builtin_types=True) 213 (newvalue,) = result 214 self.assertEqual(newvalue, sample) 215 self.assertIs(type(newvalue), bytes) 216 self.assertIsNone(m) 217 218 result, m = xmlrpclib.loads(s, use_builtin_types=False) 219 (newvalue,) = result 220 self.assertEqual(newvalue, sample) 221 self.assertIs(type(newvalue), xmlrpclib.Binary) 222 self.assertIsNone(m) 223 224 def test_loads_unsupported(self): 225 ResponseError = xmlrpclib.ResponseError 226 data = '<params><param><value><spam/></value></param></params>' 227 self.assertRaises(ResponseError, xmlrpclib.loads, data) 228 data = ('<params><param><value><array>' 229 '<value><spam/></value>' 230 '</array></value></param></params>') 231 self.assertRaises(ResponseError, xmlrpclib.loads, data) 232 data = ('<params><param><value><struct>' 233 '<member><name>a</name><value><spam/></value></member>' 234 '<member><name>b</name><value><spam/></value></member>' 235 '</struct></value></param></params>') 236 self.assertRaises(ResponseError, xmlrpclib.loads, data) 237 238 def check_loads(self, s, value, **kwargs): 239 dump = '<params><param><value>%s</value></param></params>' % s 240 result, m = xmlrpclib.loads(dump, **kwargs) 241 (newvalue,) = result 242 self.assertEqual(newvalue, value) 243 self.assertIs(type(newvalue), type(value)) 244 self.assertIsNone(m) 245 246 def test_load_standard_types(self): 247 check = self.check_loads 248 check('string', 'string') 249 check('<string>string</string>', 'string') 250 check('<string> string</string>', ' string') 251 check('<int>2056183947</int>', 2056183947) 252 check('<int>-2056183947</int>', -2056183947) 253 check('<i4>2056183947</i4>', 2056183947) 254 check('<double>46093.78125</double>', 46093.78125) 255 check('<boolean>0</boolean>', False) 256 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 257 xmlrpclib.Binary(b'\x00byte string\xff')) 258 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 259 b'\x00byte string\xff', use_builtin_types=True) 260 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 261 xmlrpclib.DateTime('20050210T11:41:23')) 262 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 263 datetime.datetime(2005, 2, 10, 11, 41, 23), 264 use_builtin_types=True) 265 check('<array><data>' 266 '<value><int>1</int></value><value><int>2</int></value>' 267 '</data></array>', [1, 2]) 268 check('<struct>' 269 '<member><name>b</name><value><int>2</int></value></member>' 270 '<member><name>a</name><value><int>1</int></value></member>' 271 '</struct>', {'a': 1, 'b': 2}) 272 273 def test_load_extension_types(self): 274 check = self.check_loads 275 check('<nil/>', None) 276 check('<ex:nil/>', None) 277 check('<i1>205</i1>', 205) 278 check('<i2>20561</i2>', 20561) 279 check('<i8>9876543210</i8>', 9876543210) 280 check('<biginteger>98765432100123456789</biginteger>', 281 98765432100123456789) 282 check('<float>93.78125</float>', 93.78125) 283 check('<bigdecimal>9876543210.0123456789</bigdecimal>', 284 decimal.Decimal('9876543210.0123456789')) 285 286 def test_limit_int(self): 287 check = self.check_loads 288 maxdigits = 5000 289 with support.adjust_int_max_str_digits(maxdigits): 290 s = '1' * (maxdigits + 1) 291 with self.assertRaises(ValueError): 292 check(f'<int>{s}</int>', None) 293 with self.assertRaises(ValueError): 294 check(f'<biginteger>{s}</biginteger>', None) 295 296 def test_get_host_info(self): 297 # see bug #3613, this raised a TypeError 298 transp = xmlrpc.client.Transport() 299 self.assertEqual(transp.get_host_info("user@host.tld"), 300 ('host.tld', 301 [('Authorization', 'Basic dXNlcg==')], {})) 302 303 def test_ssl_presence(self): 304 try: 305 import ssl 306 except ImportError: 307 has_ssl = False 308 else: 309 has_ssl = True 310 try: 311 xmlrpc.client.ServerProxy('https://localhost:9999').bad_function() 312 except NotImplementedError: 313 self.assertFalse(has_ssl, "xmlrpc client's error with SSL support") 314 except OSError: 315 self.assertTrue(has_ssl) 316 317 def test_keepalive_disconnect(self): 318 class RequestHandler(http.server.BaseHTTPRequestHandler): 319 protocol_version = "HTTP/1.1" 320 handled = False 321 322 def do_POST(self): 323 length = int(self.headers.get("Content-Length")) 324 self.rfile.read(length) 325 if self.handled: 326 self.close_connection = True 327 return 328 response = xmlrpclib.dumps((5,), methodresponse=True) 329 response = response.encode() 330 self.send_response(http.HTTPStatus.OK) 331 self.send_header("Content-Length", len(response)) 332 self.end_headers() 333 self.wfile.write(response) 334 self.handled = True 335 self.close_connection = False 336 337 def log_message(self, format, *args): 338 # don't clobber sys.stderr 339 pass 340 341 def run_server(): 342 server.socket.settimeout(float(1)) # Don't hang if client fails 343 server.handle_request() # First request and attempt at second 344 server.handle_request() # Retried second request 345 346 server = http.server.HTTPServer((support.HOST, 0), RequestHandler) 347 self.addCleanup(server.server_close) 348 thread = threading.Thread(target=run_server) 349 thread.start() 350 self.addCleanup(thread.join) 351 url = "http://{}:{}/".format(*server.server_address) 352 with xmlrpclib.ServerProxy(url) as p: 353 self.assertEqual(p.method(), 5) 354 self.assertEqual(p.method(), 5) 355 356 357class SimpleXMLRPCDispatcherTestCase(unittest.TestCase): 358 class DispatchExc(Exception): 359 """Raised inside the dispatched functions when checking for 360 chained exceptions""" 361 362 def test_call_registered_func(self): 363 """Calls explicitly registered function""" 364 # Makes sure any exception raised inside the function has no other 365 # exception chained to it 366 367 exp_params = 1, 2, 3 368 369 def dispatched_func(*params): 370 raise self.DispatchExc(params) 371 372 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 373 dispatcher.register_function(dispatched_func) 374 with self.assertRaises(self.DispatchExc) as exc_ctx: 375 dispatcher._dispatch('dispatched_func', exp_params) 376 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 377 self.assertIsNone(exc_ctx.exception.__cause__) 378 self.assertIsNone(exc_ctx.exception.__context__) 379 380 def test_call_instance_func(self): 381 """Calls a registered instance attribute as a function""" 382 # Makes sure any exception raised inside the function has no other 383 # exception chained to it 384 385 exp_params = 1, 2, 3 386 387 class DispatchedClass: 388 def dispatched_func(self, *params): 389 raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params) 390 391 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 392 dispatcher.register_instance(DispatchedClass()) 393 with self.assertRaises(self.DispatchExc) as exc_ctx: 394 dispatcher._dispatch('dispatched_func', exp_params) 395 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 396 self.assertIsNone(exc_ctx.exception.__cause__) 397 self.assertIsNone(exc_ctx.exception.__context__) 398 399 def test_call_dispatch_func(self): 400 """Calls the registered instance's `_dispatch` function""" 401 # Makes sure any exception raised inside the function has no other 402 # exception chained to it 403 404 exp_method = 'method' 405 exp_params = 1, 2, 3 406 407 class TestInstance: 408 def _dispatch(self, method, params): 409 raise SimpleXMLRPCDispatcherTestCase.DispatchExc( 410 method, params) 411 412 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 413 dispatcher.register_instance(TestInstance()) 414 with self.assertRaises(self.DispatchExc) as exc_ctx: 415 dispatcher._dispatch(exp_method, exp_params) 416 self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params)) 417 self.assertIsNone(exc_ctx.exception.__cause__) 418 self.assertIsNone(exc_ctx.exception.__context__) 419 420 def test_registered_func_is_none(self): 421 """Calls explicitly registered function which is None""" 422 423 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 424 dispatcher.register_function(None, name='method') 425 with self.assertRaisesRegex(Exception, 'method'): 426 dispatcher._dispatch('method', ('param',)) 427 428 def test_instance_has_no_func(self): 429 """Attempts to call nonexistent function on a registered instance""" 430 431 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 432 dispatcher.register_instance(object()) 433 with self.assertRaisesRegex(Exception, 'method'): 434 dispatcher._dispatch('method', ('param',)) 435 436 def test_cannot_locate_func(self): 437 """Calls a function that the dispatcher cannot locate""" 438 439 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 440 with self.assertRaisesRegex(Exception, 'method'): 441 dispatcher._dispatch('method', ('param',)) 442 443 444class HelperTestCase(unittest.TestCase): 445 def test_escape(self): 446 self.assertEqual(xmlrpclib.escape("a&b"), "a&b") 447 self.assertEqual(xmlrpclib.escape("a<b"), "a<b") 448 self.assertEqual(xmlrpclib.escape("a>b"), "a>b") 449 450class FaultTestCase(unittest.TestCase): 451 def test_repr(self): 452 f = xmlrpclib.Fault(42, 'Test Fault') 453 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 454 self.assertEqual(repr(f), str(f)) 455 456 def test_dump_fault(self): 457 f = xmlrpclib.Fault(42, 'Test Fault') 458 s = xmlrpclib.dumps((f,)) 459 (newf,), m = xmlrpclib.loads(s) 460 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 461 self.assertEqual(m, None) 462 463 s = xmlrpclib.Marshaller().dumps(f) 464 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 465 466 def test_dotted_attribute(self): 467 # this will raise AttributeError because code don't want us to use 468 # private methods 469 self.assertRaises(AttributeError, 470 xmlrpc.server.resolve_dotted_attribute, str, '__add') 471 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 472 473class DateTimeTestCase(unittest.TestCase): 474 def test_default(self): 475 with mock.patch('time.localtime') as localtime_mock: 476 time_struct = time.struct_time( 477 [2013, 7, 15, 0, 24, 49, 0, 196, 0]) 478 localtime_mock.return_value = time_struct 479 localtime = time.localtime() 480 t = xmlrpclib.DateTime() 481 self.assertEqual(str(t), 482 time.strftime("%Y%m%dT%H:%M:%S", localtime)) 483 484 def test_time(self): 485 d = 1181399930.036952 486 t = xmlrpclib.DateTime(d) 487 self.assertEqual(str(t), 488 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 489 490 def test_time_tuple(self): 491 d = (2007,6,9,10,38,50,5,160,0) 492 t = xmlrpclib.DateTime(d) 493 self.assertEqual(str(t), '20070609T10:38:50') 494 495 def test_time_struct(self): 496 d = time.localtime(1181399930.036952) 497 t = xmlrpclib.DateTime(d) 498 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 499 500 def test_datetime_datetime(self): 501 d = datetime.datetime(2007,1,2,3,4,5) 502 t = xmlrpclib.DateTime(d) 503 self.assertEqual(str(t), '20070102T03:04:05') 504 505 def test_repr(self): 506 d = datetime.datetime(2007,1,2,3,4,5) 507 t = xmlrpclib.DateTime(d) 508 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t) 509 self.assertEqual(repr(t), val) 510 511 def test_decode(self): 512 d = ' 20070908T07:11:13 ' 513 t1 = xmlrpclib.DateTime() 514 t1.decode(d) 515 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 516 self.assertEqual(t1, tref) 517 518 t2 = xmlrpclib._datetime(d) 519 self.assertEqual(t2, tref) 520 521 def test_comparison(self): 522 now = datetime.datetime.now() 523 dtime = xmlrpclib.DateTime(now.timetuple()) 524 525 # datetime vs. DateTime 526 self.assertTrue(dtime == now) 527 self.assertTrue(now == dtime) 528 then = now + datetime.timedelta(seconds=4) 529 self.assertTrue(then >= dtime) 530 self.assertTrue(dtime < then) 531 532 # str vs. DateTime 533 dstr = now.strftime("%Y%m%dT%H:%M:%S") 534 self.assertTrue(dtime == dstr) 535 self.assertTrue(dstr == dtime) 536 dtime_then = xmlrpclib.DateTime(then.timetuple()) 537 self.assertTrue(dtime_then >= dstr) 538 self.assertTrue(dstr < dtime_then) 539 540 # some other types 541 dbytes = dstr.encode('ascii') 542 dtuple = now.timetuple() 543 with self.assertRaises(TypeError): 544 dtime == 1970 545 with self.assertRaises(TypeError): 546 dtime != dbytes 547 with self.assertRaises(TypeError): 548 dtime == bytearray(dbytes) 549 with self.assertRaises(TypeError): 550 dtime != dtuple 551 with self.assertRaises(TypeError): 552 dtime < float(1970) 553 with self.assertRaises(TypeError): 554 dtime > dbytes 555 with self.assertRaises(TypeError): 556 dtime <= bytearray(dbytes) 557 with self.assertRaises(TypeError): 558 dtime >= dtuple 559 560class BinaryTestCase(unittest.TestCase): 561 562 # XXX What should str(Binary(b"\xff")) return? I'm chosing "\xff" 563 # for now (i.e. interpreting the binary data as Latin-1-encoded 564 # text). But this feels very unsatisfactory. Perhaps we should 565 # only define repr(), and return r"Binary(b'\xff')" instead? 566 567 def test_default(self): 568 t = xmlrpclib.Binary() 569 self.assertEqual(str(t), '') 570 571 def test_string(self): 572 d = b'\x01\x02\x03abc123\xff\xfe' 573 t = xmlrpclib.Binary(d) 574 self.assertEqual(str(t), str(d, "latin-1")) 575 576 def test_decode(self): 577 d = b'\x01\x02\x03abc123\xff\xfe' 578 de = base64.encodebytes(d) 579 t1 = xmlrpclib.Binary() 580 t1.decode(de) 581 self.assertEqual(str(t1), str(d, "latin-1")) 582 583 t2 = xmlrpclib._binary(de) 584 self.assertEqual(str(t2), str(d, "latin-1")) 585 586 587ADDR = PORT = URL = None 588 589# The evt is set twice. First when the server is ready to serve. 590# Second when the server has been shutdown. The user must clear 591# the event after it has been set the first time to catch the second set. 592def http_server(evt, numrequests, requestHandler=None, encoding=None): 593 class TestInstanceClass: 594 def div(self, x, y): 595 return x // y 596 597 def _methodHelp(self, name): 598 if name == 'div': 599 return 'This is the div function' 600 601 class Fixture: 602 @staticmethod 603 def getData(): 604 return '42' 605 606 class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer): 607 def get_request(self): 608 # Ensure the socket is always non-blocking. On Linux, socket 609 # attributes are not inherited like they are on *BSD and Windows. 610 s, port = self.socket.accept() 611 s.setblocking(True) 612 return s, port 613 614 if not requestHandler: 615 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 616 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 617 encoding=encoding, 618 logRequests=False, bind_and_activate=False) 619 try: 620 serv.server_bind() 621 global ADDR, PORT, URL 622 ADDR, PORT = serv.socket.getsockname() 623 #connect to IP address directly. This avoids socket.create_connection() 624 #trying to connect to "localhost" using all address families, which 625 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 626 #on AF_INET only. 627 URL = "http://%s:%d"%(ADDR, PORT) 628 serv.server_activate() 629 serv.register_introspection_functions() 630 serv.register_multicall_functions() 631 serv.register_function(pow) 632 serv.register_function(lambda x: x, 'têšt') 633 @serv.register_function 634 def my_function(): 635 '''This is my function''' 636 return True 637 @serv.register_function(name='add') 638 def _(x, y): 639 return x + y 640 testInstance = TestInstanceClass() 641 serv.register_instance(testInstance, allow_dotted_names=True) 642 evt.set() 643 644 # handle up to 'numrequests' requests 645 while numrequests > 0: 646 serv.handle_request() 647 numrequests -= 1 648 649 except socket.timeout: 650 pass 651 finally: 652 serv.socket.close() 653 PORT = None 654 evt.set() 655 656def http_multi_server(evt, numrequests, requestHandler=None): 657 class TestInstanceClass: 658 def div(self, x, y): 659 return x // y 660 661 def _methodHelp(self, name): 662 if name == 'div': 663 return 'This is the div function' 664 665 def my_function(): 666 '''This is my function''' 667 return True 668 669 class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): 670 def get_request(self): 671 # Ensure the socket is always non-blocking. On Linux, socket 672 # attributes are not inherited like they are on *BSD and Windows. 673 s, port = self.socket.accept() 674 s.setblocking(True) 675 return s, port 676 677 if not requestHandler: 678 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 679 class MyRequestHandler(requestHandler): 680 rpc_paths = [] 681 682 class BrokenDispatcher: 683 def _marshaled_dispatch(self, data, dispatch_method=None, path=None): 684 raise RuntimeError("broken dispatcher") 685 686 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 687 logRequests=False, bind_and_activate=False) 688 serv.socket.settimeout(3) 689 serv.server_bind() 690 try: 691 global ADDR, PORT, URL 692 ADDR, PORT = serv.socket.getsockname() 693 #connect to IP address directly. This avoids socket.create_connection() 694 #trying to connect to "localhost" using all address families, which 695 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 696 #on AF_INET only. 697 URL = "http://%s:%d"%(ADDR, PORT) 698 serv.server_activate() 699 paths = ["/foo", "/foo/bar"] 700 for path in paths: 701 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) 702 d.register_introspection_functions() 703 d.register_multicall_functions() 704 serv.get_dispatcher(paths[0]).register_function(pow) 705 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 706 serv.add_dispatcher("/is/broken", BrokenDispatcher()) 707 evt.set() 708 709 # handle up to 'numrequests' requests 710 while numrequests > 0: 711 serv.handle_request() 712 numrequests -= 1 713 714 except socket.timeout: 715 pass 716 finally: 717 serv.socket.close() 718 PORT = None 719 evt.set() 720 721# This function prevents errors like: 722# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 723def is_unavailable_exception(e): 724 '''Returns True if the given ProtocolError is the product of a server-side 725 exception caused by the 'temporarily unavailable' response sometimes 726 given by operations on non-blocking sockets.''' 727 728 # sometimes we get a -1 error code and/or empty headers 729 try: 730 if e.errcode == -1 or e.headers is None: 731 return True 732 exc_mess = e.headers.get('X-exception') 733 except AttributeError: 734 # Ignore OSErrors here. 735 exc_mess = str(e) 736 737 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 738 return True 739 740def make_request_and_skipIf(condition, reason): 741 # If we skip the test, we have to make a request because 742 # the server created in setUp blocks expecting one to come in. 743 if not condition: 744 return lambda func: func 745 def decorator(func): 746 def make_request_and_skip(self): 747 try: 748 xmlrpclib.ServerProxy(URL).my_function() 749 except (xmlrpclib.ProtocolError, OSError) as e: 750 if not is_unavailable_exception(e): 751 raise 752 raise unittest.SkipTest(reason) 753 return make_request_and_skip 754 return decorator 755 756class BaseServerTestCase(unittest.TestCase): 757 requestHandler = None 758 request_count = 1 759 threadFunc = staticmethod(http_server) 760 761 def setUp(self): 762 # enable traceback reporting 763 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 764 765 self.evt = threading.Event() 766 # start server thread to handle requests 767 serv_args = (self.evt, self.request_count, self.requestHandler) 768 thread = threading.Thread(target=self.threadFunc, args=serv_args) 769 thread.start() 770 self.addCleanup(thread.join) 771 772 # wait for the server to be ready 773 self.evt.wait() 774 self.evt.clear() 775 776 def tearDown(self): 777 # wait on the server thread to terminate 778 self.evt.wait() 779 780 # disable traceback reporting 781 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 782 783class SimpleServerTestCase(BaseServerTestCase): 784 def test_simple1(self): 785 try: 786 p = xmlrpclib.ServerProxy(URL) 787 self.assertEqual(p.pow(6,8), 6**8) 788 except (xmlrpclib.ProtocolError, OSError) as e: 789 # ignore failures due to non-blocking socket 'unavailable' errors 790 if not is_unavailable_exception(e): 791 # protocol error; provide additional information in test output 792 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 793 794 def test_nonascii(self): 795 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 796 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 797 try: 798 p = xmlrpclib.ServerProxy(URL) 799 self.assertEqual(p.add(start_string, end_string), 800 start_string + end_string) 801 except (xmlrpclib.ProtocolError, OSError) as e: 802 # ignore failures due to non-blocking socket 'unavailable' errors 803 if not is_unavailable_exception(e): 804 # protocol error; provide additional information in test output 805 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 806 807 def test_client_encoding(self): 808 start_string = '\u20ac' 809 end_string = '\xa4' 810 811 try: 812 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15') 813 self.assertEqual(p.add(start_string, end_string), 814 start_string + end_string) 815 except (xmlrpclib.ProtocolError, socket.error) as e: 816 # ignore failures due to non-blocking socket unavailable errors. 817 if not is_unavailable_exception(e): 818 # protocol error; provide additional information in test output 819 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 820 821 def test_nonascii_methodname(self): 822 try: 823 p = xmlrpclib.ServerProxy(URL, encoding='ascii') 824 self.assertEqual(p.têšt(42), 42) 825 except (xmlrpclib.ProtocolError, socket.error) as e: 826 # ignore failures due to non-blocking socket unavailable errors. 827 if not is_unavailable_exception(e): 828 # protocol error; provide additional information in test output 829 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 830 831 def test_404(self): 832 # send POST with http.client, it should return 404 header and 833 # 'Not Found' message. 834 with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn: 835 conn.request('POST', '/this-is-not-valid') 836 response = conn.getresponse() 837 838 self.assertEqual(response.status, 404) 839 self.assertEqual(response.reason, 'Not Found') 840 841 def test_introspection1(self): 842 expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt', 843 'system.listMethods', 'system.methodHelp', 844 'system.methodSignature', 'system.multicall', 845 'Fixture']) 846 try: 847 p = xmlrpclib.ServerProxy(URL) 848 meth = p.system.listMethods() 849 self.assertEqual(set(meth), expected_methods) 850 except (xmlrpclib.ProtocolError, OSError) as e: 851 # ignore failures due to non-blocking socket 'unavailable' errors 852 if not is_unavailable_exception(e): 853 # protocol error; provide additional information in test output 854 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 855 856 857 def test_introspection2(self): 858 try: 859 # test _methodHelp() 860 p = xmlrpclib.ServerProxy(URL) 861 divhelp = p.system.methodHelp('div') 862 self.assertEqual(divhelp, 'This is the div function') 863 except (xmlrpclib.ProtocolError, OSError) as e: 864 # ignore failures due to non-blocking socket 'unavailable' errors 865 if not is_unavailable_exception(e): 866 # protocol error; provide additional information in test output 867 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 868 869 @make_request_and_skipIf(sys.flags.optimize >= 2, 870 "Docstrings are omitted with -O2 and above") 871 def test_introspection3(self): 872 try: 873 # test native doc 874 p = xmlrpclib.ServerProxy(URL) 875 myfunction = p.system.methodHelp('my_function') 876 self.assertEqual(myfunction, 'This is my function') 877 except (xmlrpclib.ProtocolError, OSError) as e: 878 # ignore failures due to non-blocking socket 'unavailable' errors 879 if not is_unavailable_exception(e): 880 # protocol error; provide additional information in test output 881 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 882 883 def test_introspection4(self): 884 # the SimpleXMLRPCServer doesn't support signatures, but 885 # at least check that we can try making the call 886 try: 887 p = xmlrpclib.ServerProxy(URL) 888 divsig = p.system.methodSignature('div') 889 self.assertEqual(divsig, 'signatures not supported') 890 except (xmlrpclib.ProtocolError, OSError) as e: 891 # ignore failures due to non-blocking socket 'unavailable' errors 892 if not is_unavailable_exception(e): 893 # protocol error; provide additional information in test output 894 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 895 896 def test_multicall(self): 897 try: 898 p = xmlrpclib.ServerProxy(URL) 899 multicall = xmlrpclib.MultiCall(p) 900 multicall.add(2,3) 901 multicall.pow(6,8) 902 multicall.div(127,42) 903 add_result, pow_result, div_result = multicall() 904 self.assertEqual(add_result, 2+3) 905 self.assertEqual(pow_result, 6**8) 906 self.assertEqual(div_result, 127//42) 907 except (xmlrpclib.ProtocolError, OSError) as e: 908 # ignore failures due to non-blocking socket 'unavailable' errors 909 if not is_unavailable_exception(e): 910 # protocol error; provide additional information in test output 911 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 912 913 def test_non_existing_multicall(self): 914 try: 915 p = xmlrpclib.ServerProxy(URL) 916 multicall = xmlrpclib.MultiCall(p) 917 multicall.this_is_not_exists() 918 result = multicall() 919 920 # result.results contains; 921 # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:' 922 # 'method "this_is_not_exists" is not supported'>}] 923 924 self.assertEqual(result.results[0]['faultCode'], 1) 925 self.assertEqual(result.results[0]['faultString'], 926 '<class \'Exception\'>:method "this_is_not_exists" ' 927 'is not supported') 928 except (xmlrpclib.ProtocolError, OSError) as e: 929 # ignore failures due to non-blocking socket 'unavailable' errors 930 if not is_unavailable_exception(e): 931 # protocol error; provide additional information in test output 932 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 933 934 def test_dotted_attribute(self): 935 # Raises an AttributeError because private methods are not allowed. 936 self.assertRaises(AttributeError, 937 xmlrpc.server.resolve_dotted_attribute, str, '__add') 938 939 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 940 # Get the test to run faster by sending a request with test_simple1. 941 # This avoids waiting for the socket timeout. 942 self.test_simple1() 943 944 def test_allow_dotted_names_true(self): 945 # XXX also need allow_dotted_names_false test. 946 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 947 data = server.Fixture.getData() 948 self.assertEqual(data, '42') 949 950 def test_unicode_host(self): 951 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 952 self.assertEqual(server.add("a", "\xe9"), "a\xe9") 953 954 def test_partial_post(self): 955 # Check that a partial POST doesn't make the server loop: issue #14001. 956 with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn: 957 conn.send('POST /RPC2 HTTP/1.0\r\n' 958 'Content-Length: 100\r\n\r\n' 959 'bye HTTP/1.1\r\n' 960 f'Host: {ADDR}:{PORT}\r\n' 961 'Accept-Encoding: identity\r\n' 962 'Content-Length: 0\r\n\r\n'.encode('ascii')) 963 964 def test_context_manager(self): 965 with xmlrpclib.ServerProxy(URL) as server: 966 server.add(2, 3) 967 self.assertNotEqual(server('transport')._connection, 968 (None, None)) 969 self.assertEqual(server('transport')._connection, 970 (None, None)) 971 972 def test_context_manager_method_error(self): 973 try: 974 with xmlrpclib.ServerProxy(URL) as server: 975 server.add(2, "a") 976 except xmlrpclib.Fault: 977 pass 978 self.assertEqual(server('transport')._connection, 979 (None, None)) 980 981 982class SimpleServerEncodingTestCase(BaseServerTestCase): 983 @staticmethod 984 def threadFunc(evt, numrequests, requestHandler=None, encoding=None): 985 http_server(evt, numrequests, requestHandler, 'iso-8859-15') 986 987 def test_server_encoding(self): 988 start_string = '\u20ac' 989 end_string = '\xa4' 990 991 try: 992 p = xmlrpclib.ServerProxy(URL) 993 self.assertEqual(p.add(start_string, end_string), 994 start_string + end_string) 995 except (xmlrpclib.ProtocolError, socket.error) as e: 996 # ignore failures due to non-blocking socket unavailable errors. 997 if not is_unavailable_exception(e): 998 # protocol error; provide additional information in test output 999 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1000 1001 1002class MultiPathServerTestCase(BaseServerTestCase): 1003 threadFunc = staticmethod(http_multi_server) 1004 request_count = 2 1005 def test_path1(self): 1006 p = xmlrpclib.ServerProxy(URL+"/foo") 1007 self.assertEqual(p.pow(6,8), 6**8) 1008 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1009 1010 def test_path2(self): 1011 p = xmlrpclib.ServerProxy(URL+"/foo/bar") 1012 self.assertEqual(p.add(6,8), 6+8) 1013 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) 1014 1015 def test_path3(self): 1016 p = xmlrpclib.ServerProxy(URL+"/is/broken") 1017 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1018 1019#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1020#does indeed serve subsequent requests on the same connection 1021class BaseKeepaliveServerTestCase(BaseServerTestCase): 1022 #a request handler that supports keep-alive and logs requests into a 1023 #class variable 1024 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1025 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1026 protocol_version = 'HTTP/1.1' 1027 myRequests = [] 1028 def handle(self): 1029 self.myRequests.append([]) 1030 self.reqidx = len(self.myRequests)-1 1031 return self.parentClass.handle(self) 1032 def handle_one_request(self): 1033 result = self.parentClass.handle_one_request(self) 1034 self.myRequests[self.reqidx].append(self.raw_requestline) 1035 return result 1036 1037 requestHandler = RequestHandler 1038 def setUp(self): 1039 #clear request log 1040 self.RequestHandler.myRequests = [] 1041 return BaseServerTestCase.setUp(self) 1042 1043#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1044#does indeed serve subsequent requests on the same connection 1045class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): 1046 def test_two(self): 1047 p = xmlrpclib.ServerProxy(URL) 1048 #do three requests. 1049 self.assertEqual(p.pow(6,8), 6**8) 1050 self.assertEqual(p.pow(6,8), 6**8) 1051 self.assertEqual(p.pow(6,8), 6**8) 1052 p("close")() 1053 1054 #they should have all been handled by a single request handler 1055 self.assertEqual(len(self.RequestHandler.myRequests), 1) 1056 1057 #check that we did at least two (the third may be pending append 1058 #due to thread scheduling) 1059 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1060 1061 1062#test special attribute access on the serverproxy, through the __call__ 1063#function. 1064class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): 1065 #ask for two keepalive requests to be handled. 1066 request_count=2 1067 1068 def test_close(self): 1069 p = xmlrpclib.ServerProxy(URL) 1070 #do some requests with close. 1071 self.assertEqual(p.pow(6,8), 6**8) 1072 self.assertEqual(p.pow(6,8), 6**8) 1073 self.assertEqual(p.pow(6,8), 6**8) 1074 p("close")() #this should trigger a new keep-alive request 1075 self.assertEqual(p.pow(6,8), 6**8) 1076 self.assertEqual(p.pow(6,8), 6**8) 1077 self.assertEqual(p.pow(6,8), 6**8) 1078 p("close")() 1079 1080 #they should have all been two request handlers, each having logged at least 1081 #two complete requests 1082 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1083 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1084 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) 1085 1086 1087 def test_transport(self): 1088 p = xmlrpclib.ServerProxy(URL) 1089 #do some requests with close. 1090 self.assertEqual(p.pow(6,8), 6**8) 1091 p("transport").close() #same as above, really. 1092 self.assertEqual(p.pow(6,8), 6**8) 1093 p("close")() 1094 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1095 1096#A test case that verifies that gzip encoding works in both directions 1097#(for a request and the response) 1098@unittest.skipIf(gzip is None, 'requires gzip') 1099class GzipServerTestCase(BaseServerTestCase): 1100 #a request handler that supports keep-alive and logs requests into a 1101 #class variable 1102 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1103 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1104 protocol_version = 'HTTP/1.1' 1105 1106 def do_POST(self): 1107 #store content of last request in class 1108 self.__class__.content_length = int(self.headers["content-length"]) 1109 return self.parentClass.do_POST(self) 1110 requestHandler = RequestHandler 1111 1112 class Transport(xmlrpclib.Transport): 1113 #custom transport, stores the response length for our perusal 1114 fake_gzip = False 1115 def parse_response(self, response): 1116 self.response_length=int(response.getheader("content-length", 0)) 1117 return xmlrpclib.Transport.parse_response(self, response) 1118 1119 def send_content(self, connection, body): 1120 if self.fake_gzip: 1121 #add a lone gzip header to induce decode error remotely 1122 connection.putheader("Content-Encoding", "gzip") 1123 return xmlrpclib.Transport.send_content(self, connection, body) 1124 1125 def setUp(self): 1126 BaseServerTestCase.setUp(self) 1127 1128 def test_gzip_request(self): 1129 t = self.Transport() 1130 t.encode_threshold = None 1131 p = xmlrpclib.ServerProxy(URL, transport=t) 1132 self.assertEqual(p.pow(6,8), 6**8) 1133 a = self.RequestHandler.content_length 1134 t.encode_threshold = 0 #turn on request encoding 1135 self.assertEqual(p.pow(6,8), 6**8) 1136 b = self.RequestHandler.content_length 1137 self.assertTrue(a>b) 1138 p("close")() 1139 1140 def test_bad_gzip_request(self): 1141 t = self.Transport() 1142 t.encode_threshold = None 1143 t.fake_gzip = True 1144 p = xmlrpclib.ServerProxy(URL, transport=t) 1145 cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, 1146 re.compile(r"\b400\b")) 1147 with cm: 1148 p.pow(6, 8) 1149 p("close")() 1150 1151 def test_gzip_response(self): 1152 t = self.Transport() 1153 p = xmlrpclib.ServerProxy(URL, transport=t) 1154 old = self.requestHandler.encode_threshold 1155 self.requestHandler.encode_threshold = None #no encoding 1156 self.assertEqual(p.pow(6,8), 6**8) 1157 a = t.response_length 1158 self.requestHandler.encode_threshold = 0 #always encode 1159 self.assertEqual(p.pow(6,8), 6**8) 1160 p("close")() 1161 b = t.response_length 1162 self.requestHandler.encode_threshold = old 1163 self.assertTrue(a>b) 1164 1165 1166@unittest.skipIf(gzip is None, 'requires gzip') 1167class GzipUtilTestCase(unittest.TestCase): 1168 1169 def test_gzip_decode_limit(self): 1170 max_gzip_decode = 20 * 1024 * 1024 1171 data = b'\0' * max_gzip_decode 1172 encoded = xmlrpclib.gzip_encode(data) 1173 decoded = xmlrpclib.gzip_decode(encoded) 1174 self.assertEqual(len(decoded), max_gzip_decode) 1175 1176 data = b'\0' * (max_gzip_decode + 1) 1177 encoded = xmlrpclib.gzip_encode(data) 1178 1179 with self.assertRaisesRegex(ValueError, 1180 "max gzipped payload length exceeded"): 1181 xmlrpclib.gzip_decode(encoded) 1182 1183 xmlrpclib.gzip_decode(encoded, max_decode=-1) 1184 1185 1186class HeadersServerTestCase(BaseServerTestCase): 1187 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1188 test_headers = None 1189 1190 def do_POST(self): 1191 self.__class__.test_headers = self.headers 1192 return super().do_POST() 1193 requestHandler = RequestHandler 1194 standard_headers = [ 1195 'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent', 1196 'Content-Length'] 1197 1198 def setUp(self): 1199 self.RequestHandler.test_headers = None 1200 return super().setUp() 1201 1202 def assertContainsAdditionalHeaders(self, headers, additional): 1203 expected_keys = sorted(self.standard_headers + list(additional.keys())) 1204 self.assertListEqual(sorted(headers.keys()), expected_keys) 1205 1206 for key, value in additional.items(): 1207 self.assertEqual(headers.get(key), value) 1208 1209 def test_header(self): 1210 p = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')]) 1211 self.assertEqual(p.pow(6, 8), 6**8) 1212 1213 headers = self.RequestHandler.test_headers 1214 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1215 1216 def test_header_many(self): 1217 p = xmlrpclib.ServerProxy( 1218 URL, headers=[('X-Test', 'foo'), ('X-Test-Second', 'bar')]) 1219 self.assertEqual(p.pow(6, 8), 6**8) 1220 1221 headers = self.RequestHandler.test_headers 1222 self.assertContainsAdditionalHeaders( 1223 headers, {'X-Test': 'foo', 'X-Test-Second': 'bar'}) 1224 1225 def test_header_empty(self): 1226 p = xmlrpclib.ServerProxy(URL, headers=[]) 1227 self.assertEqual(p.pow(6, 8), 6**8) 1228 1229 headers = self.RequestHandler.test_headers 1230 self.assertContainsAdditionalHeaders(headers, {}) 1231 1232 def test_header_tuple(self): 1233 p = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),)) 1234 self.assertEqual(p.pow(6, 8), 6**8) 1235 1236 headers = self.RequestHandler.test_headers 1237 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1238 1239 def test_header_items(self): 1240 p = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items()) 1241 self.assertEqual(p.pow(6, 8), 6**8) 1242 1243 headers = self.RequestHandler.test_headers 1244 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1245 1246 1247#Test special attributes of the ServerProxy object 1248class ServerProxyTestCase(unittest.TestCase): 1249 def setUp(self): 1250 unittest.TestCase.setUp(self) 1251 # Actual value of the URL doesn't matter if it is a string in 1252 # the correct format. 1253 self.url = 'http://fake.localhost' 1254 1255 def test_close(self): 1256 p = xmlrpclib.ServerProxy(self.url) 1257 self.assertEqual(p('close')(), None) 1258 1259 def test_transport(self): 1260 t = xmlrpclib.Transport() 1261 p = xmlrpclib.ServerProxy(self.url, transport=t) 1262 self.assertEqual(p('transport'), t) 1263 1264 1265# This is a contrived way to make a failure occur on the server side 1266# in order to test the _send_traceback_header flag on the server 1267class FailingMessageClass(http.client.HTTPMessage): 1268 def get(self, key, failobj=None): 1269 key = key.lower() 1270 if key == 'content-length': 1271 return 'I am broken' 1272 return super().get(key, failobj) 1273 1274 1275class FailingServerTestCase(unittest.TestCase): 1276 def setUp(self): 1277 self.evt = threading.Event() 1278 # start server thread to handle requests 1279 serv_args = (self.evt, 1) 1280 thread = threading.Thread(target=http_server, args=serv_args) 1281 thread.start() 1282 self.addCleanup(thread.join) 1283 1284 # wait for the server to be ready 1285 self.evt.wait() 1286 self.evt.clear() 1287 1288 def tearDown(self): 1289 # wait on the server thread to terminate 1290 self.evt.wait() 1291 # reset flag 1292 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 1293 # reset message class 1294 default_class = http.client.HTTPMessage 1295 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class 1296 1297 def test_basic(self): 1298 # check that flag is false by default 1299 flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header 1300 self.assertEqual(flagval, False) 1301 1302 # enable traceback reporting 1303 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1304 1305 # test a call that shouldn't fail just as a smoke test 1306 try: 1307 p = xmlrpclib.ServerProxy(URL) 1308 self.assertEqual(p.pow(6,8), 6**8) 1309 except (xmlrpclib.ProtocolError, OSError) as e: 1310 # ignore failures due to non-blocking socket 'unavailable' errors 1311 if not is_unavailable_exception(e): 1312 # protocol error; provide additional information in test output 1313 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1314 1315 def test_fail_no_info(self): 1316 # use the broken message class 1317 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1318 1319 try: 1320 p = xmlrpclib.ServerProxy(URL) 1321 p.pow(6,8) 1322 except (xmlrpclib.ProtocolError, OSError) as e: 1323 # ignore failures due to non-blocking socket 'unavailable' errors 1324 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1325 # The two server-side error headers shouldn't be sent back in this case 1326 self.assertTrue(e.headers.get("X-exception") is None) 1327 self.assertTrue(e.headers.get("X-traceback") is None) 1328 else: 1329 self.fail('ProtocolError not raised') 1330 1331 def test_fail_with_info(self): 1332 # use the broken message class 1333 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1334 1335 # Check that errors in the server send back exception/traceback 1336 # info when flag is set 1337 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1338 1339 try: 1340 p = xmlrpclib.ServerProxy(URL) 1341 p.pow(6,8) 1342 except (xmlrpclib.ProtocolError, OSError) as e: 1343 # ignore failures due to non-blocking socket 'unavailable' errors 1344 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1345 # We should get error info in the response 1346 expected_err = "invalid literal for int() with base 10: 'I am broken'" 1347 self.assertEqual(e.headers.get("X-exception"), expected_err) 1348 self.assertTrue(e.headers.get("X-traceback") is not None) 1349 else: 1350 self.fail('ProtocolError not raised') 1351 1352 1353@contextlib.contextmanager 1354def captured_stdout(encoding='utf-8'): 1355 """A variation on support.captured_stdout() which gives a text stream 1356 having a `buffer` attribute. 1357 """ 1358 orig_stdout = sys.stdout 1359 sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding) 1360 try: 1361 yield sys.stdout 1362 finally: 1363 sys.stdout = orig_stdout 1364 1365 1366class CGIHandlerTestCase(unittest.TestCase): 1367 def setUp(self): 1368 self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler() 1369 1370 def tearDown(self): 1371 self.cgi = None 1372 1373 def test_cgi_get(self): 1374 with support.EnvironmentVarGuard() as env: 1375 env['REQUEST_METHOD'] = 'GET' 1376 # if the method is GET and no request_text is given, it runs handle_get 1377 # get sysout output 1378 with captured_stdout(encoding=self.cgi.encoding) as data_out: 1379 self.cgi.handle_request() 1380 1381 # parse Status header 1382 data_out.seek(0) 1383 handle = data_out.read() 1384 status = handle.split()[1] 1385 message = ' '.join(handle.split()[2:4]) 1386 1387 self.assertEqual(status, '400') 1388 self.assertEqual(message, 'Bad Request') 1389 1390 1391 def test_cgi_xmlrpc_response(self): 1392 data = """<?xml version='1.0'?> 1393 <methodCall> 1394 <methodName>test_method</methodName> 1395 <params> 1396 <param> 1397 <value><string>foo</string></value> 1398 </param> 1399 <param> 1400 <value><string>bar</string></value> 1401 </param> 1402 </params> 1403 </methodCall> 1404 """ 1405 1406 with support.EnvironmentVarGuard() as env, \ 1407 captured_stdout(encoding=self.cgi.encoding) as data_out, \ 1408 support.captured_stdin() as data_in: 1409 data_in.write(data) 1410 data_in.seek(0) 1411 env['CONTENT_LENGTH'] = str(len(data)) 1412 self.cgi.handle_request() 1413 data_out.seek(0) 1414 1415 # will respond exception, if so, our goal is achieved ;) 1416 handle = data_out.read() 1417 1418 # start with 44th char so as not to get http header, we just 1419 # need only xml 1420 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:]) 1421 1422 # Also test the content-length returned by handle_request 1423 # Using the same test method inorder to avoid all the datapassing 1424 # boilerplate code. 1425 # Test for bug: http://bugs.python.org/issue5040 1426 1427 content = handle[handle.find("<?xml"):] 1428 1429 self.assertEqual( 1430 int(re.search(r'Content-Length: (\d+)', handle).group(1)), 1431 len(content)) 1432 1433 1434class UseBuiltinTypesTestCase(unittest.TestCase): 1435 1436 def test_use_builtin_types(self): 1437 # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which 1438 # makes all dispatch of binary data as bytes instances, and all 1439 # dispatch of datetime argument as datetime.datetime instances. 1440 self.log = [] 1441 expected_bytes = b"my dog has fleas" 1442 expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12) 1443 marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar') 1444 def foobar(*args): 1445 self.log.extend(args) 1446 handler = xmlrpc.server.SimpleXMLRPCDispatcher( 1447 allow_none=True, encoding=None, use_builtin_types=True) 1448 handler.register_function(foobar) 1449 handler._marshaled_dispatch(marshaled) 1450 self.assertEqual(len(self.log), 2) 1451 mybytes, mydate = self.log 1452 self.assertEqual(self.log, [expected_bytes, expected_date]) 1453 self.assertIs(type(mydate), datetime.datetime) 1454 self.assertIs(type(mybytes), bytes) 1455 1456 def test_cgihandler_has_use_builtin_types_flag(self): 1457 handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True) 1458 self.assertTrue(handler.use_builtin_types) 1459 1460 def test_xmlrpcserver_has_use_builtin_types_flag(self): 1461 server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0), 1462 use_builtin_types=True) 1463 server.server_close() 1464 self.assertTrue(server.use_builtin_types) 1465 1466 1467@support.reap_threads 1468def test_main(): 1469 support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase, 1470 BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase, 1471 SimpleServerTestCase, SimpleServerEncodingTestCase, 1472 KeepaliveServerTestCase1, KeepaliveServerTestCase2, 1473 GzipServerTestCase, GzipUtilTestCase, HeadersServerTestCase, 1474 MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase, 1475 CGIHandlerTestCase, SimpleXMLRPCDispatcherTestCase) 1476 1477 1478if __name__ == "__main__": 1479 test_main() 1480