import base64
import datetime
import decimal
import sys
import time
import unittest
from unittest import mock
import xmlrpc.client as xmlrpclib
import xmlrpc.server
import http.client
import http, http.server
import socket
import threading
import re
import io
import contextlib
from test import support

try:
    import gzip
except ImportError:
    gzip = None

# One value of every kind the marshaller is exercised with in
# XMLRPCTestCase.test_dump_load() and test_dump_none().
alist = [{'astring': 'foo@bar.baz.spam',
          'afloat': 7283.43,
          'anint': 2**20,
          'ashortlong': 2,
          'anotherlist': ['.zyx.41'],
          'abase64': xmlrpclib.Binary(b"my dog has fleas"),
          'b64bytes': b"my dog has fleas",
          'b64bytearray': bytearray(b"my dog has fleas"),
          'boolean': False,
          'unicode': '\u4000\u6000\u8000',
          'ukey\u4000': 'regular value',
          'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
          'datetime2': xmlrpclib.DateTime(
                        (2005, 2, 10, 11, 41, 23, 0, 1, -1)),
          'datetime3': xmlrpclib.DateTime(
                        datetime.datetime(2005, 2, 10, 11, 41, 23)),
          }]


class XMLRPCTestCase(unittest.TestCase):
    """Marshalling and unmarshalling tests for xmlrpc.client."""

    def _check_datetime_loads(self, dumped, dt, cases):
        # Load `dumped` once per (loads-kwargs, expected type) pair in
        # `cases` and check that the single returned parameter equals `dt`,
        # has exactly the expected type, and that no method name came back.
        for kwargs, expected_type in cases:
            with self.subTest(**kwargs):
                result, m = xmlrpclib.loads(dumped, **kwargs)
                (newdt,) = result
                self.assertEqual(newdt, dt)
                self.assertIs(type(newdt), expected_type)
                self.assertIsNone(m)

    def test_dump_load(self):
        # A dumps()/loads() round trip must give back an equal structure.
        dump = xmlrpclib.dumps((alist,))
        load = xmlrpclib.loads(dump)
        self.assertEqual(alist, load[0][0])

    def test_dump_bare_datetime(self):
        # This checks that an unwrapped datetime.datetime object can be
        # handled by the marshalling code.  This can't be done via
        # test_dump_load() since with use_builtin_types set to 1 the
        # unmarshaller would create datetime objects for the
        # 'datetime[123]' keys as well
        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        self._check_datetime_loads(s, dt, [
            ({'use_builtin_types': True}, datetime.datetime),
            ({'use_builtin_types': False}, xmlrpclib.DateTime),
            ({'use_datetime': True}, datetime.datetime),
            ({'use_datetime': False}, xmlrpclib.DateTime),
        ])

    def test_datetime_before_1900(self):
        # same as before but with a date before 1900
        dt = datetime.datetime(1, 2, 10, 11, 41, 23)
        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
        s = xmlrpclib.dumps((dt,))

        self._check_datetime_loads(s, dt, [
            ({'use_builtin_types': True}, datetime.datetime),
            ({'use_builtin_types': False}, xmlrpclib.DateTime),
        ])

    def test_bug_1164912(self):
        d = xmlrpclib.DateTime()
        ((new_d,), dummy) = xmlrpclib.loads(
            xmlrpclib.dumps((d,), methodresponse=True))
        self.assertIsInstance(new_d.value, str)

        # Check that the output of dumps() is still a str
        s = xmlrpclib.dumps((new_d,), methodresponse=True)
        self.assertIsInstance(s, str)

    def test_newstyle_class(self):
        # An arbitrary instance is marshalled as its attribute dictionary.
        class T:
            pass
        t = T()
        t.x = 100
        t.y = "Hello"
        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
        self.assertEqual(t2, t.__dict__)

    def test_dump_big_long(self):
        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))

    def test_dump_bad_dict(self):
        # Struct member names must be strings; a tuple key is rejected.
        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1, 2, 3): 1},))

    def test_dump_recursive_seq(self):
        l = [1, 2, 3]
        t = [3, 4, 5, l]
        l.append(t)
        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))

    def test_dump_recursive_dict(self):
        d = {'1': 1, '2': 1}
        t = {'3': 3, 'd': d}
        d['t'] = t
        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))

    def test_dump_big_int(self):
        if sys.maxsize > 2**31 - 1:
            self.assertRaises(OverflowError, xmlrpclib.dumps,
                              (int(2**34),))

        # The limits themselves are accepted, one step outside is not.
        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MAXINT + 1,))
        self.assertRaises(OverflowError, xmlrpclib.dumps,
                          (xmlrpclib.MININT - 1,))

        def dummy_write(s):
            pass

        # Same checks, calling the Marshaller's int dumper directly.
        m = xmlrpclib.Marshaller()
        m.dump_int(xmlrpclib.MAXINT, dummy_write)
        m.dump_int(xmlrpclib.MININT, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MAXINT + 1, dummy_write)
        self.assertRaises(OverflowError, m.dump_int,
                          xmlrpclib.MININT - 1, dummy_write)

    def test_dump_double(self):
        # Floats outside the int range must dump without raising.
        xmlrpclib.dumps((float(2 ** 34),))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
                         float(xmlrpclib.MININT)))
        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
                         float(xmlrpclib.MININT - 42)))

        def dummy_write(s):
            pass

        m = xmlrpclib.Marshaller()
        m.dump_double(xmlrpclib.MAXINT, dummy_write)
        m.dump_double(xmlrpclib.MININT, dummy_write)
        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
        m.dump_double(xmlrpclib.MININT - 42, dummy_write)

    def test_dump_none(self):
        value = alist + [None]
        arg1 = (value,)
        strg = xmlrpclib.dumps(arg1, allow_none=True)
        self.assertEqual(value,
                         xmlrpclib.loads(strg)[0][0])
        # Without allow_none the None inside the parameters is rejected.
        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))

    def test_dump_encoding(self):
        value = {'key\u20ac\xa4':
                 'value\u20ac\xa4'}
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodresponse=True)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)

        methodname = 'method\u20ac\xa4'
        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
                               methodname=methodname)
        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)

    def test_dump_bytes(self):
        sample = b"my dog has fleas"
        self.assertEqual(sample, xmlrpclib.Binary(sample))
        for type_ in bytes, bytearray, xmlrpclib.Binary:
            with self.subTest(type_=type_.__name__):
                value = type_(sample)
                s = xmlrpclib.dumps((value,))

                result, m = xmlrpclib.loads(s, use_builtin_types=True)
                (newvalue,) = result
                self.assertEqual(newvalue, sample)
                self.assertIs(type(newvalue), bytes)
                self.assertIsNone(m)

                result, m = xmlrpclib.loads(s, use_builtin_types=False)
                (newvalue,) = result
                self.assertEqual(newvalue, sample)
                self.assertIs(type(newvalue), xmlrpclib.Binary)
                self.assertIsNone(m)

    def test_loads_unsupported(self):
        # An unknown element must raise ResponseError wherever it appears:
        # as a bare value, inside an array, or inside a struct.
        ResponseError = xmlrpclib.ResponseError
        data = '<params><param><value><spam/></value></param></params>'
        self.assertRaises(ResponseError, xmlrpclib.loads, data)
        data = ('<params><param><value><array>'
                '<value><spam/></value>'
                '</array></value></param></params>')
        self.assertRaises(ResponseError, xmlrpclib.loads, data)
        data = ('<params><param><value><struct>'
                '<member><name>a</name><value><spam/></value></member>'
                '<member><name>b</name><value><spam/></value></member>'
                '</struct></value></param></params>')
        self.assertRaises(ResponseError, xmlrpclib.loads, data)

    def check_loads(self, s, value, **kwargs):
        # Wrap the XML fragment `s` as the only parameter of a <params>
        # document, load it with `kwargs`, and require a result equal to
        # `value` and of exactly the same type.
        dump = '<params><param><value>%s</value></param></params>' % s
        result, m = xmlrpclib.loads(dump, **kwargs)
        (newvalue,) = result
        self.assertEqual(newvalue, value)
        self.assertIs(type(newvalue), type(value))
        self.assertIsNone(m)

    def test_load_standard_types(self):
        check = self.check_loads
        check('string', 'string')
        check('<string>string</string>', 'string')
        check('<string> string</string>', ' string')
        check('<int>2056183947</int>', 2056183947)
        check('<int>-2056183947</int>', -2056183947)
        check('<i4>2056183947</i4>', 2056183947)
        check('<double>46093.78125</double>', 46093.78125)
        check('<boolean>0</boolean>', False)
        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
              xmlrpclib.Binary(b'\x00byte string\xff'))
        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
              b'\x00byte string\xff', use_builtin_types=True)
        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
              xmlrpclib.DateTime('20050210T11:41:23'))
        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
              datetime.datetime(2005, 2, 10, 11, 41, 23),
              use_builtin_types=True)
        check('<array><data>'
              '<value><int>1</int></value><value><int>2</int></value>'
              '</data></array>', [1, 2])
        check('<struct>'
              '<member><name>b</name><value><int>2</int></value></member>'
              '<member><name>a</name><value><int>1</int></value></member>'
              '</struct>', {'a': 1, 'b': 2})

    def test_load_extension_types(self):
        check = self.check_loads
        check('<nil/>', None)
        check('<ex:nil/>', None)
        check('<i1>205</i1>', 205)
        check('<i2>20561</i2>', 20561)
        check('<i8>9876543210</i8>', 9876543210)
        check('<biginteger>98765432100123456789</biginteger>',
              98765432100123456789)
        check('<float>93.78125</float>', 93.78125)
        check('<bigdecimal>9876543210.0123456789</bigdecimal>',
              decimal.Decimal('9876543210.0123456789'))

    def test_get_host_info(self):
        # see bug #3613, this raised a TypeError
        transp = xmlrpc.client.Transport()
        self.assertEqual(transp.get_host_info("user@host.tld"),
                         ('host.tld',
                          [('Authorization', 'Basic dXNlcg==')], {}))

    def test_ssl_presence(self):
        try:
            import ssl
        except ImportError:
            has_ssl = False
        else:
            has_ssl = True
        # NotImplementedError is only acceptable when ssl is missing;
        # an OSError is only acceptable when ssl is available.
        try:
            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
        except NotImplementedError:
            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
        except OSError:
            self.assertTrue(has_ssl)

    def test_keepalive_disconnect(self):
        # The handler answers one POST per connection and then drops the
        # connection, without a response, on the next POST that arrives on
        # it.  Both client calls below must nevertheless return 5.
        class RequestHandler(http.server.BaseHTTPRequestHandler):
            protocol_version = "HTTP/1.1"
            handled = False

            def do_POST(self):
                length = int(self.headers.get("Content-Length"))
                self.rfile.read(length)
                if self.handled:
                    self.close_connection = True
                    return
                response = xmlrpclib.dumps((5,), methodresponse=True)
                response = response.encode()
                self.send_response(http.HTTPStatus.OK)
                self.send_header("Content-Length", str(len(response)))
                self.end_headers()
                self.wfile.write(response)
                # Instance attribute: remembered for this handler only.
                self.handled = True
                self.close_connection = False

            def log_message(self, format, *args):
                # don't clobber sys.stderr
                pass

        def run_server():
            server.socket.settimeout(float(1))  # Don't hang if client fails
            server.handle_request()  # First request and attempt at second
            server.handle_request()  # Retried second request

        # Bind by name to the loopback interface on an ephemeral port.  The
        # literal is used instead of test.support.HOST so the test does not
        # depend on which module currently exports that constant.
        server = http.server.HTTPServer(("localhost", 0), RequestHandler)
        self.addCleanup(server.server_close)
        thread = threading.Thread(target=run_server)
        thread.start()
        self.addCleanup(thread.join)
        url = "http://{}:{}/".format(*server.server_address)
        with xmlrpclib.ServerProxy(url) as p:
            self.assertEqual(p.method(), 5)
            self.assertEqual(p.method(), 5)


class SimpleXMLRPCDispatcherTestCase(unittest.TestCase):
    """Tests for SimpleXMLRPCDispatcher._dispatch()."""

    class DispatchExc(Exception):
        """Raised inside the dispatched functions when checking for
        chained exceptions"""

    def test_call_registered_func(self):
        """Calls explicitly registered function"""
        # Makes sure any exception raised inside the function has no other
        # exception chained to it

        exp_params = 1, 2, 3

        def dispatched_func(*params):
            raise self.DispatchExc(params)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_function(dispatched_func)
        with self.assertRaises(self.DispatchExc) as exc_ctx:
            dispatcher._dispatch('dispatched_func', exp_params)
        self.assertEqual(exc_ctx.exception.args, (exp_params,))
        self.assertIsNone(exc_ctx.exception.__cause__)
        self.assertIsNone(exc_ctx.exception.__context__)

    def test_call_instance_func(self):
        """Calls a registered instance attribute as a function"""
        # Makes sure any exception raised inside the function has no other
        # exception chained to it

        exp_params = 1, 2, 3

        class DispatchedClass:
            def dispatched_func(self, *params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_instance(DispatchedClass())
        with self.assertRaises(self.DispatchExc) as exc_ctx:
            dispatcher._dispatch('dispatched_func', exp_params)
        self.assertEqual(exc_ctx.exception.args, (exp_params,))
        self.assertIsNone(exc_ctx.exception.__cause__)
        self.assertIsNone(exc_ctx.exception.__context__)

    def test_call_dispatch_func(self):
        """Calls the registered instance's `_dispatch` function"""
        # Makes sure any exception raised inside the function has no other
        # exception chained to it

        exp_method = 'method'
        exp_params = 1, 2, 3

        class TestInstance:
            def _dispatch(self, method, params):
                raise SimpleXMLRPCDispatcherTestCase.DispatchExc(
                    method, params)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_instance(TestInstance())
        with self.assertRaises(self.DispatchExc) as exc_ctx:
            dispatcher._dispatch(exp_method, exp_params)
        self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params))
        self.assertIsNone(exc_ctx.exception.__cause__)
        self.assertIsNone(exc_ctx.exception.__context__)

    def test_registered_func_is_none(self):
        """Calls explicitly registered function which is None"""

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_function(None, name='method')
        with self.assertRaisesRegex(Exception, 'method'):
            dispatcher._dispatch('method', ('param',))

    def test_instance_has_no_func(self):
        """Attempts to call nonexistent function on a registered instance"""

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        dispatcher.register_instance(object())
        with self.assertRaisesRegex(Exception, 'method'):
            dispatcher._dispatch('method', ('param',))

    def test_cannot_locate_func(self):
        """Calls a function that the dispatcher cannot locate"""

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher()
        with self.assertRaisesRegex(Exception, 'method'):
            dispatcher._dispatch('method', ('param',))


class HelperTestCase(unittest.TestCase):
    """Tests for the module-level helpers of xmlrpc.client."""

    def test_escape(self):
        # Each XML-special character must come back as its entity, so the
        # expected strings are spelled with the entities written out.
        self.assertEqual(xmlrpclib.escape("a&b"), "a&amp;b")
        self.assertEqual(xmlrpclib.escape("a<b"), "a&lt;b")
        self.assertEqual(xmlrpclib.escape("a>b"), "a&gt;b")

440class FaultTestCase(unittest.TestCase): 441 def test_repr(self): 442 f = xmlrpclib.Fault(42, 'Test Fault') 443 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 444 self.assertEqual(repr(f), str(f)) 445 446 def test_dump_fault(self): 447 f = xmlrpclib.Fault(42, 'Test Fault') 448 s = xmlrpclib.dumps((f,)) 449 (newf,), m = xmlrpclib.loads(s) 450 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 451 self.assertEqual(m, None) 452 453 s = xmlrpclib.Marshaller().dumps(f) 454 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 455 456 def test_dotted_attribute(self): 457 # this will raise AttributeError because code don't want us to use 458 # private methods 459 self.assertRaises(AttributeError, 460 xmlrpc.server.resolve_dotted_attribute, str, '__add') 461 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 462 463class DateTimeTestCase(unittest.TestCase): 464 def test_default(self): 465 with mock.patch('time.localtime') as localtime_mock: 466 time_struct = time.struct_time( 467 [2013, 7, 15, 0, 24, 49, 0, 196, 0]) 468 localtime_mock.return_value = time_struct 469 localtime = time.localtime() 470 t = xmlrpclib.DateTime() 471 self.assertEqual(str(t), 472 time.strftime("%Y%m%dT%H:%M:%S", localtime)) 473 474 def test_time(self): 475 d = 1181399930.036952 476 t = xmlrpclib.DateTime(d) 477 self.assertEqual(str(t), 478 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 479 480 def test_time_tuple(self): 481 d = (2007,6,9,10,38,50,5,160,0) 482 t = xmlrpclib.DateTime(d) 483 self.assertEqual(str(t), '20070609T10:38:50') 484 485 def test_time_struct(self): 486 d = time.localtime(1181399930.036952) 487 t = xmlrpclib.DateTime(d) 488 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 489 490 def test_datetime_datetime(self): 491 d = datetime.datetime(2007,1,2,3,4,5) 492 t = xmlrpclib.DateTime(d) 493 self.assertEqual(str(t), '20070102T03:04:05') 494 495 def test_repr(self): 496 d = datetime.datetime(2007,1,2,3,4,5) 497 t = 
xmlrpclib.DateTime(d) 498 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t) 499 self.assertEqual(repr(t), val) 500 501 def test_decode(self): 502 d = ' 20070908T07:11:13 ' 503 t1 = xmlrpclib.DateTime() 504 t1.decode(d) 505 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 506 self.assertEqual(t1, tref) 507 508 t2 = xmlrpclib._datetime(d) 509 self.assertEqual(t2, tref) 510 511 def test_comparison(self): 512 now = datetime.datetime.now() 513 dtime = xmlrpclib.DateTime(now.timetuple()) 514 515 # datetime vs. DateTime 516 self.assertTrue(dtime == now) 517 self.assertTrue(now == dtime) 518 then = now + datetime.timedelta(seconds=4) 519 self.assertTrue(then >= dtime) 520 self.assertTrue(dtime < then) 521 522 # str vs. DateTime 523 dstr = now.strftime("%Y%m%dT%H:%M:%S") 524 self.assertTrue(dtime == dstr) 525 self.assertTrue(dstr == dtime) 526 dtime_then = xmlrpclib.DateTime(then.timetuple()) 527 self.assertTrue(dtime_then >= dstr) 528 self.assertTrue(dstr < dtime_then) 529 530 # some other types 531 dbytes = dstr.encode('ascii') 532 dtuple = now.timetuple() 533 with self.assertRaises(TypeError): 534 dtime == 1970 535 with self.assertRaises(TypeError): 536 dtime != dbytes 537 with self.assertRaises(TypeError): 538 dtime == bytearray(dbytes) 539 with self.assertRaises(TypeError): 540 dtime != dtuple 541 with self.assertRaises(TypeError): 542 dtime < float(1970) 543 with self.assertRaises(TypeError): 544 dtime > dbytes 545 with self.assertRaises(TypeError): 546 dtime <= bytearray(dbytes) 547 with self.assertRaises(TypeError): 548 dtime >= dtuple 549 550class BinaryTestCase(unittest.TestCase): 551 552 # XXX What should str(Binary(b"\xff")) return? I'm chosing "\xff" 553 # for now (i.e. interpreting the binary data as Latin-1-encoded 554 # text). But this feels very unsatisfactory. Perhaps we should 555 # only define repr(), and return r"Binary(b'\xff')" instead? 
556 557 def test_default(self): 558 t = xmlrpclib.Binary() 559 self.assertEqual(str(t), '') 560 561 def test_string(self): 562 d = b'\x01\x02\x03abc123\xff\xfe' 563 t = xmlrpclib.Binary(d) 564 self.assertEqual(str(t), str(d, "latin-1")) 565 566 def test_decode(self): 567 d = b'\x01\x02\x03abc123\xff\xfe' 568 de = base64.encodebytes(d) 569 t1 = xmlrpclib.Binary() 570 t1.decode(de) 571 self.assertEqual(str(t1), str(d, "latin-1")) 572 573 t2 = xmlrpclib._binary(de) 574 self.assertEqual(str(t2), str(d, "latin-1")) 575 576 577ADDR = PORT = URL = None 578 579# The evt is set twice. First when the server is ready to serve. 580# Second when the server has been shutdown. The user must clear 581# the event after it has been set the first time to catch the second set. 582def http_server(evt, numrequests, requestHandler=None, encoding=None): 583 class TestInstanceClass: 584 def div(self, x, y): 585 return x // y 586 587 def _methodHelp(self, name): 588 if name == 'div': 589 return 'This is the div function' 590 591 class Fixture: 592 @staticmethod 593 def getData(): 594 return '42' 595 596 class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer): 597 def get_request(self): 598 # Ensure the socket is always non-blocking. On Linux, socket 599 # attributes are not inherited like they are on *BSD and Windows. 600 s, port = self.socket.accept() 601 s.setblocking(True) 602 return s, port 603 604 if not requestHandler: 605 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 606 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 607 encoding=encoding, 608 logRequests=False, bind_and_activate=False) 609 try: 610 serv.server_bind() 611 global ADDR, PORT, URL 612 ADDR, PORT = serv.socket.getsockname() 613 #connect to IP address directly. This avoids socket.create_connection() 614 #trying to connect to "localhost" using all address families, which 615 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 616 #on AF_INET only. 
617 URL = "http://%s:%d"%(ADDR, PORT) 618 serv.server_activate() 619 serv.register_introspection_functions() 620 serv.register_multicall_functions() 621 serv.register_function(pow) 622 serv.register_function(lambda x: x, 'têšt') 623 @serv.register_function 624 def my_function(): 625 '''This is my function''' 626 return True 627 @serv.register_function(name='add') 628 def _(x, y): 629 return x + y 630 testInstance = TestInstanceClass() 631 serv.register_instance(testInstance, allow_dotted_names=True) 632 evt.set() 633 634 # handle up to 'numrequests' requests 635 while numrequests > 0: 636 serv.handle_request() 637 numrequests -= 1 638 639 except socket.timeout: 640 pass 641 finally: 642 serv.socket.close() 643 PORT = None 644 evt.set() 645 646def http_multi_server(evt, numrequests, requestHandler=None): 647 class TestInstanceClass: 648 def div(self, x, y): 649 return x // y 650 651 def _methodHelp(self, name): 652 if name == 'div': 653 return 'This is the div function' 654 655 def my_function(): 656 '''This is my function''' 657 return True 658 659 class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): 660 def get_request(self): 661 # Ensure the socket is always non-blocking. On Linux, socket 662 # attributes are not inherited like they are on *BSD and Windows. 663 s, port = self.socket.accept() 664 s.setblocking(True) 665 return s, port 666 667 if not requestHandler: 668 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 669 class MyRequestHandler(requestHandler): 670 rpc_paths = [] 671 672 class BrokenDispatcher: 673 def _marshaled_dispatch(self, data, dispatch_method=None, path=None): 674 raise RuntimeError("broken dispatcher") 675 676 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 677 logRequests=False, bind_and_activate=False) 678 serv.socket.settimeout(3) 679 serv.server_bind() 680 try: 681 global ADDR, PORT, URL 682 ADDR, PORT = serv.socket.getsockname() 683 #connect to IP address directly. 
This avoids socket.create_connection() 684 #trying to connect to "localhost" using all address families, which 685 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 686 #on AF_INET only. 687 URL = "http://%s:%d"%(ADDR, PORT) 688 serv.server_activate() 689 paths = ["/foo", "/foo/bar"] 690 for path in paths: 691 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) 692 d.register_introspection_functions() 693 d.register_multicall_functions() 694 serv.get_dispatcher(paths[0]).register_function(pow) 695 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 696 serv.add_dispatcher("/is/broken", BrokenDispatcher()) 697 evt.set() 698 699 # handle up to 'numrequests' requests 700 while numrequests > 0: 701 serv.handle_request() 702 numrequests -= 1 703 704 except socket.timeout: 705 pass 706 finally: 707 serv.socket.close() 708 PORT = None 709 evt.set() 710 711# This function prevents errors like: 712# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 713def is_unavailable_exception(e): 714 '''Returns True if the given ProtocolError is the product of a server-side 715 exception caused by the 'temporarily unavailable' response sometimes 716 given by operations on non-blocking sockets.''' 717 718 # sometimes we get a -1 error code and/or empty headers 719 try: 720 if e.errcode == -1 or e.headers is None: 721 return True 722 exc_mess = e.headers.get('X-exception') 723 except AttributeError: 724 # Ignore OSErrors here. 725 exc_mess = str(e) 726 727 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 728 return True 729 730def make_request_and_skipIf(condition, reason): 731 # If we skip the test, we have to make a request because 732 # the server created in setUp blocks expecting one to come in. 
733 if not condition: 734 return lambda func: func 735 def decorator(func): 736 def make_request_and_skip(self): 737 try: 738 xmlrpclib.ServerProxy(URL).my_function() 739 except (xmlrpclib.ProtocolError, OSError) as e: 740 if not is_unavailable_exception(e): 741 raise 742 raise unittest.SkipTest(reason) 743 return make_request_and_skip 744 return decorator 745 746class BaseServerTestCase(unittest.TestCase): 747 requestHandler = None 748 request_count = 1 749 threadFunc = staticmethod(http_server) 750 751 def setUp(self): 752 # enable traceback reporting 753 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 754 755 self.evt = threading.Event() 756 # start server thread to handle requests 757 serv_args = (self.evt, self.request_count, self.requestHandler) 758 thread = threading.Thread(target=self.threadFunc, args=serv_args) 759 thread.start() 760 self.addCleanup(thread.join) 761 762 # wait for the server to be ready 763 self.evt.wait() 764 self.evt.clear() 765 766 def tearDown(self): 767 # wait on the server thread to terminate 768 self.evt.wait() 769 770 # disable traceback reporting 771 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 772 773class SimpleServerTestCase(BaseServerTestCase): 774 def test_simple1(self): 775 try: 776 p = xmlrpclib.ServerProxy(URL) 777 self.assertEqual(p.pow(6,8), 6**8) 778 except (xmlrpclib.ProtocolError, OSError) as e: 779 # ignore failures due to non-blocking socket 'unavailable' errors 780 if not is_unavailable_exception(e): 781 # protocol error; provide additional information in test output 782 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 783 784 def test_nonascii(self): 785 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 786 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 787 try: 788 p = xmlrpclib.ServerProxy(URL) 789 self.assertEqual(p.add(start_string, end_string), 790 start_string + end_string) 791 except (xmlrpclib.ProtocolError, OSError) as e: 792 # ignore failures due to 
non-blocking socket 'unavailable' errors 793 if not is_unavailable_exception(e): 794 # protocol error; provide additional information in test output 795 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 796 797 def test_client_encoding(self): 798 start_string = '\u20ac' 799 end_string = '\xa4' 800 801 try: 802 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15') 803 self.assertEqual(p.add(start_string, end_string), 804 start_string + end_string) 805 except (xmlrpclib.ProtocolError, socket.error) as e: 806 # ignore failures due to non-blocking socket unavailable errors. 807 if not is_unavailable_exception(e): 808 # protocol error; provide additional information in test output 809 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 810 811 def test_nonascii_methodname(self): 812 try: 813 p = xmlrpclib.ServerProxy(URL, encoding='ascii') 814 self.assertEqual(p.têšt(42), 42) 815 except (xmlrpclib.ProtocolError, socket.error) as e: 816 # ignore failures due to non-blocking socket unavailable errors. 817 if not is_unavailable_exception(e): 818 # protocol error; provide additional information in test output 819 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 820 821 def test_404(self): 822 # send POST with http.client, it should return 404 header and 823 # 'Not Found' message. 
824 conn = http.client.HTTPConnection(ADDR, PORT) 825 conn.request('POST', '/this-is-not-valid') 826 response = conn.getresponse() 827 conn.close() 828 829 self.assertEqual(response.status, 404) 830 self.assertEqual(response.reason, 'Not Found') 831 832 def test_introspection1(self): 833 expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt', 834 'system.listMethods', 'system.methodHelp', 835 'system.methodSignature', 'system.multicall', 836 'Fixture']) 837 try: 838 p = xmlrpclib.ServerProxy(URL) 839 meth = p.system.listMethods() 840 self.assertEqual(set(meth), expected_methods) 841 except (xmlrpclib.ProtocolError, OSError) as e: 842 # ignore failures due to non-blocking socket 'unavailable' errors 843 if not is_unavailable_exception(e): 844 # protocol error; provide additional information in test output 845 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 846 847 848 def test_introspection2(self): 849 try: 850 # test _methodHelp() 851 p = xmlrpclib.ServerProxy(URL) 852 divhelp = p.system.methodHelp('div') 853 self.assertEqual(divhelp, 'This is the div function') 854 except (xmlrpclib.ProtocolError, OSError) as e: 855 # ignore failures due to non-blocking socket 'unavailable' errors 856 if not is_unavailable_exception(e): 857 # protocol error; provide additional information in test output 858 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 859 860 @make_request_and_skipIf(sys.flags.optimize >= 2, 861 "Docstrings are omitted with -O2 and above") 862 def test_introspection3(self): 863 try: 864 # test native doc 865 p = xmlrpclib.ServerProxy(URL) 866 myfunction = p.system.methodHelp('my_function') 867 self.assertEqual(myfunction, 'This is my function') 868 except (xmlrpclib.ProtocolError, OSError) as e: 869 # ignore failures due to non-blocking socket 'unavailable' errors 870 if not is_unavailable_exception(e): 871 # protocol error; provide additional information in test output 872 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 873 874 def 
test_introspection4(self): 875 # the SimpleXMLRPCServer doesn't support signatures, but 876 # at least check that we can try making the call 877 try: 878 p = xmlrpclib.ServerProxy(URL) 879 divsig = p.system.methodSignature('div') 880 self.assertEqual(divsig, 'signatures not supported') 881 except (xmlrpclib.ProtocolError, OSError) as e: 882 # ignore failures due to non-blocking socket 'unavailable' errors 883 if not is_unavailable_exception(e): 884 # protocol error; provide additional information in test output 885 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 886 887 def test_multicall(self): 888 try: 889 p = xmlrpclib.ServerProxy(URL) 890 multicall = xmlrpclib.MultiCall(p) 891 multicall.add(2,3) 892 multicall.pow(6,8) 893 multicall.div(127,42) 894 add_result, pow_result, div_result = multicall() 895 self.assertEqual(add_result, 2+3) 896 self.assertEqual(pow_result, 6**8) 897 self.assertEqual(div_result, 127//42) 898 except (xmlrpclib.ProtocolError, OSError) as e: 899 # ignore failures due to non-blocking socket 'unavailable' errors 900 if not is_unavailable_exception(e): 901 # protocol error; provide additional information in test output 902 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 903 904 def test_non_existing_multicall(self): 905 try: 906 p = xmlrpclib.ServerProxy(URL) 907 multicall = xmlrpclib.MultiCall(p) 908 multicall.this_is_not_exists() 909 result = multicall() 910 911 # result.results contains; 912 # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:' 913 # 'method "this_is_not_exists" is not supported'>}] 914 915 self.assertEqual(result.results[0]['faultCode'], 1) 916 self.assertEqual(result.results[0]['faultString'], 917 '<class \'Exception\'>:method "this_is_not_exists" ' 918 'is not supported') 919 except (xmlrpclib.ProtocolError, OSError) as e: 920 # ignore failures due to non-blocking socket 'unavailable' errors 921 if not is_unavailable_exception(e): 922 # protocol error; provide additional information in test 
output 923 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 924 925 def test_dotted_attribute(self): 926 # Raises an AttributeError because private methods are not allowed. 927 self.assertRaises(AttributeError, 928 xmlrpc.server.resolve_dotted_attribute, str, '__add') 929 930 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 931 # Get the test to run faster by sending a request with test_simple1. 932 # This avoids waiting for the socket timeout. 933 self.test_simple1() 934 935 def test_allow_dotted_names_true(self): 936 # XXX also need allow_dotted_names_false test. 937 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 938 data = server.Fixture.getData() 939 self.assertEqual(data, '42') 940 941 def test_unicode_host(self): 942 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 943 self.assertEqual(server.add("a", "\xe9"), "a\xe9") 944 945 def test_partial_post(self): 946 # Check that a partial POST doesn't make the server loop: issue #14001. 
947 conn = http.client.HTTPConnection(ADDR, PORT) 948 conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') 949 conn.close() 950 951 def test_context_manager(self): 952 with xmlrpclib.ServerProxy(URL) as server: 953 server.add(2, 3) 954 self.assertNotEqual(server('transport')._connection, 955 (None, None)) 956 self.assertEqual(server('transport')._connection, 957 (None, None)) 958 959 def test_context_manager_method_error(self): 960 try: 961 with xmlrpclib.ServerProxy(URL) as server: 962 server.add(2, "a") 963 except xmlrpclib.Fault: 964 pass 965 self.assertEqual(server('transport')._connection, 966 (None, None)) 967 968 969class SimpleServerEncodingTestCase(BaseServerTestCase): 970 @staticmethod 971 def threadFunc(evt, numrequests, requestHandler=None, encoding=None): 972 http_server(evt, numrequests, requestHandler, 'iso-8859-15') 973 974 def test_server_encoding(self): 975 start_string = '\u20ac' 976 end_string = '\xa4' 977 978 try: 979 p = xmlrpclib.ServerProxy(URL) 980 self.assertEqual(p.add(start_string, end_string), 981 start_string + end_string) 982 except (xmlrpclib.ProtocolError, socket.error) as e: 983 # ignore failures due to non-blocking socket unavailable errors. 
984 if not is_unavailable_exception(e): 985 # protocol error; provide additional information in test output 986 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 987 988 989class MultiPathServerTestCase(BaseServerTestCase): 990 threadFunc = staticmethod(http_multi_server) 991 request_count = 2 992 def test_path1(self): 993 p = xmlrpclib.ServerProxy(URL+"/foo") 994 self.assertEqual(p.pow(6,8), 6**8) 995 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 996 997 def test_path2(self): 998 p = xmlrpclib.ServerProxy(URL+"/foo/bar") 999 self.assertEqual(p.add(6,8), 6+8) 1000 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) 1001 1002 def test_path3(self): 1003 p = xmlrpclib.ServerProxy(URL+"/is/broken") 1004 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1005 1006#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1007#does indeed serve subsequent requests on the same connection 1008class BaseKeepaliveServerTestCase(BaseServerTestCase): 1009 #a request handler that supports keep-alive and logs requests into a 1010 #class variable 1011 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1012 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1013 protocol_version = 'HTTP/1.1' 1014 myRequests = [] 1015 def handle(self): 1016 self.myRequests.append([]) 1017 self.reqidx = len(self.myRequests)-1 1018 return self.parentClass.handle(self) 1019 def handle_one_request(self): 1020 result = self.parentClass.handle_one_request(self) 1021 self.myRequests[self.reqidx].append(self.raw_requestline) 1022 return result 1023 1024 requestHandler = RequestHandler 1025 def setUp(self): 1026 #clear request log 1027 self.RequestHandler.myRequests = [] 1028 return BaseServerTestCase.setUp(self) 1029 1030#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1031#does indeed serve subsequent requests on the same connection 1032class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): 1033 def test_two(self): 1034 p = 
xmlrpclib.ServerProxy(URL) 1035 #do three requests. 1036 self.assertEqual(p.pow(6,8), 6**8) 1037 self.assertEqual(p.pow(6,8), 6**8) 1038 self.assertEqual(p.pow(6,8), 6**8) 1039 p("close")() 1040 1041 #they should have all been handled by a single request handler 1042 self.assertEqual(len(self.RequestHandler.myRequests), 1) 1043 1044 #check that we did at least two (the third may be pending append 1045 #due to thread scheduling) 1046 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1047 1048 1049#test special attribute access on the serverproxy, through the __call__ 1050#function. 1051class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): 1052 #ask for two keepalive requests to be handled. 1053 request_count=2 1054 1055 def test_close(self): 1056 p = xmlrpclib.ServerProxy(URL) 1057 #do some requests with close. 1058 self.assertEqual(p.pow(6,8), 6**8) 1059 self.assertEqual(p.pow(6,8), 6**8) 1060 self.assertEqual(p.pow(6,8), 6**8) 1061 p("close")() #this should trigger a new keep-alive request 1062 self.assertEqual(p.pow(6,8), 6**8) 1063 self.assertEqual(p.pow(6,8), 6**8) 1064 self.assertEqual(p.pow(6,8), 6**8) 1065 p("close")() 1066 1067 #they should have all been two request handlers, each having logged at least 1068 #two complete requests 1069 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1070 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1071 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) 1072 1073 1074 def test_transport(self): 1075 p = xmlrpclib.ServerProxy(URL) 1076 #do some requests with close. 1077 self.assertEqual(p.pow(6,8), 6**8) 1078 p("transport").close() #same as above, really. 
1079 self.assertEqual(p.pow(6,8), 6**8) 1080 p("close")() 1081 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1082 1083#A test case that verifies that gzip encoding works in both directions 1084#(for a request and the response) 1085@unittest.skipIf(gzip is None, 'requires gzip') 1086class GzipServerTestCase(BaseServerTestCase): 1087 #a request handler that supports keep-alive and logs requests into a 1088 #class variable 1089 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1090 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1091 protocol_version = 'HTTP/1.1' 1092 1093 def do_POST(self): 1094 #store content of last request in class 1095 self.__class__.content_length = int(self.headers["content-length"]) 1096 return self.parentClass.do_POST(self) 1097 requestHandler = RequestHandler 1098 1099 class Transport(xmlrpclib.Transport): 1100 #custom transport, stores the response length for our perusal 1101 fake_gzip = False 1102 def parse_response(self, response): 1103 self.response_length=int(response.getheader("content-length", 0)) 1104 return xmlrpclib.Transport.parse_response(self, response) 1105 1106 def send_content(self, connection, body): 1107 if self.fake_gzip: 1108 #add a lone gzip header to induce decode error remotely 1109 connection.putheader("Content-Encoding", "gzip") 1110 return xmlrpclib.Transport.send_content(self, connection, body) 1111 1112 def setUp(self): 1113 BaseServerTestCase.setUp(self) 1114 1115 def test_gzip_request(self): 1116 t = self.Transport() 1117 t.encode_threshold = None 1118 p = xmlrpclib.ServerProxy(URL, transport=t) 1119 self.assertEqual(p.pow(6,8), 6**8) 1120 a = self.RequestHandler.content_length 1121 t.encode_threshold = 0 #turn on request encoding 1122 self.assertEqual(p.pow(6,8), 6**8) 1123 b = self.RequestHandler.content_length 1124 self.assertTrue(a>b) 1125 p("close")() 1126 1127 def test_bad_gzip_request(self): 1128 t = self.Transport() 1129 t.encode_threshold = None 1130 t.fake_gzip = True 
1131 p = xmlrpclib.ServerProxy(URL, transport=t) 1132 cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, 1133 re.compile(r"\b400\b")) 1134 with cm: 1135 p.pow(6, 8) 1136 p("close")() 1137 1138 def test_gzip_response(self): 1139 t = self.Transport() 1140 p = xmlrpclib.ServerProxy(URL, transport=t) 1141 old = self.requestHandler.encode_threshold 1142 self.requestHandler.encode_threshold = None #no encoding 1143 self.assertEqual(p.pow(6,8), 6**8) 1144 a = t.response_length 1145 self.requestHandler.encode_threshold = 0 #always encode 1146 self.assertEqual(p.pow(6,8), 6**8) 1147 p("close")() 1148 b = t.response_length 1149 self.requestHandler.encode_threshold = old 1150 self.assertTrue(a>b) 1151 1152 1153@unittest.skipIf(gzip is None, 'requires gzip') 1154class GzipUtilTestCase(unittest.TestCase): 1155 1156 def test_gzip_decode_limit(self): 1157 max_gzip_decode = 20 * 1024 * 1024 1158 data = b'\0' * max_gzip_decode 1159 encoded = xmlrpclib.gzip_encode(data) 1160 decoded = xmlrpclib.gzip_decode(encoded) 1161 self.assertEqual(len(decoded), max_gzip_decode) 1162 1163 data = b'\0' * (max_gzip_decode + 1) 1164 encoded = xmlrpclib.gzip_encode(data) 1165 1166 with self.assertRaisesRegex(ValueError, 1167 "max gzipped payload length exceeded"): 1168 xmlrpclib.gzip_decode(encoded) 1169 1170 xmlrpclib.gzip_decode(encoded, max_decode=-1) 1171 1172 1173#Test special attributes of the ServerProxy object 1174class ServerProxyTestCase(unittest.TestCase): 1175 def setUp(self): 1176 unittest.TestCase.setUp(self) 1177 # Actual value of the URL doesn't matter if it is a string in 1178 # the correct format. 
1179 self.url = 'http://fake.localhost' 1180 1181 def test_close(self): 1182 p = xmlrpclib.ServerProxy(self.url) 1183 self.assertEqual(p('close')(), None) 1184 1185 def test_transport(self): 1186 t = xmlrpclib.Transport() 1187 p = xmlrpclib.ServerProxy(self.url, transport=t) 1188 self.assertEqual(p('transport'), t) 1189 1190 1191# This is a contrived way to make a failure occur on the server side 1192# in order to test the _send_traceback_header flag on the server 1193class FailingMessageClass(http.client.HTTPMessage): 1194 def get(self, key, failobj=None): 1195 key = key.lower() 1196 if key == 'content-length': 1197 return 'I am broken' 1198 return super().get(key, failobj) 1199 1200 1201class FailingServerTestCase(unittest.TestCase): 1202 def setUp(self): 1203 self.evt = threading.Event() 1204 # start server thread to handle requests 1205 serv_args = (self.evt, 1) 1206 thread = threading.Thread(target=http_server, args=serv_args) 1207 thread.start() 1208 self.addCleanup(thread.join) 1209 1210 # wait for the server to be ready 1211 self.evt.wait() 1212 self.evt.clear() 1213 1214 def tearDown(self): 1215 # wait on the server thread to terminate 1216 self.evt.wait() 1217 # reset flag 1218 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 1219 # reset message class 1220 default_class = http.client.HTTPMessage 1221 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class 1222 1223 def test_basic(self): 1224 # check that flag is false by default 1225 flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header 1226 self.assertEqual(flagval, False) 1227 1228 # enable traceback reporting 1229 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1230 1231 # test a call that shouldn't fail just as a smoke test 1232 try: 1233 p = xmlrpclib.ServerProxy(URL) 1234 self.assertEqual(p.pow(6,8), 6**8) 1235 except (xmlrpclib.ProtocolError, OSError) as e: 1236 # ignore failures due to non-blocking socket 'unavailable' errors 1237 if not 
is_unavailable_exception(e): 1238 # protocol error; provide additional information in test output 1239 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1240 1241 def test_fail_no_info(self): 1242 # use the broken message class 1243 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1244 1245 try: 1246 p = xmlrpclib.ServerProxy(URL) 1247 p.pow(6,8) 1248 except (xmlrpclib.ProtocolError, OSError) as e: 1249 # ignore failures due to non-blocking socket 'unavailable' errors 1250 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1251 # The two server-side error headers shouldn't be sent back in this case 1252 self.assertTrue(e.headers.get("X-exception") is None) 1253 self.assertTrue(e.headers.get("X-traceback") is None) 1254 else: 1255 self.fail('ProtocolError not raised') 1256 1257 def test_fail_with_info(self): 1258 # use the broken message class 1259 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1260 1261 # Check that errors in the server send back exception/traceback 1262 # info when flag is set 1263 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1264 1265 try: 1266 p = xmlrpclib.ServerProxy(URL) 1267 p.pow(6,8) 1268 except (xmlrpclib.ProtocolError, OSError) as e: 1269 # ignore failures due to non-blocking socket 'unavailable' errors 1270 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1271 # We should get error info in the response 1272 expected_err = "invalid literal for int() with base 10: 'I am broken'" 1273 self.assertEqual(e.headers.get("X-exception"), expected_err) 1274 self.assertTrue(e.headers.get("X-traceback") is not None) 1275 else: 1276 self.fail('ProtocolError not raised') 1277 1278 1279@contextlib.contextmanager 1280def captured_stdout(encoding='utf-8'): 1281 """A variation on support.captured_stdout() which gives a text stream 1282 having a `buffer` attribute. 
1283 """ 1284 orig_stdout = sys.stdout 1285 sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding) 1286 try: 1287 yield sys.stdout 1288 finally: 1289 sys.stdout = orig_stdout 1290 1291 1292class CGIHandlerTestCase(unittest.TestCase): 1293 def setUp(self): 1294 self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler() 1295 1296 def tearDown(self): 1297 self.cgi = None 1298 1299 def test_cgi_get(self): 1300 with support.EnvironmentVarGuard() as env: 1301 env['REQUEST_METHOD'] = 'GET' 1302 # if the method is GET and no request_text is given, it runs handle_get 1303 # get sysout output 1304 with captured_stdout(encoding=self.cgi.encoding) as data_out: 1305 self.cgi.handle_request() 1306 1307 # parse Status header 1308 data_out.seek(0) 1309 handle = data_out.read() 1310 status = handle.split()[1] 1311 message = ' '.join(handle.split()[2:4]) 1312 1313 self.assertEqual(status, '400') 1314 self.assertEqual(message, 'Bad Request') 1315 1316 1317 def test_cgi_xmlrpc_response(self): 1318 data = """<?xml version='1.0'?> 1319 <methodCall> 1320 <methodName>test_method</methodName> 1321 <params> 1322 <param> 1323 <value><string>foo</string></value> 1324 </param> 1325 <param> 1326 <value><string>bar</string></value> 1327 </param> 1328 </params> 1329 </methodCall> 1330 """ 1331 1332 with support.EnvironmentVarGuard() as env, \ 1333 captured_stdout(encoding=self.cgi.encoding) as data_out, \ 1334 support.captured_stdin() as data_in: 1335 data_in.write(data) 1336 data_in.seek(0) 1337 env['CONTENT_LENGTH'] = str(len(data)) 1338 self.cgi.handle_request() 1339 data_out.seek(0) 1340 1341 # will respond exception, if so, our goal is achieved ;) 1342 handle = data_out.read() 1343 1344 # start with 44th char so as not to get http header, we just 1345 # need only xml 1346 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:]) 1347 1348 # Also test the content-length returned by handle_request 1349 # Using the same test method inorder to avoid all the datapassing 1350 # boilerplate 
code. 1351 # Test for bug: http://bugs.python.org/issue5040 1352 1353 content = handle[handle.find("<?xml"):] 1354 1355 self.assertEqual( 1356 int(re.search(r'Content-Length: (\d+)', handle).group(1)), 1357 len(content)) 1358 1359 1360class UseBuiltinTypesTestCase(unittest.TestCase): 1361 1362 def test_use_builtin_types(self): 1363 # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which 1364 # makes all dispatch of binary data as bytes instances, and all 1365 # dispatch of datetime argument as datetime.datetime instances. 1366 self.log = [] 1367 expected_bytes = b"my dog has fleas" 1368 expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12) 1369 marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar') 1370 def foobar(*args): 1371 self.log.extend(args) 1372 handler = xmlrpc.server.SimpleXMLRPCDispatcher( 1373 allow_none=True, encoding=None, use_builtin_types=True) 1374 handler.register_function(foobar) 1375 handler._marshaled_dispatch(marshaled) 1376 self.assertEqual(len(self.log), 2) 1377 mybytes, mydate = self.log 1378 self.assertEqual(self.log, [expected_bytes, expected_date]) 1379 self.assertIs(type(mydate), datetime.datetime) 1380 self.assertIs(type(mybytes), bytes) 1381 1382 def test_cgihandler_has_use_builtin_types_flag(self): 1383 handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True) 1384 self.assertTrue(handler.use_builtin_types) 1385 1386 def test_xmlrpcserver_has_use_builtin_types_flag(self): 1387 server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0), 1388 use_builtin_types=True) 1389 server.server_close() 1390 self.assertTrue(server.use_builtin_types) 1391 1392 1393@support.reap_threads 1394def test_main(): 1395 support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase, 1396 BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase, 1397 SimpleServerTestCase, SimpleServerEncodingTestCase, 1398 KeepaliveServerTestCase1, KeepaliveServerTestCase2, 1399 GzipServerTestCase, GzipUtilTestCase, 1400 
MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase, 1401 CGIHandlerTestCase, SimpleXMLRPCDispatcherTestCase) 1402 1403 1404if __name__ == "__main__": 1405 test_main() 1406