1import base64 2import datetime 3import decimal 4import sys 5import time 6import unittest 7from unittest import mock 8import xmlrpc.client as xmlrpclib 9import xmlrpc.server 10import http.client 11import http, http.server 12import socket 13import threading 14import re 15import io 16import contextlib 17from test import support 18 19try: 20 import gzip 21except ImportError: 22 gzip = None 23 24alist = [{'astring': 'foo@bar.baz.spam', 25 'afloat': 7283.43, 26 'anint': 2**20, 27 'ashortlong': 2, 28 'anotherlist': ['.zyx.41'], 29 'abase64': xmlrpclib.Binary(b"my dog has fleas"), 30 'b64bytes': b"my dog has fleas", 31 'b64bytearray': bytearray(b"my dog has fleas"), 32 'boolean': False, 33 'unicode': '\u4000\u6000\u8000', 34 'ukey\u4000': 'regular value', 35 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'), 36 'datetime2': xmlrpclib.DateTime( 37 (2005, 2, 10, 11, 41, 23, 0, 1, -1)), 38 'datetime3': xmlrpclib.DateTime( 39 datetime.datetime(2005, 2, 10, 11, 41, 23)), 40 }] 41 42class XMLRPCTestCase(unittest.TestCase): 43 44 def test_dump_load(self): 45 dump = xmlrpclib.dumps((alist,)) 46 load = xmlrpclib.loads(dump) 47 self.assertEqual(alist, load[0][0]) 48 49 def test_dump_bare_datetime(self): 50 # This checks that an unwrapped datetime.date object can be handled 51 # by the marshalling code. This can't be done via test_dump_load() 52 # since with use_builtin_types set to 1 the unmarshaller would create 53 # datetime objects for the 'datetime[123]' keys as well 54 dt = datetime.datetime(2005, 2, 10, 11, 41, 23) 55 self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23')) 56 s = xmlrpclib.dumps((dt,)) 57 58 result, m = xmlrpclib.loads(s, use_builtin_types=True) 59 (newdt,) = result 60 self.assertEqual(newdt, dt) 61 self.assertIs(type(newdt), datetime.datetime) 62 self.assertIsNone(m) 63 64 result, m = xmlrpclib.loads(s, use_builtin_types=False) 65 (newdt,) = result 66 self.assertEqual(newdt, dt) 67 self.assertIs(type(newdt), xmlrpclib.DateTime) 68 self.assertIsNone(m) 69 70 result, m = xmlrpclib.loads(s, use_datetime=True) 71 (newdt,) = result 72 self.assertEqual(newdt, dt) 73 self.assertIs(type(newdt), datetime.datetime) 74 self.assertIsNone(m) 75 76 result, m = xmlrpclib.loads(s, use_datetime=False) 77 (newdt,) = result 78 self.assertEqual(newdt, dt) 79 self.assertIs(type(newdt), xmlrpclib.DateTime) 80 self.assertIsNone(m) 81 82 83 def test_datetime_before_1900(self): 84 # same as before but with a date before 1900 85 dt = datetime.datetime(1, 2, 10, 11, 41, 23) 86 self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23')) 87 s = xmlrpclib.dumps((dt,)) 88 89 result, m = xmlrpclib.loads(s, use_builtin_types=True) 90 (newdt,) = result 91 self.assertEqual(newdt, dt) 92 self.assertIs(type(newdt), datetime.datetime) 93 self.assertIsNone(m) 94 95 result, m = xmlrpclib.loads(s, use_builtin_types=False) 96 (newdt,) = result 97 self.assertEqual(newdt, dt) 98 self.assertIs(type(newdt), xmlrpclib.DateTime) 99 self.assertIsNone(m) 100 101 def test_bug_1164912 (self): 102 d = xmlrpclib.DateTime() 103 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), 104 methodresponse=True)) 105 self.assertIsInstance(new_d.value, str) 106 107 # Check that the output of dumps() is still an 8-bit string 108 s = xmlrpclib.dumps((new_d,), methodresponse=True) 109 self.assertIsInstance(s, str) 110 111 def test_newstyle_class(self): 112 class T(object): 113 pass 114 t = T() 115 t.x = 100 116 t.y = "Hello" 117 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) 118 self.assertEqual(t2, t.__dict__) 119 120 def test_dump_big_long(self): 121 self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,)) 122 123 def test_dump_bad_dict(self): 124 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) 125 126 def test_dump_recursive_seq(self): 127 l = [1,2,3] 128 t = [3,4,5,l] 129 l.append(t) 130 self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) 131 132 def test_dump_recursive_dict(self): 133 d = {'1':1, '2':1} 134 t = {'3':3, 'd':d} 135 d['t'] = t 136 self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) 137 138 def test_dump_big_int(self): 139 if sys.maxsize > 2**31-1: 140 self.assertRaises(OverflowError, xmlrpclib.dumps, 141 (int(2**34),)) 142 143 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) 144 self.assertRaises(OverflowError, xmlrpclib.dumps, 145 (xmlrpclib.MAXINT+1,)) 146 self.assertRaises(OverflowError, xmlrpclib.dumps, 147 (xmlrpclib.MININT-1,)) 148 149 def dummy_write(s): 150 pass 151 152 m = xmlrpclib.Marshaller() 153 m.dump_int(xmlrpclib.MAXINT, dummy_write) 154 m.dump_int(xmlrpclib.MININT, dummy_write) 155 self.assertRaises(OverflowError, m.dump_int, 156 xmlrpclib.MAXINT+1, dummy_write) 157 self.assertRaises(OverflowError, m.dump_int, 158 xmlrpclib.MININT-1, dummy_write) 159 160 def test_dump_double(self): 161 xmlrpclib.dumps((float(2 ** 34),)) 162 xmlrpclib.dumps((float(xmlrpclib.MAXINT), 163 float(xmlrpclib.MININT))) 164 xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42), 165 float(xmlrpclib.MININT - 42))) 166 167 def dummy_write(s): 168 pass 169 170 m = xmlrpclib.Marshaller() 171 m.dump_double(xmlrpclib.MAXINT, dummy_write) 172 m.dump_double(xmlrpclib.MININT, dummy_write) 173 m.dump_double(xmlrpclib.MAXINT + 42, dummy_write) 174 m.dump_double(xmlrpclib.MININT - 42, dummy_write) 175 176 def test_dump_none(self): 177 value = alist + [None] 178 arg1 = (alist + [None],) 179 strg = xmlrpclib.dumps(arg1, allow_none=True) 180 self.assertEqual(value, 181 xmlrpclib.loads(strg)[0][0]) 182 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,)) 183 184 def test_dump_encoding(self): 185 value = {'key\u20ac\xa4': 186 'value\u20ac\xa4'} 187 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15') 188 strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg 189 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 190 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 191 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 192 193 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 194 methodresponse=True) 195 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 196 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 197 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 198 199 methodname = 'method\u20ac\xa4' 200 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 201 methodname=methodname) 202 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 203 self.assertEqual(xmlrpclib.loads(strg)[1], methodname) 204 205 def test_dump_bytes(self): 206 sample = b"my dog has fleas" 207 self.assertEqual(sample, xmlrpclib.Binary(sample)) 208 for type_ in bytes, bytearray, xmlrpclib.Binary: 209 value = type_(sample) 210 s = xmlrpclib.dumps((value,)) 211 212 result, m = xmlrpclib.loads(s, use_builtin_types=True) 213 (newvalue,) = result 214 self.assertEqual(newvalue, sample) 215 self.assertIs(type(newvalue), bytes) 216 self.assertIsNone(m) 217 218 result, m = xmlrpclib.loads(s, use_builtin_types=False) 219 (newvalue,) = result 220 self.assertEqual(newvalue, sample) 221 self.assertIs(type(newvalue), xmlrpclib.Binary) 222 self.assertIsNone(m) 223 224 def test_loads_unsupported(self): 225 ResponseError = xmlrpclib.ResponseError 226 data = '<params><param><value><spam/></value></param></params>' 227 self.assertRaises(ResponseError, xmlrpclib.loads, data) 228 data = ('<params><param><value><array>' 229 '<value><spam/></value>' 230 '</array></value></param></params>') 231 self.assertRaises(ResponseError, xmlrpclib.loads, data) 232 data = ('<params><param><value><struct>' 233 '<member><name>a</name><value><spam/></value></member>' 234 '<member><name>b</name><value><spam/></value></member>' 235 '</struct></value></param></params>') 236 self.assertRaises(ResponseError, xmlrpclib.loads, data) 237 238 def check_loads(self, s, value, **kwargs): 239 dump = '<params><param><value>%s</value></param></params>' % s 240 result, m = xmlrpclib.loads(dump, **kwargs) 241 (newvalue,) = result 242 self.assertEqual(newvalue, value) 243 self.assertIs(type(newvalue), type(value)) 244 self.assertIsNone(m) 245 246 def test_load_standard_types(self): 247 check = self.check_loads 248 check('string', 'string') 249 check('<string>string</string>', 'string') 250 check('<string>�������������� string</string>', '�������������� string') 251 check('<int>2056183947</int>', 2056183947) 252 check('<int>-2056183947</int>', -2056183947) 253 check('<i4>2056183947</i4>', 2056183947) 254 check('<double>46093.78125</double>', 46093.78125) 255 check('<boolean>0</boolean>', False) 256 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 257 xmlrpclib.Binary(b'\x00byte string\xff')) 258 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 259 b'\x00byte string\xff', use_builtin_types=True) 260 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 261 xmlrpclib.DateTime('20050210T11:41:23')) 262 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 263 datetime.datetime(2005, 2, 10, 11, 41, 23), 264 use_builtin_types=True) 265 check('<array><data>' 266 '<value><int>1</int></value><value><int>2</int></value>' 267 '</data></array>', [1, 2]) 268 check('<struct>' 269 '<member><name>b</name><value><int>2</int></value></member>' 270 '<member><name>a</name><value><int>1</int></value></member>' 271 '</struct>', {'a': 1, 'b': 2}) 272 273 def test_load_extension_types(self): 274 check = self.check_loads 275 check('<nil/>', None) 276 check('<ex:nil/>', None) 277 check('<i1>205</i1>', 205) 278 check('<i2>20561</i2>', 20561) 279 check('<i8>9876543210</i8>', 9876543210) 280 check('<biginteger>98765432100123456789</biginteger>', 281 98765432100123456789) 282 check('<float>93.78125</float>', 93.78125) 283 check('<bigdecimal>9876543210.0123456789</bigdecimal>', 284 decimal.Decimal('9876543210.0123456789')) 285 286 def test_get_host_info(self): 287 # see bug #3613, this raised a TypeError 288 transp = xmlrpc.client.Transport() 289 self.assertEqual(transp.get_host_info("user@host.tld"), 290 ('host.tld', 291 [('Authorization', 'Basic dXNlcg==')], {})) 292 293 def test_ssl_presence(self): 294 try: 295 import ssl 296 except ImportError: 297 has_ssl = False 298 else: 299 has_ssl = True 300 try: 301 xmlrpc.client.ServerProxy('https://localhost:9999').bad_function() 302 except NotImplementedError: 303 self.assertFalse(has_ssl, "xmlrpc client's error with SSL support") 304 except OSError: 305 self.assertTrue(has_ssl) 306 307 def test_keepalive_disconnect(self): 308 class RequestHandler(http.server.BaseHTTPRequestHandler): 309 protocol_version = "HTTP/1.1" 310 handled = False 311 312 def do_POST(self): 313 length = int(self.headers.get("Content-Length")) 314 self.rfile.read(length) 315 if self.handled: 316 self.close_connection = True 317 return 318 response = xmlrpclib.dumps((5,), methodresponse=True) 319 response = response.encode() 320 self.send_response(http.HTTPStatus.OK) 321 self.send_header("Content-Length", len(response)) 322 self.end_headers() 323 self.wfile.write(response) 324 self.handled = True 325 self.close_connection = False 326 327 def log_message(self, format, *args): 328 # don't clobber sys.stderr 329 pass 330 331 def run_server(): 332 server.socket.settimeout(float(1)) # Don't hang if client fails 333 server.handle_request() # First request and attempt at second 334 server.handle_request() # Retried second request 335 336 server = http.server.HTTPServer((support.HOST, 0), RequestHandler) 337 self.addCleanup(server.server_close) 338 thread = threading.Thread(target=run_server) 339 thread.start() 340 self.addCleanup(thread.join) 341 url = "http://{}:{}/".format(*server.server_address) 342 with xmlrpclib.ServerProxy(url) as p: 343 self.assertEqual(p.method(), 5) 344 self.assertEqual(p.method(), 5) 345 346 347class SimpleXMLRPCDispatcherTestCase(unittest.TestCase): 348 class DispatchExc(Exception): 349 """Raised inside the dispatched functions when checking for 350 chained exceptions""" 351 352 def test_call_registered_func(self): 353 """Calls explicitly registered function""" 354 # Makes sure any exception raised inside the function has no other 355 # exception chained to it 356 357 exp_params = 1, 2, 3 358 359 def dispatched_func(*params): 360 raise self.DispatchExc(params) 361 362 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 363 dispatcher.register_function(dispatched_func) 364 with self.assertRaises(self.DispatchExc) as exc_ctx: 365 dispatcher._dispatch('dispatched_func', exp_params) 366 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 367 self.assertIsNone(exc_ctx.exception.__cause__) 368 self.assertIsNone(exc_ctx.exception.__context__) 369 370 def test_call_instance_func(self): 371 """Calls a registered instance attribute as a function""" 372 # Makes sure any exception raised inside the function has no other 373 # exception chained to it 374 375 exp_params = 1, 2, 3 376 377 class DispatchedClass: 378 def dispatched_func(self, *params): 379 raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params) 380 381 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 382 dispatcher.register_instance(DispatchedClass()) 383 with self.assertRaises(self.DispatchExc) as exc_ctx: 384 dispatcher._dispatch('dispatched_func', exp_params) 385 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 386 self.assertIsNone(exc_ctx.exception.__cause__) 387 self.assertIsNone(exc_ctx.exception.__context__) 388 389 def test_call_dispatch_func(self): 390 """Calls the registered instance's `_dispatch` function""" 391 # Makes sure any exception raised inside the function has no other 392 # exception chained to it 393 394 exp_method = 'method' 395 exp_params = 1, 2, 3 396 397 class TestInstance: 398 def _dispatch(self, method, params): 399 raise SimpleXMLRPCDispatcherTestCase.DispatchExc( 400 method, params) 401 402 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 403 dispatcher.register_instance(TestInstance()) 404 with self.assertRaises(self.DispatchExc) as exc_ctx: 405 dispatcher._dispatch(exp_method, exp_params) 406 self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params)) 407 self.assertIsNone(exc_ctx.exception.__cause__) 408 self.assertIsNone(exc_ctx.exception.__context__) 409 410 def test_registered_func_is_none(self): 411 """Calls explicitly registered function which is None""" 412 413 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 414 dispatcher.register_function(None, name='method') 415 with self.assertRaisesRegex(Exception, 'method'): 416 dispatcher._dispatch('method', ('param',)) 417 418 def test_instance_has_no_func(self): 419 """Attempts to call nonexistent function on a registered instance""" 420 421 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 422 dispatcher.register_instance(object()) 423 with self.assertRaisesRegex(Exception, 'method'): 424 dispatcher._dispatch('method', ('param',)) 425 426 def test_cannot_locate_func(self): 427 """Calls a function that the dispatcher cannot locate""" 428 429 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 430 with self.assertRaisesRegex(Exception, 'method'): 431 dispatcher._dispatch('method', ('param',)) 432 433 434class HelperTestCase(unittest.TestCase): 435 def test_escape(self): 436 self.assertEqual(xmlrpclib.escape("a&b"), "a&b") 437 self.assertEqual(xmlrpclib.escape("a<b"), "a<b") 438 self.assertEqual(xmlrpclib.escape("a>b"), "a>b") 439 440class FaultTestCase(unittest.TestCase): 441 def test_repr(self): 442 f = xmlrpclib.Fault(42, 'Test Fault') 443 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 444 self.assertEqual(repr(f), str(f)) 445 446 def test_dump_fault(self): 447 f = xmlrpclib.Fault(42, 'Test Fault') 448 s = xmlrpclib.dumps((f,)) 449 (newf,), m = xmlrpclib.loads(s) 450 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 451 self.assertEqual(m, None) 452 453 s = xmlrpclib.Marshaller().dumps(f) 454 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 455 456 def test_dotted_attribute(self): 457 # this will raise AttributeError because code don't want us to use 458 # private methods 459 self.assertRaises(AttributeError, 460 xmlrpc.server.resolve_dotted_attribute, str, '__add') 461 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 462 463class DateTimeTestCase(unittest.TestCase): 464 def test_default(self): 465 with mock.patch('time.localtime') as localtime_mock: 466 time_struct = time.struct_time( 467 [2013, 7, 15, 0, 24, 49, 0, 196, 0]) 468 localtime_mock.return_value = time_struct 469 localtime = time.localtime() 470 t = xmlrpclib.DateTime() 471 self.assertEqual(str(t), 472 time.strftime("%Y%m%dT%H:%M:%S", localtime)) 473 474 def test_time(self): 475 d = 1181399930.036952 476 t = xmlrpclib.DateTime(d) 477 self.assertEqual(str(t), 478 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 479 480 def test_time_tuple(self): 481 d = (2007,6,9,10,38,50,5,160,0) 482 t = xmlrpclib.DateTime(d) 483 self.assertEqual(str(t), '20070609T10:38:50') 484 485 def test_time_struct(self): 486 d = time.localtime(1181399930.036952) 487 t = xmlrpclib.DateTime(d) 488 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 489 490 def test_datetime_datetime(self): 491 d = datetime.datetime(2007,1,2,3,4,5) 492 t = xmlrpclib.DateTime(d) 493 self.assertEqual(str(t), '20070102T03:04:05') 494 495 def test_repr(self): 496 d = datetime.datetime(2007,1,2,3,4,5) 497 t = xmlrpclib.DateTime(d) 498 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t) 499 self.assertEqual(repr(t), val) 500 501 def test_decode(self): 502 d = ' 20070908T07:11:13 ' 503 t1 = xmlrpclib.DateTime() 504 t1.decode(d) 505 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 506 self.assertEqual(t1, tref) 507 508 t2 = xmlrpclib._datetime(d) 509 self.assertEqual(t2, tref) 510 511 def test_comparison(self): 512 now = datetime.datetime.now() 513 dtime = xmlrpclib.DateTime(now.timetuple()) 514 515 # datetime vs. DateTime 516 self.assertTrue(dtime == now) 517 self.assertTrue(now == dtime) 518 then = now + datetime.timedelta(seconds=4) 519 self.assertTrue(then >= dtime) 520 self.assertTrue(dtime < then) 521 522 # str vs. DateTime 523 dstr = now.strftime("%Y%m%dT%H:%M:%S") 524 self.assertTrue(dtime == dstr) 525 self.assertTrue(dstr == dtime) 526 dtime_then = xmlrpclib.DateTime(then.timetuple()) 527 self.assertTrue(dtime_then >= dstr) 528 self.assertTrue(dstr < dtime_then) 529 530 # some other types 531 dbytes = dstr.encode('ascii') 532 dtuple = now.timetuple() 533 with self.assertRaises(TypeError): 534 dtime == 1970 535 with self.assertRaises(TypeError): 536 dtime != dbytes 537 with self.assertRaises(TypeError): 538 dtime == bytearray(dbytes) 539 with self.assertRaises(TypeError): 540 dtime != dtuple 541 with self.assertRaises(TypeError): 542 dtime < float(1970) 543 with self.assertRaises(TypeError): 544 dtime > dbytes 545 with self.assertRaises(TypeError): 546 dtime <= bytearray(dbytes) 547 with self.assertRaises(TypeError): 548 dtime >= dtuple 549 550class BinaryTestCase(unittest.TestCase): 551 552 # XXX What should str(Binary(b"\xff")) return? I'm chosing "\xff" 553 # for now (i.e. interpreting the binary data as Latin-1-encoded 554 # text). But this feels very unsatisfactory. Perhaps we should 555 # only define repr(), and return r"Binary(b'\xff')" instead? 556 557 def test_default(self): 558 t = xmlrpclib.Binary() 559 self.assertEqual(str(t), '') 560 561 def test_string(self): 562 d = b'\x01\x02\x03abc123\xff\xfe' 563 t = xmlrpclib.Binary(d) 564 self.assertEqual(str(t), str(d, "latin-1")) 565 566 def test_decode(self): 567 d = b'\x01\x02\x03abc123\xff\xfe' 568 de = base64.encodebytes(d) 569 t1 = xmlrpclib.Binary() 570 t1.decode(de) 571 self.assertEqual(str(t1), str(d, "latin-1")) 572 573 t2 = xmlrpclib._binary(de) 574 self.assertEqual(str(t2), str(d, "latin-1")) 575 576 577ADDR = PORT = URL = None 578 579# The evt is set twice. First when the server is ready to serve. 580# Second when the server has been shutdown. The user must clear 581# the event after it has been set the first time to catch the second set. 582def http_server(evt, numrequests, requestHandler=None, encoding=None): 583 class TestInstanceClass: 584 def div(self, x, y): 585 return x // y 586 587 def _methodHelp(self, name): 588 if name == 'div': 589 return 'This is the div function' 590 591 class Fixture: 592 @staticmethod 593 def getData(): 594 return '42' 595 596 class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer): 597 def get_request(self): 598 # Ensure the socket is always non-blocking. On Linux, socket 599 # attributes are not inherited like they are on *BSD and Windows. 600 s, port = self.socket.accept() 601 s.setblocking(True) 602 return s, port 603 604 if not requestHandler: 605 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 606 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 607 encoding=encoding, 608 logRequests=False, bind_and_activate=False) 609 try: 610 serv.server_bind() 611 global ADDR, PORT, URL 612 ADDR, PORT = serv.socket.getsockname() 613 #connect to IP address directly. This avoids socket.create_connection() 614 #trying to connect to "localhost" using all address families, which 615 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 616 #on AF_INET only. 617 URL = "http://%s:%d"%(ADDR, PORT) 618 serv.server_activate() 619 serv.register_introspection_functions() 620 serv.register_multicall_functions() 621 serv.register_function(pow) 622 serv.register_function(lambda x: x, 'têšt') 623 @serv.register_function 624 def my_function(): 625 '''This is my function''' 626 return True 627 @serv.register_function(name='add') 628 def _(x, y): 629 return x + y 630 testInstance = TestInstanceClass() 631 serv.register_instance(testInstance, allow_dotted_names=True) 632 evt.set() 633 634 # handle up to 'numrequests' requests 635 while numrequests > 0: 636 serv.handle_request() 637 numrequests -= 1 638 639 except socket.timeout: 640 pass 641 finally: 642 serv.socket.close() 643 PORT = None 644 evt.set() 645 646def http_multi_server(evt, numrequests, requestHandler=None): 647 class TestInstanceClass: 648 def div(self, x, y): 649 return x // y 650 651 def _methodHelp(self, name): 652 if name == 'div': 653 return 'This is the div function' 654 655 def my_function(): 656 '''This is my function''' 657 return True 658 659 class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): 660 def get_request(self): 661 # Ensure the socket is always non-blocking. On Linux, socket 662 # attributes are not inherited like they are on *BSD and Windows. 663 s, port = self.socket.accept() 664 s.setblocking(True) 665 return s, port 666 667 if not requestHandler: 668 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 669 class MyRequestHandler(requestHandler): 670 rpc_paths = [] 671 672 class BrokenDispatcher: 673 def _marshaled_dispatch(self, data, dispatch_method=None, path=None): 674 raise RuntimeError("broken dispatcher") 675 676 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 677 logRequests=False, bind_and_activate=False) 678 serv.socket.settimeout(3) 679 serv.server_bind() 680 try: 681 global ADDR, PORT, URL 682 ADDR, PORT = serv.socket.getsockname() 683 #connect to IP address directly. This avoids socket.create_connection() 684 #trying to connect to "localhost" using all address families, which 685 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 686 #on AF_INET only. 687 URL = "http://%s:%d"%(ADDR, PORT) 688 serv.server_activate() 689 paths = ["/foo", "/foo/bar"] 690 for path in paths: 691 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) 692 d.register_introspection_functions() 693 d.register_multicall_functions() 694 serv.get_dispatcher(paths[0]).register_function(pow) 695 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 696 serv.add_dispatcher("/is/broken", BrokenDispatcher()) 697 evt.set() 698 699 # handle up to 'numrequests' requests 700 while numrequests > 0: 701 serv.handle_request() 702 numrequests -= 1 703 704 except socket.timeout: 705 pass 706 finally: 707 serv.socket.close() 708 PORT = None 709 evt.set() 710 711# This function prevents errors like: 712# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 713def is_unavailable_exception(e): 714 '''Returns True if the given ProtocolError is the product of a server-side 715 exception caused by the 'temporarily unavailable' response sometimes 716 given by operations on non-blocking sockets.''' 717 718 # sometimes we get a -1 error code and/or empty headers 719 try: 720 if e.errcode == -1 or e.headers is None: 721 return True 722 exc_mess = e.headers.get('X-exception') 723 except AttributeError: 724 # Ignore OSErrors here. 725 exc_mess = str(e) 726 727 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 728 return True 729 730def make_request_and_skipIf(condition, reason): 731 # If we skip the test, we have to make a request because 732 # the server created in setUp blocks expecting one to come in. 733 if not condition: 734 return lambda func: func 735 def decorator(func): 736 def make_request_and_skip(self): 737 try: 738 xmlrpclib.ServerProxy(URL).my_function() 739 except (xmlrpclib.ProtocolError, OSError) as e: 740 if not is_unavailable_exception(e): 741 raise 742 raise unittest.SkipTest(reason) 743 return make_request_and_skip 744 return decorator 745 746class BaseServerTestCase(unittest.TestCase): 747 requestHandler = None 748 request_count = 1 749 threadFunc = staticmethod(http_server) 750 751 def setUp(self): 752 # enable traceback reporting 753 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 754 755 self.evt = threading.Event() 756 # start server thread to handle requests 757 serv_args = (self.evt, self.request_count, self.requestHandler) 758 thread = threading.Thread(target=self.threadFunc, args=serv_args) 759 thread.start() 760 self.addCleanup(thread.join) 761 762 # wait for the server to be ready 763 self.evt.wait() 764 self.evt.clear() 765 766 def tearDown(self): 767 # wait on the server thread to terminate 768 self.evt.wait() 769 770 # disable traceback reporting 771 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 772 773class SimpleServerTestCase(BaseServerTestCase): 774 def test_simple1(self): 775 try: 776 p = xmlrpclib.ServerProxy(URL) 777 self.assertEqual(p.pow(6,8), 6**8) 778 except (xmlrpclib.ProtocolError, OSError) as e: 779 # ignore failures due to non-blocking socket 'unavailable' errors 780 if not is_unavailable_exception(e): 781 # protocol error; provide additional information in test output 782 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 783 784 def test_nonascii(self): 785 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 786 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 787 try: 788 p = xmlrpclib.ServerProxy(URL) 789 self.assertEqual(p.add(start_string, end_string), 790 start_string + end_string) 791 except (xmlrpclib.ProtocolError, OSError) as e: 792 # ignore failures due to non-blocking socket 'unavailable' errors 793 if not is_unavailable_exception(e): 794 # protocol error; provide additional information in test output 795 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 796 797 def test_client_encoding(self): 798 start_string = '\u20ac' 799 end_string = '\xa4' 800 801 try: 802 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15') 803 self.assertEqual(p.add(start_string, end_string), 804 start_string + end_string) 805 except (xmlrpclib.ProtocolError, socket.error) as e: 806 # ignore failures due to non-blocking socket unavailable errors. 807 if not is_unavailable_exception(e): 808 # protocol error; provide additional information in test output 809 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 810 811 def test_nonascii_methodname(self): 812 try: 813 p = xmlrpclib.ServerProxy(URL, encoding='ascii') 814 self.assertEqual(p.têšt(42), 42) 815 except (xmlrpclib.ProtocolError, socket.error) as e: 816 # ignore failures due to non-blocking socket unavailable errors. 817 if not is_unavailable_exception(e): 818 # protocol error; provide additional information in test output 819 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 820 821 def test_404(self): 822 # send POST with http.client, it should return 404 header and 823 # 'Not Found' message. 824 with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn: 825 conn.request('POST', '/this-is-not-valid') 826 response = conn.getresponse() 827 828 self.assertEqual(response.status, 404) 829 self.assertEqual(response.reason, 'Not Found') 830 831 def test_introspection1(self): 832 expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt', 833 'system.listMethods', 'system.methodHelp', 834 'system.methodSignature', 'system.multicall', 835 'Fixture']) 836 try: 837 p = xmlrpclib.ServerProxy(URL) 838 meth = p.system.listMethods() 839 self.assertEqual(set(meth), expected_methods) 840 except (xmlrpclib.ProtocolError, OSError) as e: 841 # ignore failures due to non-blocking socket 'unavailable' errors 842 if not is_unavailable_exception(e): 843 # protocol error; provide additional information in test output 844 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 845 846 847 def test_introspection2(self): 848 try: 849 # test _methodHelp() 850 p = xmlrpclib.ServerProxy(URL) 851 divhelp = p.system.methodHelp('div') 852 self.assertEqual(divhelp, 'This is the div function') 853 except (xmlrpclib.ProtocolError, OSError) as e: 854 # ignore failures due to non-blocking socket 'unavailable' errors 855 if not is_unavailable_exception(e): 856 # protocol error; provide additional information in test output 857 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 858 859 @make_request_and_skipIf(sys.flags.optimize >= 2, 860 "Docstrings are omitted with -O2 and above") 861 def test_introspection3(self): 862 try: 863 # test native doc 864 p = xmlrpclib.ServerProxy(URL) 865 myfunction = p.system.methodHelp('my_function') 866 self.assertEqual(myfunction, 'This is my function') 867 except (xmlrpclib.ProtocolError, OSError) as e: 868 # ignore failures due to non-blocking socket 'unavailable' errors 869 if not is_unavailable_exception(e): 870 # protocol error; provide additional information in test output 871 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 872 873 def test_introspection4(self): 874 # the SimpleXMLRPCServer doesn't support signatures, but 875 # at least check that we can try making the call 876 try: 877 p = xmlrpclib.ServerProxy(URL) 878 divsig = p.system.methodSignature('div') 879 self.assertEqual(divsig, 'signatures not supported') 880 except (xmlrpclib.ProtocolError, OSError) as e: 881 # ignore failures due to non-blocking socket 'unavailable' errors 882 if not is_unavailable_exception(e): 883 # protocol error; provide additional information in test output 884 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 885 886 def test_multicall(self): 887 try: 888 p = xmlrpclib.ServerProxy(URL) 889 multicall = xmlrpclib.MultiCall(p) 890 multicall.add(2,3) 891 multicall.pow(6,8) 892 multicall.div(127,42) 893 add_result, pow_result, div_result = multicall() 894 self.assertEqual(add_result, 2+3) 895 self.assertEqual(pow_result, 6**8) 896 self.assertEqual(div_result, 127//42) 897 except (xmlrpclib.ProtocolError, OSError) as e: 898 # ignore failures due to non-blocking socket 'unavailable' errors 899 if not is_unavailable_exception(e): 900 # protocol error; provide additional information in test output 901 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 902 903 def test_non_existing_multicall(self): 904 try: 905 p = xmlrpclib.ServerProxy(URL) 906 multicall = xmlrpclib.MultiCall(p) 907 multicall.this_is_not_exists() 908 result = multicall() 909 910 # result.results contains; 911 # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:' 912 # 'method "this_is_not_exists" is not supported'>}] 913 914 self.assertEqual(result.results[0]['faultCode'], 1) 915 self.assertEqual(result.results[0]['faultString'], 916 '<class \'Exception\'>:method "this_is_not_exists" ' 917 'is not supported') 918 except (xmlrpclib.ProtocolError, OSError) as e: 919 # ignore failures due to non-blocking socket 'unavailable' errors 920 if not is_unavailable_exception(e): 921 # protocol error; provide additional information in test output 922 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 923 924 def test_dotted_attribute(self): 925 # Raises an AttributeError because private methods are not allowed. 926 self.assertRaises(AttributeError, 927 xmlrpc.server.resolve_dotted_attribute, str, '__add') 928 929 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 930 # Get the test to run faster by sending a request with test_simple1. 931 # This avoids waiting for the socket timeout. 932 self.test_simple1() 933 934 def test_allow_dotted_names_true(self): 935 # XXX also need allow_dotted_names_false test. 936 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 937 data = server.Fixture.getData() 938 self.assertEqual(data, '42') 939 940 def test_unicode_host(self): 941 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 942 self.assertEqual(server.add("a", "\xe9"), "a\xe9") 943 944 def test_partial_post(self): 945 # Check that a partial POST doesn't make the server loop: issue #14001. 946 with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn: 947 conn.send('POST /RPC2 HTTP/1.0\r\n' 948 'Content-Length: 100\r\n\r\n' 949 'bye HTTP/1.1\r\n' 950 f'Host: {ADDR}:{PORT}\r\n' 951 'Accept-Encoding: identity\r\n' 952 'Content-Length: 0\r\n\r\n'.encode('ascii')) 953 954 def test_context_manager(self): 955 with xmlrpclib.ServerProxy(URL) as server: 956 server.add(2, 3) 957 self.assertNotEqual(server('transport')._connection, 958 (None, None)) 959 self.assertEqual(server('transport')._connection, 960 (None, None)) 961 962 def test_context_manager_method_error(self): 963 try: 964 with xmlrpclib.ServerProxy(URL) as server: 965 server.add(2, "a") 966 except xmlrpclib.Fault: 967 pass 968 self.assertEqual(server('transport')._connection, 969 (None, None)) 970 971 972class SimpleServerEncodingTestCase(BaseServerTestCase): 973 @staticmethod 974 def threadFunc(evt, numrequests, requestHandler=None, encoding=None): 975 http_server(evt, numrequests, requestHandler, 'iso-8859-15') 976 977 def test_server_encoding(self): 978 start_string = '\u20ac' 979 end_string = '\xa4' 980 981 try: 982 p = xmlrpclib.ServerProxy(URL) 983 self.assertEqual(p.add(start_string, end_string), 984 start_string + end_string) 985 except (xmlrpclib.ProtocolError, socket.error) as e: 986 # ignore failures due to non-blocking socket unavailable errors. 987 if not is_unavailable_exception(e): 988 # protocol error; provide additional information in test output 989 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 990 991 992class MultiPathServerTestCase(BaseServerTestCase): 993 threadFunc = staticmethod(http_multi_server) 994 request_count = 2 995 def test_path1(self): 996 p = xmlrpclib.ServerProxy(URL+"/foo") 997 self.assertEqual(p.pow(6,8), 6**8) 998 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 999 1000 def test_path2(self): 1001 p = xmlrpclib.ServerProxy(URL+"/foo/bar") 1002 self.assertEqual(p.add(6,8), 6+8) 1003 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) 1004 1005 def test_path3(self): 1006 p = xmlrpclib.ServerProxy(URL+"/is/broken") 1007 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1008 1009#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1010#does indeed serve subsequent requests on the same connection 1011class BaseKeepaliveServerTestCase(BaseServerTestCase): 1012 #a request handler that supports keep-alive and logs requests into a 1013 #class variable 1014 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1015 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1016 protocol_version = 'HTTP/1.1' 1017 myRequests = [] 1018 def handle(self): 1019 self.myRequests.append([]) 1020 self.reqidx = len(self.myRequests)-1 1021 return self.parentClass.handle(self) 1022 def handle_one_request(self): 1023 result = self.parentClass.handle_one_request(self) 1024 self.myRequests[self.reqidx].append(self.raw_requestline) 1025 return result 1026 1027 requestHandler = RequestHandler 1028 def setUp(self): 1029 #clear request log 1030 self.RequestHandler.myRequests = [] 1031 return BaseServerTestCase.setUp(self) 1032 1033#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1034#does indeed serve subsequent requests on the same connection 1035class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): 1036 def test_two(self): 1037 p = xmlrpclib.ServerProxy(URL) 1038 #do three requests. 1039 self.assertEqual(p.pow(6,8), 6**8) 1040 self.assertEqual(p.pow(6,8), 6**8) 1041 self.assertEqual(p.pow(6,8), 6**8) 1042 p("close")() 1043 1044 #they should have all been handled by a single request handler 1045 self.assertEqual(len(self.RequestHandler.myRequests), 1) 1046 1047 #check that we did at least two (the third may be pending append 1048 #due to thread scheduling) 1049 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1050 1051 1052#test special attribute access on the serverproxy, through the __call__ 1053#function. 1054class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): 1055 #ask for two keepalive requests to be handled. 1056 request_count=2 1057 1058 def test_close(self): 1059 p = xmlrpclib.ServerProxy(URL) 1060 #do some requests with close. 1061 self.assertEqual(p.pow(6,8), 6**8) 1062 self.assertEqual(p.pow(6,8), 6**8) 1063 self.assertEqual(p.pow(6,8), 6**8) 1064 p("close")() #this should trigger a new keep-alive request 1065 self.assertEqual(p.pow(6,8), 6**8) 1066 self.assertEqual(p.pow(6,8), 6**8) 1067 self.assertEqual(p.pow(6,8), 6**8) 1068 p("close")() 1069 1070 #they should have all been two request handlers, each having logged at least 1071 #two complete requests 1072 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1073 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1074 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) 1075 1076 1077 def test_transport(self): 1078 p = xmlrpclib.ServerProxy(URL) 1079 #do some requests with close. 1080 self.assertEqual(p.pow(6,8), 6**8) 1081 p("transport").close() #same as above, really. 1082 self.assertEqual(p.pow(6,8), 6**8) 1083 p("close")() 1084 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1085 1086#A test case that verifies that gzip encoding works in both directions 1087#(for a request and the response) 1088@unittest.skipIf(gzip is None, 'requires gzip') 1089class GzipServerTestCase(BaseServerTestCase): 1090 #a request handler that supports keep-alive and logs requests into a 1091 #class variable 1092 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1093 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1094 protocol_version = 'HTTP/1.1' 1095 1096 def do_POST(self): 1097 #store content of last request in class 1098 self.__class__.content_length = int(self.headers["content-length"]) 1099 return self.parentClass.do_POST(self) 1100 requestHandler = RequestHandler 1101 1102 class Transport(xmlrpclib.Transport): 1103 #custom transport, stores the response length for our perusal 1104 fake_gzip = False 1105 def parse_response(self, response): 1106 self.response_length=int(response.getheader("content-length", 0)) 1107 return xmlrpclib.Transport.parse_response(self, response) 1108 1109 def send_content(self, connection, body): 1110 if self.fake_gzip: 1111 #add a lone gzip header to induce decode error remotely 1112 connection.putheader("Content-Encoding", "gzip") 1113 return xmlrpclib.Transport.send_content(self, connection, body) 1114 1115 def setUp(self): 1116 BaseServerTestCase.setUp(self) 1117 1118 def test_gzip_request(self): 1119 t = self.Transport() 1120 t.encode_threshold = None 1121 p = xmlrpclib.ServerProxy(URL, transport=t) 1122 self.assertEqual(p.pow(6,8), 6**8) 1123 a = self.RequestHandler.content_length 1124 t.encode_threshold = 0 #turn on request encoding 1125 self.assertEqual(p.pow(6,8), 6**8) 1126 b = self.RequestHandler.content_length 1127 self.assertTrue(a>b) 1128 p("close")() 1129 1130 def test_bad_gzip_request(self): 1131 t = self.Transport() 1132 t.encode_threshold = None 1133 t.fake_gzip = True 1134 p = xmlrpclib.ServerProxy(URL, transport=t) 1135 cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, 1136 re.compile(r"\b400\b")) 1137 with cm: 1138 p.pow(6, 8) 1139 p("close")() 1140 1141 def test_gzip_response(self): 1142 t = self.Transport() 1143 p = xmlrpclib.ServerProxy(URL, transport=t) 1144 old = self.requestHandler.encode_threshold 1145 self.requestHandler.encode_threshold = None #no encoding 1146 self.assertEqual(p.pow(6,8), 6**8) 1147 a = t.response_length 1148 self.requestHandler.encode_threshold = 0 #always encode 1149 self.assertEqual(p.pow(6,8), 6**8) 1150 p("close")() 1151 b = t.response_length 1152 self.requestHandler.encode_threshold = old 1153 self.assertTrue(a>b) 1154 1155 1156@unittest.skipIf(gzip is None, 'requires gzip') 1157class GzipUtilTestCase(unittest.TestCase): 1158 1159 def test_gzip_decode_limit(self): 1160 max_gzip_decode = 20 * 1024 * 1024 1161 data = b'\0' * max_gzip_decode 1162 encoded = xmlrpclib.gzip_encode(data) 1163 decoded = xmlrpclib.gzip_decode(encoded) 1164 self.assertEqual(len(decoded), max_gzip_decode) 1165 1166 data = b'\0' * (max_gzip_decode + 1) 1167 encoded = xmlrpclib.gzip_encode(data) 1168 1169 with self.assertRaisesRegex(ValueError, 1170 "max gzipped payload length exceeded"): 1171 xmlrpclib.gzip_decode(encoded) 1172 1173 xmlrpclib.gzip_decode(encoded, max_decode=-1) 1174 1175 1176class HeadersServerTestCase(BaseServerTestCase): 1177 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1178 test_headers = None 1179 1180 def do_POST(self): 1181 self.__class__.test_headers = self.headers 1182 return super().do_POST() 1183 requestHandler = RequestHandler 1184 standard_headers = [ 1185 'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent', 1186 'Content-Length'] 1187 1188 def setUp(self): 1189 self.RequestHandler.test_headers = None 1190 return super().setUp() 1191 1192 def assertContainsAdditionalHeaders(self, headers, additional): 1193 expected_keys = sorted(self.standard_headers + list(additional.keys())) 1194 self.assertListEqual(sorted(headers.keys()), expected_keys) 1195 1196 for key, value in additional.items(): 1197 self.assertEqual(headers.get(key), value) 1198 1199 def test_header(self): 1200 p = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')]) 1201 self.assertEqual(p.pow(6, 8), 6**8) 1202 1203 headers = self.RequestHandler.test_headers 1204 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1205 1206 def test_header_many(self): 1207 p = xmlrpclib.ServerProxy( 1208 URL, headers=[('X-Test', 'foo'), ('X-Test-Second', 'bar')]) 1209 self.assertEqual(p.pow(6, 8), 6**8) 1210 1211 headers = self.RequestHandler.test_headers 1212 self.assertContainsAdditionalHeaders( 1213 headers, {'X-Test': 'foo', 'X-Test-Second': 'bar'}) 1214 1215 def test_header_empty(self): 1216 p = xmlrpclib.ServerProxy(URL, headers=[]) 1217 self.assertEqual(p.pow(6, 8), 6**8) 1218 1219 headers = self.RequestHandler.test_headers 1220 self.assertContainsAdditionalHeaders(headers, {}) 1221 1222 def test_header_tuple(self): 1223 p = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),)) 1224 self.assertEqual(p.pow(6, 8), 6**8) 1225 1226 headers = self.RequestHandler.test_headers 1227 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1228 1229 def test_header_items(self): 1230 p = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items()) 1231 self.assertEqual(p.pow(6, 8), 6**8) 1232 1233 headers = self.RequestHandler.test_headers 1234 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1235 1236 1237#Test special attributes of the ServerProxy object 1238class ServerProxyTestCase(unittest.TestCase): 1239 def setUp(self): 1240 unittest.TestCase.setUp(self) 1241 # Actual value of the URL doesn't matter if it is a string in 1242 # the correct format. 1243 self.url = 'http://fake.localhost' 1244 1245 def test_close(self): 1246 p = xmlrpclib.ServerProxy(self.url) 1247 self.assertEqual(p('close')(), None) 1248 1249 def test_transport(self): 1250 t = xmlrpclib.Transport() 1251 p = xmlrpclib.ServerProxy(self.url, transport=t) 1252 self.assertEqual(p('transport'), t) 1253 1254 1255# This is a contrived way to make a failure occur on the server side 1256# in order to test the _send_traceback_header flag on the server 1257class FailingMessageClass(http.client.HTTPMessage): 1258 def get(self, key, failobj=None): 1259 key = key.lower() 1260 if key == 'content-length': 1261 return 'I am broken' 1262 return super().get(key, failobj) 1263 1264 1265class FailingServerTestCase(unittest.TestCase): 1266 def setUp(self): 1267 self.evt = threading.Event() 1268 # start server thread to handle requests 1269 serv_args = (self.evt, 1) 1270 thread = threading.Thread(target=http_server, args=serv_args) 1271 thread.start() 1272 self.addCleanup(thread.join) 1273 1274 # wait for the server to be ready 1275 self.evt.wait() 1276 self.evt.clear() 1277 1278 def tearDown(self): 1279 # wait on the server thread to terminate 1280 self.evt.wait() 1281 # reset flag 1282 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 1283 # reset message class 1284 default_class = http.client.HTTPMessage 1285 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class 1286 1287 def test_basic(self): 1288 # check that flag is false by default 1289 flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header 1290 self.assertEqual(flagval, False) 1291 1292 # enable traceback reporting 1293 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1294 1295 # test a call that shouldn't fail just as a smoke test 1296 try: 1297 p = xmlrpclib.ServerProxy(URL) 1298 self.assertEqual(p.pow(6,8), 6**8) 1299 except (xmlrpclib.ProtocolError, OSError) as e: 1300 # ignore failures due to non-blocking socket 'unavailable' errors 1301 if not is_unavailable_exception(e): 1302 # protocol error; provide additional information in test output 1303 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1304 1305 def test_fail_no_info(self): 1306 # use the broken message class 1307 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1308 1309 try: 1310 p = xmlrpclib.ServerProxy(URL) 1311 p.pow(6,8) 1312 except (xmlrpclib.ProtocolError, OSError) as e: 1313 # ignore failures due to non-blocking socket 'unavailable' errors 1314 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1315 # The two server-side error headers shouldn't be sent back in this case 1316 self.assertTrue(e.headers.get("X-exception") is None) 1317 self.assertTrue(e.headers.get("X-traceback") is None) 1318 else: 1319 self.fail('ProtocolError not raised') 1320 1321 def test_fail_with_info(self): 1322 # use the broken message class 1323 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1324 1325 # Check that errors in the server send back exception/traceback 1326 # info when flag is set 1327 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1328 1329 try: 1330 p = xmlrpclib.ServerProxy(URL) 1331 p.pow(6,8) 1332 except (xmlrpclib.ProtocolError, OSError) as e: 1333 # ignore failures due to non-blocking socket 'unavailable' errors 1334 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1335 # We should get error info in the response 1336 expected_err = "invalid literal for int() with base 10: 'I am broken'" 1337 self.assertEqual(e.headers.get("X-exception"), expected_err) 1338 self.assertTrue(e.headers.get("X-traceback") is not None) 1339 else: 1340 self.fail('ProtocolError not raised') 1341 1342 1343@contextlib.contextmanager 1344def captured_stdout(encoding='utf-8'): 1345 """A variation on support.captured_stdout() which gives a text stream 1346 having a `buffer` attribute. 1347 """ 1348 orig_stdout = sys.stdout 1349 sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding) 1350 try: 1351 yield sys.stdout 1352 finally: 1353 sys.stdout = orig_stdout 1354 1355 1356class CGIHandlerTestCase(unittest.TestCase): 1357 def setUp(self): 1358 self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler() 1359 1360 def tearDown(self): 1361 self.cgi = None 1362 1363 def test_cgi_get(self): 1364 with support.EnvironmentVarGuard() as env: 1365 env['REQUEST_METHOD'] = 'GET' 1366 # if the method is GET and no request_text is given, it runs handle_get 1367 # get sysout output 1368 with captured_stdout(encoding=self.cgi.encoding) as data_out: 1369 self.cgi.handle_request() 1370 1371 # parse Status header 1372 data_out.seek(0) 1373 handle = data_out.read() 1374 status = handle.split()[1] 1375 message = ' '.join(handle.split()[2:4]) 1376 1377 self.assertEqual(status, '400') 1378 self.assertEqual(message, 'Bad Request') 1379 1380 1381 def test_cgi_xmlrpc_response(self): 1382 data = """<?xml version='1.0'?> 1383 <methodCall> 1384 <methodName>test_method</methodName> 1385 <params> 1386 <param> 1387 <value><string>foo</string></value> 1388 </param> 1389 <param> 1390 <value><string>bar</string></value> 1391 </param> 1392 </params> 1393 </methodCall> 1394 """ 1395 1396 with support.EnvironmentVarGuard() as env, \ 1397 captured_stdout(encoding=self.cgi.encoding) as data_out, \ 1398 support.captured_stdin() as data_in: 1399 data_in.write(data) 1400 data_in.seek(0) 1401 env['CONTENT_LENGTH'] = str(len(data)) 1402 self.cgi.handle_request() 1403 data_out.seek(0) 1404 1405 # will respond exception, if so, our goal is achieved ;) 1406 handle = data_out.read() 1407 1408 # start with 44th char so as not to get http header, we just 1409 # need only xml 1410 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:]) 1411 1412 # Also test the content-length returned by handle_request 1413 # Using the same test method inorder to avoid all the datapassing 1414 # boilerplate code. 1415 # Test for bug: http://bugs.python.org/issue5040 1416 1417 content = handle[handle.find("<?xml"):] 1418 1419 self.assertEqual( 1420 int(re.search(r'Content-Length: (\d+)', handle).group(1)), 1421 len(content)) 1422 1423 1424class UseBuiltinTypesTestCase(unittest.TestCase): 1425 1426 def test_use_builtin_types(self): 1427 # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which 1428 # makes all dispatch of binary data as bytes instances, and all 1429 # dispatch of datetime argument as datetime.datetime instances. 1430 self.log = [] 1431 expected_bytes = b"my dog has fleas" 1432 expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12) 1433 marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar') 1434 def foobar(*args): 1435 self.log.extend(args) 1436 handler = xmlrpc.server.SimpleXMLRPCDispatcher( 1437 allow_none=True, encoding=None, use_builtin_types=True) 1438 handler.register_function(foobar) 1439 handler._marshaled_dispatch(marshaled) 1440 self.assertEqual(len(self.log), 2) 1441 mybytes, mydate = self.log 1442 self.assertEqual(self.log, [expected_bytes, expected_date]) 1443 self.assertIs(type(mydate), datetime.datetime) 1444 self.assertIs(type(mybytes), bytes) 1445 1446 def test_cgihandler_has_use_builtin_types_flag(self): 1447 handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True) 1448 self.assertTrue(handler.use_builtin_types) 1449 1450 def test_xmlrpcserver_has_use_builtin_types_flag(self): 1451 server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0), 1452 use_builtin_types=True) 1453 server.server_close() 1454 self.assertTrue(server.use_builtin_types) 1455 1456 1457@support.reap_threads 1458def test_main(): 1459 support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase, 1460 BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase, 1461 SimpleServerTestCase, SimpleServerEncodingTestCase, 1462 KeepaliveServerTestCase1, KeepaliveServerTestCase2, 1463 GzipServerTestCase, GzipUtilTestCase, HeadersServerTestCase, 1464 MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase, 1465 CGIHandlerTestCase, SimpleXMLRPCDispatcherTestCase) 1466 1467 1468if __name__ == "__main__": 1469 test_main() 1470