import base64
import datetime
import decimal
import sys
import time
import unittest
from unittest import mock
import xmlrpc.client as xmlrpclib
import xmlrpc.server
import http.client
import http, http.server
import socket
import threading
import re
import io
import contextlib
from test import support
from test.support import socket_helper
from test.support import ALWAYS_EQ, LARGEST, SMALLEST

try:
    import gzip
except ImportError:
    gzip = None

# One value of every kind the marshaller is exercised with in test_dump_load.
alist = [{'astring': 'foo@bar.baz.spam',
          'afloat': 7283.43,
          'anint': 2**20,
          'ashortlong': 2,
          'anotherlist': ['.zyx.41'],
          'abase64': xmlrpclib.Binary(b"my dog has fleas"),
          'b64bytes': b"my dog has fleas",
          'b64bytearray': bytearray(b"my dog has fleas"),
          'boolean': False,
          'unicode': '\u4000\u6000\u8000',
          'ukey\u4000': 'regular value',
          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
          'datetime2': xmlrpclib.DateTime(
                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
          'datetime3': xmlrpclib.DateTime(
                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
          }]

class XMLRPCTestCase(unittest.TestCase):

    def test_dump_load(self):
        dump = xmlrpclib.dumps((alist,))
        load = xmlrpclib.loads(dump)
        self.assertEqual(alist, load[0][0])

    def test_dump_bare_datetime(self):
        # This checks that an unwrapped datetime.date object can be handled
        # by the marshalling code.  This can't be done via test_dump_load()
        # since with use_builtin_types set to 1 the unmarshaller would create
        # datetime objects for the 'datetime[123]' keys as well
        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        # Each loads() flag decides which type the value is unmarshalled to.
        cases = (
            ({'use_builtin_types': True}, datetime.datetime),
            ({'use_builtin_types': False}, xmlrpclib.DateTime),
            ({'use_datetime': True}, datetime.datetime),
            ({'use_datetime': False}, xmlrpclib.DateTime),
        )
        for kwargs, expected_type in cases:
            with self.subTest(**kwargs):
                result, m = xmlrpclib.loads(s, **kwargs)
                (newdt,) = result
                self.assertEqual(newdt, dt)
                self.assertIs(type(newdt), expected_type)
                self.assertIsNone(m)

    def test_datetime_before_1900(self):
        # same as before but with a date before 1900
        dt = datetime.datetime(1, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        cases = (
            ({'use_builtin_types': True}, datetime.datetime),
            ({'use_builtin_types': False}, xmlrpclib.DateTime),
        )
        for kwargs, expected_type in cases:
            with self.subTest(**kwargs):
                result, m = xmlrpclib.loads(s, **kwargs)
                (newdt,) = result
                self.assertEqual(newdt, dt)
                self.assertIs(type(newdt), expected_type)
                self.assertIsNone(m)

    def test_bug_1164912(self):
        d = xmlrpclib.DateTime()
        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
                                            methodresponse=True))
        self.assertIsInstance(new_d.value, str)

        # Check that the output of dumps() is still an 8-bit string
        s = xmlrpclib.dumps((new_d,), methodresponse=True)
        self.assertIsInstance(s, str)

    def test_newstyle_class(self):
        # An arbitrary instance is marshalled as its __dict__.
        class T(object):
            pass
        t = T()
        t.x = 100
        t.y = "Hello"
        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
        self.assertEqual(t2, t.__dict__)

    def test_dump_big_long(self):
        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))

    def test_dump_bad_dict(self):
        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))

    def test_dump_recursive_seq(self):
        l = [1,2,3]
        t = [3,4,5,l]
        l.append(t)
        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))

    def test_dump_recursive_dict(self):
        d = {'1':1, '2':1}
        t = {'3':3, 'd':d}
        d['t'] = t
        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))

    def test_dump_big_int(self):
        if sys.maxsize > 2**31-1:
            self.assertRaises(OverflowError, xmlrpclib.dumps,
                              (int(2**34),))

        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MAXINT+1,))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MININT-1,))

        def dummy_write(s):
            pass

        m = xmlrpclib.Marshaller()
        m.dump_int(xmlrpclib.MAXINT, dummy_write)
        m.dump_int(xmlrpclib.MININT, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MAXINT+1, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MININT-1, dummy_write)

    def test_dump_double(self):
        # Doubles outside the MININT..MAXINT range must dump without error.
        xmlrpclib.dumps((float(2 ** 34),))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
                         float(xmlrpclib.MININT)))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
                         float(xmlrpclib.MININT - 42)))

        def dummy_write(s):
            pass

        m = xmlrpclib.Marshaller()
        m.dump_double(xmlrpclib.MAXINT, dummy_write)
        m.dump_double(xmlrpclib.MININT, dummy_write)
        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
        m.dump_double(xmlrpclib.MININT - 42, dummy_write)

    def test_dump_none(self):
        value = alist + [None]
        arg1 = (alist + [None],)
        strg = xmlrpclib.dumps(arg1, allow_none=True)
        self.assertEqual(value,
                          xmlrpclib.loads(strg)[0][0])
        # Without allow_none=True a None value is rejected.
        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))

    def test_dump_encoding(self):
        value = {'key\u20ac\xa4':
                 'value\u20ac\xa4'}
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodresponse=True)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        methodname = 'method\u20ac\xa4'
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodname=methodname)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)

    def test_dump_bytes(self):
        sample = b"my dog has fleas"
        self.assertEqual(sample, xmlrpclib.Binary(sample))
        for type_ in bytes, bytearray, xmlrpclib.Binary:
            value = type_(sample)
            s = xmlrpclib.dumps((value,))

            result, m = xmlrpclib.loads(s, use_builtin_types=True)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), bytes)
            self.assertIsNone(m)

            result, m = xmlrpclib.loads(s, use_builtin_types=False)
            (newvalue,) = result
            self.assertEqual(newvalue, sample)
            self.assertIs(type(newvalue), xmlrpclib.Binary)
            self.assertIsNone(m)

    def test_loads_unsupported(self):
        ResponseError = xmlrpclib.ResponseError
        data = '<params><param><value><spam/></value></param></params>'
        self.assertRaises(ResponseError, xmlrpclib.loads, data)
data = ('<params><param><value><array>' 231 '<value><spam/></value>' 232 '</array></value></param></params>') 233 self.assertRaises(ResponseError, xmlrpclib.loads, data) 234 data = ('<params><param><value><struct>' 235 '<member><name>a</name><value><spam/></value></member>' 236 '<member><name>b</name><value><spam/></value></member>' 237 '</struct></value></param></params>') 238 self.assertRaises(ResponseError, xmlrpclib.loads, data) 239 240 def check_loads(self, s, value, **kwargs): 241 dump = '<params><param><value>%s</value></param></params>' % s 242 result, m = xmlrpclib.loads(dump, **kwargs) 243 (newvalue,) = result 244 self.assertEqual(newvalue, value) 245 self.assertIs(type(newvalue), type(value)) 246 self.assertIsNone(m) 247 248 def test_load_standard_types(self): 249 check = self.check_loads 250 check('string', 'string') 251 check('<string>string</string>', 'string') 252 check('<string> string</string>', ' string') 253 check('<int>2056183947</int>', 2056183947) 254 check('<int>-2056183947</int>', -2056183947) 255 check('<i4>2056183947</i4>', 2056183947) 256 check('<double>46093.78125</double>', 46093.78125) 257 check('<boolean>0</boolean>', False) 258 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 259 xmlrpclib.Binary(b'\x00byte string\xff')) 260 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 261 b'\x00byte string\xff', use_builtin_types=True) 262 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 263 xmlrpclib.DateTime('20050210T11:41:23')) 264 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 265 datetime.datetime(2005, 2, 10, 11, 41, 23), 266 use_builtin_types=True) 267 check('<array><data>' 268 '<value><int>1</int></value><value><int>2</int></value>' 269 '</data></array>', [1, 2]) 270 check('<struct>' 271 '<member><name>b</name><value><int>2</int></value></member>' 272 '<member><name>a</name><value><int>1</int></value></member>' 273 '</struct>', {'a': 1, 'b': 2}) 274 275 def test_load_extension_types(self): 276 check = 
self.check_loads 277 check('<nil/>', None) 278 check('<ex:nil/>', None) 279 check('<i1>205</i1>', 205) 280 check('<i2>20561</i2>', 20561) 281 check('<i8>9876543210</i8>', 9876543210) 282 check('<biginteger>98765432100123456789</biginteger>', 283 98765432100123456789) 284 check('<float>93.78125</float>', 93.78125) 285 check('<bigdecimal>9876543210.0123456789</bigdecimal>', 286 decimal.Decimal('9876543210.0123456789')) 287 288 def test_limit_int(self): 289 check = self.check_loads 290 maxdigits = 5000 291 with support.adjust_int_max_str_digits(maxdigits): 292 s = '1' * (maxdigits + 1) 293 with self.assertRaises(ValueError): 294 check(f'<int>{s}</int>', None) 295 with self.assertRaises(ValueError): 296 check(f'<biginteger>{s}</biginteger>', None) 297 298 def test_get_host_info(self): 299 # see bug #3613, this raised a TypeError 300 transp = xmlrpc.client.Transport() 301 self.assertEqual(transp.get_host_info("user@host.tld"), 302 ('host.tld', 303 [('Authorization', 'Basic dXNlcg==')], {})) 304 305 def test_ssl_presence(self): 306 try: 307 import ssl 308 except ImportError: 309 has_ssl = False 310 else: 311 has_ssl = True 312 try: 313 xmlrpc.client.ServerProxy('https://localhost:9999').bad_function() 314 except NotImplementedError: 315 self.assertFalse(has_ssl, "xmlrpc client's error with SSL support") 316 except OSError: 317 self.assertTrue(has_ssl) 318 319 def test_keepalive_disconnect(self): 320 class RequestHandler(http.server.BaseHTTPRequestHandler): 321 protocol_version = "HTTP/1.1" 322 handled = False 323 324 def do_POST(self): 325 length = int(self.headers.get("Content-Length")) 326 self.rfile.read(length) 327 if self.handled: 328 self.close_connection = True 329 return 330 response = xmlrpclib.dumps((5,), methodresponse=True) 331 response = response.encode() 332 self.send_response(http.HTTPStatus.OK) 333 self.send_header("Content-Length", len(response)) 334 self.end_headers() 335 self.wfile.write(response) 336 self.handled = True 337 self.close_connection = 
False 338 339 def log_message(self, format, *args): 340 # don't clobber sys.stderr 341 pass 342 343 def run_server(): 344 server.socket.settimeout(float(1)) # Don't hang if client fails 345 server.handle_request() # First request and attempt at second 346 server.handle_request() # Retried second request 347 348 server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler) 349 self.addCleanup(server.server_close) 350 thread = threading.Thread(target=run_server) 351 thread.start() 352 self.addCleanup(thread.join) 353 url = "http://{}:{}/".format(*server.server_address) 354 with xmlrpclib.ServerProxy(url) as p: 355 self.assertEqual(p.method(), 5) 356 self.assertEqual(p.method(), 5) 357 358 359class SimpleXMLRPCDispatcherTestCase(unittest.TestCase): 360 class DispatchExc(Exception): 361 """Raised inside the dispatched functions when checking for 362 chained exceptions""" 363 364 def test_call_registered_func(self): 365 """Calls explicitly registered function""" 366 # Makes sure any exception raised inside the function has no other 367 # exception chained to it 368 369 exp_params = 1, 2, 3 370 371 def dispatched_func(*params): 372 raise self.DispatchExc(params) 373 374 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 375 dispatcher.register_function(dispatched_func) 376 with self.assertRaises(self.DispatchExc) as exc_ctx: 377 dispatcher._dispatch('dispatched_func', exp_params) 378 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 379 self.assertIsNone(exc_ctx.exception.__cause__) 380 self.assertIsNone(exc_ctx.exception.__context__) 381 382 def test_call_instance_func(self): 383 """Calls a registered instance attribute as a function""" 384 # Makes sure any exception raised inside the function has no other 385 # exception chained to it 386 387 exp_params = 1, 2, 3 388 389 class DispatchedClass: 390 def dispatched_func(self, *params): 391 raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params) 392 393 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 
394 dispatcher.register_instance(DispatchedClass()) 395 with self.assertRaises(self.DispatchExc) as exc_ctx: 396 dispatcher._dispatch('dispatched_func', exp_params) 397 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 398 self.assertIsNone(exc_ctx.exception.__cause__) 399 self.assertIsNone(exc_ctx.exception.__context__) 400 401 def test_call_dispatch_func(self): 402 """Calls the registered instance's `_dispatch` function""" 403 # Makes sure any exception raised inside the function has no other 404 # exception chained to it 405 406 exp_method = 'method' 407 exp_params = 1, 2, 3 408 409 class TestInstance: 410 def _dispatch(self, method, params): 411 raise SimpleXMLRPCDispatcherTestCase.DispatchExc( 412 method, params) 413 414 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 415 dispatcher.register_instance(TestInstance()) 416 with self.assertRaises(self.DispatchExc) as exc_ctx: 417 dispatcher._dispatch(exp_method, exp_params) 418 self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params)) 419 self.assertIsNone(exc_ctx.exception.__cause__) 420 self.assertIsNone(exc_ctx.exception.__context__) 421 422 def test_registered_func_is_none(self): 423 """Calls explicitly registered function which is None""" 424 425 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 426 dispatcher.register_function(None, name='method') 427 with self.assertRaisesRegex(Exception, 'method'): 428 dispatcher._dispatch('method', ('param',)) 429 430 def test_instance_has_no_func(self): 431 """Attempts to call nonexistent function on a registered instance""" 432 433 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 434 dispatcher.register_instance(object()) 435 with self.assertRaisesRegex(Exception, 'method'): 436 dispatcher._dispatch('method', ('param',)) 437 438 def test_cannot_locate_func(self): 439 """Calls a function that the dispatcher cannot locate""" 440 441 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 442 with self.assertRaisesRegex(Exception, 'method'): 443 
dispatcher._dispatch('method', ('param',)) 444 445 446class HelperTestCase(unittest.TestCase): 447 def test_escape(self): 448 self.assertEqual(xmlrpclib.escape("a&b"), "a&b") 449 self.assertEqual(xmlrpclib.escape("a<b"), "a<b") 450 self.assertEqual(xmlrpclib.escape("a>b"), "a>b") 451 452class FaultTestCase(unittest.TestCase): 453 def test_repr(self): 454 f = xmlrpclib.Fault(42, 'Test Fault') 455 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 456 self.assertEqual(repr(f), str(f)) 457 458 def test_dump_fault(self): 459 f = xmlrpclib.Fault(42, 'Test Fault') 460 s = xmlrpclib.dumps((f,)) 461 (newf,), m = xmlrpclib.loads(s) 462 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 463 self.assertEqual(m, None) 464 465 s = xmlrpclib.Marshaller().dumps(f) 466 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 467 468 def test_dotted_attribute(self): 469 # this will raise AttributeError because code don't want us to use 470 # private methods 471 self.assertRaises(AttributeError, 472 xmlrpc.server.resolve_dotted_attribute, str, '__add') 473 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 474 475class DateTimeTestCase(unittest.TestCase): 476 def test_default(self): 477 with mock.patch('time.localtime') as localtime_mock: 478 time_struct = time.struct_time( 479 [2013, 7, 15, 0, 24, 49, 0, 196, 0]) 480 localtime_mock.return_value = time_struct 481 localtime = time.localtime() 482 t = xmlrpclib.DateTime() 483 self.assertEqual(str(t), 484 time.strftime("%Y%m%dT%H:%M:%S", localtime)) 485 486 def test_time(self): 487 d = 1181399930.036952 488 t = xmlrpclib.DateTime(d) 489 self.assertEqual(str(t), 490 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 491 492 def test_time_tuple(self): 493 d = (2007,6,9,10,38,50,5,160,0) 494 t = xmlrpclib.DateTime(d) 495 self.assertEqual(str(t), '20070609T10:38:50') 496 497 def test_time_struct(self): 498 d = time.localtime(1181399930.036952) 499 t = xmlrpclib.DateTime(d) 500 
self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 501 502 def test_datetime_datetime(self): 503 d = datetime.datetime(2007,1,2,3,4,5) 504 t = xmlrpclib.DateTime(d) 505 self.assertEqual(str(t), '20070102T03:04:05') 506 507 def test_repr(self): 508 d = datetime.datetime(2007,1,2,3,4,5) 509 t = xmlrpclib.DateTime(d) 510 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t) 511 self.assertEqual(repr(t), val) 512 513 def test_decode(self): 514 d = ' 20070908T07:11:13 ' 515 t1 = xmlrpclib.DateTime() 516 t1.decode(d) 517 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 518 self.assertEqual(t1, tref) 519 520 t2 = xmlrpclib._datetime(d) 521 self.assertEqual(t2, tref) 522 523 def test_comparison(self): 524 now = datetime.datetime.now() 525 dtime = xmlrpclib.DateTime(now.timetuple()) 526 527 # datetime vs. DateTime 528 self.assertTrue(dtime == now) 529 self.assertTrue(now == dtime) 530 then = now + datetime.timedelta(seconds=4) 531 self.assertTrue(then >= dtime) 532 self.assertTrue(dtime < then) 533 534 # str vs. 
DateTime 535 dstr = now.strftime("%Y%m%dT%H:%M:%S") 536 self.assertTrue(dtime == dstr) 537 self.assertTrue(dstr == dtime) 538 dtime_then = xmlrpclib.DateTime(then.timetuple()) 539 self.assertTrue(dtime_then >= dstr) 540 self.assertTrue(dstr < dtime_then) 541 542 # some other types 543 dbytes = dstr.encode('ascii') 544 dtuple = now.timetuple() 545 self.assertFalse(dtime == 1970) 546 self.assertTrue(dtime != dbytes) 547 self.assertFalse(dtime == bytearray(dbytes)) 548 self.assertTrue(dtime != dtuple) 549 with self.assertRaises(TypeError): 550 dtime < float(1970) 551 with self.assertRaises(TypeError): 552 dtime > dbytes 553 with self.assertRaises(TypeError): 554 dtime <= bytearray(dbytes) 555 with self.assertRaises(TypeError): 556 dtime >= dtuple 557 558 self.assertTrue(dtime == ALWAYS_EQ) 559 self.assertFalse(dtime != ALWAYS_EQ) 560 self.assertTrue(dtime < LARGEST) 561 self.assertFalse(dtime > LARGEST) 562 self.assertTrue(dtime <= LARGEST) 563 self.assertFalse(dtime >= LARGEST) 564 self.assertFalse(dtime < SMALLEST) 565 self.assertTrue(dtime > SMALLEST) 566 self.assertFalse(dtime <= SMALLEST) 567 self.assertTrue(dtime >= SMALLEST) 568 569 570class BinaryTestCase(unittest.TestCase): 571 572 # XXX What should str(Binary(b"\xff")) return? I'm chosing "\xff" 573 # for now (i.e. interpreting the binary data as Latin-1-encoded 574 # text). But this feels very unsatisfactory. Perhaps we should 575 # only define repr(), and return r"Binary(b'\xff')" instead? 
576 577 def test_default(self): 578 t = xmlrpclib.Binary() 579 self.assertEqual(str(t), '') 580 581 def test_string(self): 582 d = b'\x01\x02\x03abc123\xff\xfe' 583 t = xmlrpclib.Binary(d) 584 self.assertEqual(str(t), str(d, "latin-1")) 585 586 def test_decode(self): 587 d = b'\x01\x02\x03abc123\xff\xfe' 588 de = base64.encodebytes(d) 589 t1 = xmlrpclib.Binary() 590 t1.decode(de) 591 self.assertEqual(str(t1), str(d, "latin-1")) 592 593 t2 = xmlrpclib._binary(de) 594 self.assertEqual(str(t2), str(d, "latin-1")) 595 596 597ADDR = PORT = URL = None 598 599# The evt is set twice. First when the server is ready to serve. 600# Second when the server has been shutdown. The user must clear 601# the event after it has been set the first time to catch the second set. 602def http_server(evt, numrequests, requestHandler=None, encoding=None): 603 class TestInstanceClass: 604 def div(self, x, y): 605 return x // y 606 607 def _methodHelp(self, name): 608 if name == 'div': 609 return 'This is the div function' 610 611 class Fixture: 612 @staticmethod 613 def getData(): 614 return '42' 615 616 class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer): 617 def get_request(self): 618 # Ensure the socket is always non-blocking. On Linux, socket 619 # attributes are not inherited like they are on *BSD and Windows. 620 s, port = self.socket.accept() 621 s.setblocking(True) 622 return s, port 623 624 if not requestHandler: 625 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 626 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 627 encoding=encoding, 628 logRequests=False, bind_and_activate=False) 629 try: 630 serv.server_bind() 631 global ADDR, PORT, URL 632 ADDR, PORT = serv.socket.getsockname() 633 #connect to IP address directly. This avoids socket.create_connection() 634 #trying to connect to "localhost" using all address families, which 635 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 636 #on AF_INET only. 
637 URL = "http://%s:%d"%(ADDR, PORT) 638 serv.server_activate() 639 serv.register_introspection_functions() 640 serv.register_multicall_functions() 641 serv.register_function(pow) 642 serv.register_function(lambda x: x, 'têšt') 643 @serv.register_function 644 def my_function(): 645 '''This is my function''' 646 return True 647 @serv.register_function(name='add') 648 def _(x, y): 649 return x + y 650 testInstance = TestInstanceClass() 651 serv.register_instance(testInstance, allow_dotted_names=True) 652 evt.set() 653 654 # handle up to 'numrequests' requests 655 while numrequests > 0: 656 serv.handle_request() 657 numrequests -= 1 658 659 except socket.timeout: 660 pass 661 finally: 662 serv.socket.close() 663 PORT = None 664 evt.set() 665 666def http_multi_server(evt, numrequests, requestHandler=None): 667 class TestInstanceClass: 668 def div(self, x, y): 669 return x // y 670 671 def _methodHelp(self, name): 672 if name == 'div': 673 return 'This is the div function' 674 675 def my_function(): 676 '''This is my function''' 677 return True 678 679 class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): 680 def get_request(self): 681 # Ensure the socket is always non-blocking. On Linux, socket 682 # attributes are not inherited like they are on *BSD and Windows. 683 s, port = self.socket.accept() 684 s.setblocking(True) 685 return s, port 686 687 if not requestHandler: 688 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 689 class MyRequestHandler(requestHandler): 690 rpc_paths = [] 691 692 class BrokenDispatcher: 693 def _marshaled_dispatch(self, data, dispatch_method=None, path=None): 694 raise RuntimeError("broken dispatcher") 695 696 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 697 logRequests=False, bind_and_activate=False) 698 serv.socket.settimeout(3) 699 serv.server_bind() 700 try: 701 global ADDR, PORT, URL 702 ADDR, PORT = serv.socket.getsockname() 703 #connect to IP address directly. 
This avoids socket.create_connection() 704 #trying to connect to "localhost" using all address families, which 705 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 706 #on AF_INET only. 707 URL = "http://%s:%d"%(ADDR, PORT) 708 serv.server_activate() 709 paths = ["/foo", "/foo/bar"] 710 for path in paths: 711 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) 712 d.register_introspection_functions() 713 d.register_multicall_functions() 714 serv.get_dispatcher(paths[0]).register_function(pow) 715 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 716 serv.add_dispatcher("/is/broken", BrokenDispatcher()) 717 evt.set() 718 719 # handle up to 'numrequests' requests 720 while numrequests > 0: 721 serv.handle_request() 722 numrequests -= 1 723 724 except socket.timeout: 725 pass 726 finally: 727 serv.socket.close() 728 PORT = None 729 evt.set() 730 731# This function prevents errors like: 732# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 733def is_unavailable_exception(e): 734 '''Returns True if the given ProtocolError is the product of a server-side 735 exception caused by the 'temporarily unavailable' response sometimes 736 given by operations on non-blocking sockets.''' 737 738 # sometimes we get a -1 error code and/or empty headers 739 try: 740 if e.errcode == -1 or e.headers is None: 741 return True 742 exc_mess = e.headers.get('X-exception') 743 except AttributeError: 744 # Ignore OSErrors here. 745 exc_mess = str(e) 746 747 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 748 return True 749 750def make_request_and_skipIf(condition, reason): 751 # If we skip the test, we have to make a request because 752 # the server created in setUp blocks expecting one to come in. 
753 if not condition: 754 return lambda func: func 755 def decorator(func): 756 def make_request_and_skip(self): 757 try: 758 xmlrpclib.ServerProxy(URL).my_function() 759 except (xmlrpclib.ProtocolError, OSError) as e: 760 if not is_unavailable_exception(e): 761 raise 762 raise unittest.SkipTest(reason) 763 return make_request_and_skip 764 return decorator 765 766class BaseServerTestCase(unittest.TestCase): 767 requestHandler = None 768 request_count = 1 769 threadFunc = staticmethod(http_server) 770 771 def setUp(self): 772 # enable traceback reporting 773 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 774 775 self.evt = threading.Event() 776 # start server thread to handle requests 777 serv_args = (self.evt, self.request_count, self.requestHandler) 778 thread = threading.Thread(target=self.threadFunc, args=serv_args) 779 thread.start() 780 self.addCleanup(thread.join) 781 782 # wait for the server to be ready 783 self.evt.wait() 784 self.evt.clear() 785 786 def tearDown(self): 787 # wait on the server thread to terminate 788 self.evt.wait() 789 790 # disable traceback reporting 791 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 792 793class SimpleServerTestCase(BaseServerTestCase): 794 def test_simple1(self): 795 try: 796 p = xmlrpclib.ServerProxy(URL) 797 self.assertEqual(p.pow(6,8), 6**8) 798 except (xmlrpclib.ProtocolError, OSError) as e: 799 # ignore failures due to non-blocking socket 'unavailable' errors 800 if not is_unavailable_exception(e): 801 # protocol error; provide additional information in test output 802 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 803 804 def test_nonascii(self): 805 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 806 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 807 try: 808 p = xmlrpclib.ServerProxy(URL) 809 self.assertEqual(p.add(start_string, end_string), 810 start_string + end_string) 811 except (xmlrpclib.ProtocolError, OSError) as e: 812 # ignore failures due to 
non-blocking socket 'unavailable' errors 813 if not is_unavailable_exception(e): 814 # protocol error; provide additional information in test output 815 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 816 817 def test_client_encoding(self): 818 start_string = '\u20ac' 819 end_string = '\xa4' 820 821 try: 822 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15') 823 self.assertEqual(p.add(start_string, end_string), 824 start_string + end_string) 825 except (xmlrpclib.ProtocolError, socket.error) as e: 826 # ignore failures due to non-blocking socket unavailable errors. 827 if not is_unavailable_exception(e): 828 # protocol error; provide additional information in test output 829 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 830 831 def test_nonascii_methodname(self): 832 try: 833 p = xmlrpclib.ServerProxy(URL, encoding='ascii') 834 self.assertEqual(p.têšt(42), 42) 835 except (xmlrpclib.ProtocolError, socket.error) as e: 836 # ignore failures due to non-blocking socket unavailable errors. 837 if not is_unavailable_exception(e): 838 # protocol error; provide additional information in test output 839 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 840 841 def test_404(self): 842 # send POST with http.client, it should return 404 header and 843 # 'Not Found' message. 
844 with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn: 845 conn.request('POST', '/this-is-not-valid') 846 response = conn.getresponse() 847 848 self.assertEqual(response.status, 404) 849 self.assertEqual(response.reason, 'Not Found') 850 851 def test_introspection1(self): 852 expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt', 853 'system.listMethods', 'system.methodHelp', 854 'system.methodSignature', 'system.multicall', 855 'Fixture']) 856 try: 857 p = xmlrpclib.ServerProxy(URL) 858 meth = p.system.listMethods() 859 self.assertEqual(set(meth), expected_methods) 860 except (xmlrpclib.ProtocolError, OSError) as e: 861 # ignore failures due to non-blocking socket 'unavailable' errors 862 if not is_unavailable_exception(e): 863 # protocol error; provide additional information in test output 864 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 865 866 867 def test_introspection2(self): 868 try: 869 # test _methodHelp() 870 p = xmlrpclib.ServerProxy(URL) 871 divhelp = p.system.methodHelp('div') 872 self.assertEqual(divhelp, 'This is the div function') 873 except (xmlrpclib.ProtocolError, OSError) as e: 874 # ignore failures due to non-blocking socket 'unavailable' errors 875 if not is_unavailable_exception(e): 876 # protocol error; provide additional information in test output 877 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 878 879 @make_request_and_skipIf(sys.flags.optimize >= 2, 880 "Docstrings are omitted with -O2 and above") 881 def test_introspection3(self): 882 try: 883 # test native doc 884 p = xmlrpclib.ServerProxy(URL) 885 myfunction = p.system.methodHelp('my_function') 886 self.assertEqual(myfunction, 'This is my function') 887 except (xmlrpclib.ProtocolError, OSError) as e: 888 # ignore failures due to non-blocking socket 'unavailable' errors 889 if not is_unavailable_exception(e): 890 # protocol error; provide additional information in test output 891 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 892 
# NOTE(review): the methods below continue a test-case class whose header
# precedes this chunk (presumably SimpleServerTestCase -- confirm).
    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_multicall(self):
        # Queue three calls on a MultiCall object and check that invoking it
        # yields the three results in the order the calls were queued.
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_non_existing_multicall(self):
        # A multicall entry naming an unknown method must come back as a
        # fault dict inside the results rather than as a raised exception.
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')

        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        data = server.Fixture.getData()
        self.assertEqual(data, '42')

    def test_unicode_host(self):
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(server.add("a", "\xe9"), "a\xe9")

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        # The request announces a 100-byte body; what follows is (presumably)
        # shorter than that, so the server sees an incomplete POST.
        with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
            # sendall() keeps writing until the whole request is out;
            # a bare send() is allowed to transmit only part of the buffer.
            conn.sendall('POST /RPC2 HTTP/1.0\r\n'
                         'Content-Length: 100\r\n\r\n'
                         'bye HTTP/1.1\r\n'
                         f'Host: {ADDR}:{PORT}\r\n'
                         'Accept-Encoding: identity\r\n'
                         'Content-Length: 0\r\n\r\n'.encode('ascii'))

    def test_context_manager(self):
        # Inside the `with` block the transport holds a live connection;
        # leaving the block must reset it to (None, None).
        with xmlrpclib.ServerProxy(URL) as server:
            server.add(2, 3)
            self.assertNotEqual(server('transport')._connection,
                                (None, None))
        self.assertEqual(server('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        # The connection must also be reset when the call inside the
        # `with` block ends in a Fault.
        try:
            with xmlrpclib.ServerProxy(URL) as server:
                server.add(2, "a")
        except xmlrpclib.Fault:
            pass
        self.assertEqual(server('transport')._connection,
                         (None, None))


class SimpleServerEncodingTestCase(BaseServerTestCase):
    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The `encoding` argument is deliberately ignored: this test case
        # always runs the server with iso-8859-15.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        start_string = '\u20ac'
        end_string = '\xa4'

        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket unavailable errors.
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))


class MultiPathServerTestCase(BaseServerTestCase):
    # Each path exposes a different method; calling the method that a path
    # does not offer (or using an unknown path) must produce a Fault.
    threadFunc = staticmethod(http_multi_server)
    request_count = 2
    def test_path1(self):
        p = xmlrpclib.ServerProxy(URL+"/foo")
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)

    def test_path2(self):
        p = xmlrpclib.ServerProxy(URL+"/foo/bar")
        self.assertEqual(p.add(6,8), 6+8)
        self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)

    def test_path3(self):
        p = xmlrpclib.ServerProxy(URL+"/is/broken")
        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)

#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
#does indeed serve subsequent requests on the same connection
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    #a request handler that supports keep-alive and logs requests into a
    #class variable
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        # one inner list per handler instance (i.e. per connection), each
        # holding the raw request lines that handler processed
        myRequests = []
        def handle(self):
            self.myRequests.append([])
            self.reqidx = len(self.myRequests)-1
            return self.parentClass.handle(self)
        def handle_one_request(self):
            result = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return result

    requestHandler = RequestHandler
    def setUp(self):
        #clear request log
        self.RequestHandler.myRequests = []
        return super().setUp()

#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    def test_two(self):
        p = xmlrpclib.ServerProxy(URL)
        #do three requests.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been handled by a single request handler
        self.assertEqual(len(self.RequestHandler.myRequests), 1)

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)


#test special attribute access on the serverproxy, through the __call__
#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    #ask for two keepalive requests to be handled.
    request_count=2

    def test_close(self):
        p = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")() #this should trigger a new keep-alive request
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been two request handlers, each having logged at least
        #two complete requests
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)


    def test_transport(self):
        p = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        p("transport").close() #same as above, really.
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()
        self.assertEqual(len(self.RequestHandler.myRequests), 2)

#A test case that verifies that gzip encoding works in both directions
#(for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    #a request handler that speaks HTTP/1.1 and records the Content-Length
    #of the last POST request in a class attribute
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store content length of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False
        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def setUp(self):
        super().setUp()

    def test_gzip_request(self):
        # The same call is sent once without and once with request
        # encoding; the encoded request must be the shorter one.
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        a = self.RequestHandler.content_length
        t.encode_threshold = 0 #turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        b = self.RequestHandler.content_length
        self.assertGreater(a, b)
        p("close")()

    def test_bad_gzip_request(self):
        # A request that claims gzip encoding but carries a plain body
        # must be rejected with a 400 ProtocolError.
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        cm = self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                    re.compile(r"\b400\b"))
        with cm:
            p.pow(6, 8)
        p("close")()

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        try:
            self.requestHandler.encode_threshold = None #no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            a = t.response_length
            self.requestHandler.encode_threshold = 0 #always encode
            self.assertEqual(p.pow(6,8), 6**8)
            p("close")()
            b = t.response_length
        finally:
            # encode_threshold is a class attribute of the handler: restore
            # it even when an assertion above fails, so the override cannot
            # leak into later tests.
            self.requestHandler.encode_threshold = old
        self.assertGreater(a, b)


@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):

    def test_gzip_decode_limit(self):
        # A payload of exactly the limit decodes; one byte more is refused
        # unless the limit is disabled with max_decode=-1.
        max_gzip_decode = 20 * 1024 * 1024
        data = b'\0' * max_gzip_decode
        encoded = xmlrpclib.gzip_encode(data)
        decoded = xmlrpclib.gzip_decode(encoded)
        self.assertEqual(len(decoded), max_gzip_decode)

        data = b'\0' * (max_gzip_decode + 1)
        encoded = xmlrpclib.gzip_encode(data)

        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(encoded)

        xmlrpclib.gzip_decode(encoded, max_decode=-1)


class HeadersServerTestCase(BaseServerTestCase):
    # The handler stashes the headers of the last POST on its class so the
    # tests can inspect what the client actually sent.
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        test_headers = None

        def do_POST(self):
            self.__class__.test_headers = self.headers
            return super().do_POST()
    requestHandler = RequestHandler
    standard_headers = [
        'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
        'Content-Length']

    def setUp(self):
        self.RequestHandler.test_headers = None
        return super().setUp()

    def assertContainsAdditionalHeaders(self, headers, additional):
        # The received header names must be exactly the standard ones plus
        # the additional ones, and each additional header must carry the
        # expected value.
        expected_keys = sorted(self.standard_headers + list(additional.keys()))
        self.assertListEqual(sorted(headers.keys()), expected_keys)

        for key, value in additional.items():
            self.assertEqual(headers.get(key), value)

    def test_header(self):
        p = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')])
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})

    def test_header_many(self):
        p = xmlrpclib.ServerProxy(
            URL, headers=[('X-Test', 'foo'), ('X-Test-Second', 'bar')])
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(
            headers, {'X-Test': 'foo', 'X-Test-Second': 'bar'})

    def test_header_empty(self):
        p = xmlrpclib.ServerProxy(URL, headers=[])
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(headers, {})

    def test_header_tuple(self):
        p = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),))
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})

    def test_header_items(self):
        p = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items())
        self.assertEqual(p.pow(6, 8), 6**8)

        headers = self.RequestHandler.test_headers
        self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'})


#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        super().setUp()
        # Actual value of the URL doesn't matter if it is a string in
        # the correct format.
        self.url = 'http://fake.localhost'

    def test_close(self):
        p = xmlrpclib.ServerProxy(self.url)
        self.assertIsNone(p('close')())

    def test_transport(self):
        t = xmlrpclib.Transport()
        p = xmlrpclib.ServerProxy(self.url, transport=t)
        self.assertEqual(p('transport'), t)


# This is a contrived way to make a failure occur on the server side
# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
    def get(self, key, failobj=None):
        key = key.lower()
        if key == 'content-length':
            return 'I am broken'
        return super().get(key, failobj)


class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        serv_args = (self.evt, 1)
        thread = threading.Thread(target=http_server, args=serv_args)
        thread.start()
        self.addCleanup(thread.join)

        # wait for the server to be ready
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # reset flag
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        default_class = http.client.HTTPMessage
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class

    def test_basic(self):
        # check that flag is false by default
        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # The two server-side error headers shouldn't be sent back in this case
                self.assertIsNone(e.headers.get("X-exception"))
                self.assertIsNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertIsNotNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')


@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """A variation on support.captured_stdout() which gives a text stream
    having a `buffer` attribute.
    """
    orig_stdout = sys.stdout
    sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    try:
        yield sys.stdout
    finally:
        sys.stdout = orig_stdout


class CGIHandlerTestCase(unittest.TestCase):
    def setUp(self):
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with support.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs handle_get
            # get sysout output
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            handle = data_out.read()
            # split once; the status code is the second word and the reason
            # phrase the two words after it
            words = handle.split()
            status = words[1]
            message = ' '.join(words[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')


    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with support.EnvironmentVarGuard() as env, \
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # will respond exception, if so, our goal is achieved ;)
        handle = data_out.read()

        # start with 44th char so as not to get http header, we just
        # need only xml
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])

        # Also test the content-length returned by handle_request
        # Using the same test method in order to avoid all the data-passing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040

        content = handle[handle.find("<?xml"):]

        self.assertEqual(
            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
            len(content))


class UseBuiltinTypesTestCase(unittest.TestCase):

    def test_use_builtin_types(self):
        # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
        # makes all dispatch of binary data as bytes instances, and all
        # dispatch of datetime argument as datetime.datetime instances.
        self.log = []
        expected_bytes = b"my dog has fleas"
        expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
        marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
        def foobar(*args):
            self.log.extend(args)
        handler = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        handler.register_function(foobar)
        handler._marshaled_dispatch(marshaled)
        self.assertEqual(len(self.log), 2)
        mybytes, mydate = self.log
        self.assertEqual(self.log, [expected_bytes, expected_date])
        self.assertIs(type(mydate), datetime.datetime)
        self.assertIs(type(mybytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True)
        self.assertTrue(handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0),
            use_builtin_types=True)
        server.server_close()
        self.assertTrue(server.use_builtin_types)


@support.reap_threads
def test_main():
    support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
            BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase,
            SimpleServerTestCase, SimpleServerEncodingTestCase,
            KeepaliveServerTestCase1, KeepaliveServerTestCase2,
            GzipServerTestCase, GzipUtilTestCase, HeadersServerTestCase,
            MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase,
            CGIHandlerTestCase, SimpleXMLRPCDispatcherTestCase)


if __name__ == "__main__":
    test_main()