1import base64 2import datetime 3import decimal 4import sys 5import time 6import unittest 7from unittest import mock 8import xmlrpc.client as xmlrpclib 9import xmlrpc.server 10import http.client 11import http, http.server 12import socket 13import threading 14import re 15import io 16import contextlib 17from test import support 18from test.support import os_helper 19from test.support import socket_helper 20from test.support import threading_helper 21from test.support import ALWAYS_EQ, LARGEST, SMALLEST 22 23try: 24 import gzip 25except ImportError: 26 gzip = None 27 28alist = [{'astring': 'foo@bar.baz.spam', 29 'afloat': 7283.43, 30 'anint': 2**20, 31 'ashortlong': 2, 32 'anotherlist': ['.zyx.41'], 33 'abase64': xmlrpclib.Binary(b"my dog has fleas"), 34 'b64bytes': b"my dog has fleas", 35 'b64bytearray': bytearray(b"my dog has fleas"), 36 'boolean': False, 37 'unicode': '\u4000\u6000\u8000', 38 'ukey\u4000': 'regular value', 39 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'), 40 'datetime2': xmlrpclib.DateTime( 41 (2005, 2, 10, 11, 41, 23, 0, 1, -1)), 42 'datetime3': xmlrpclib.DateTime( 43 datetime.datetime(2005, 2, 10, 11, 41, 23)), 44 }] 45 46class XMLRPCTestCase(unittest.TestCase): 47 48 def test_dump_load(self): 49 dump = xmlrpclib.dumps((alist,)) 50 load = xmlrpclib.loads(dump) 51 self.assertEqual(alist, load[0][0]) 52 53 def test_dump_bare_datetime(self): 54 # This checks that an unwrapped datetime.date object can be handled 55 # by the marshalling code. This can't be done via test_dump_load() 56 # since with use_builtin_types set to 1 the unmarshaller would create 57 # datetime objects for the 'datetime[123]' keys as well 58 dt = datetime.datetime(2005, 2, 10, 11, 41, 23) 59 self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23')) 60 s = xmlrpclib.dumps((dt,)) 61 62 result, m = xmlrpclib.loads(s, use_builtin_types=True) 63 (newdt,) = result 64 self.assertEqual(newdt, dt) 65 self.assertIs(type(newdt), datetime.datetime) 66 self.assertIsNone(m) 67 68 result, m = xmlrpclib.loads(s, use_builtin_types=False) 69 (newdt,) = result 70 self.assertEqual(newdt, dt) 71 self.assertIs(type(newdt), xmlrpclib.DateTime) 72 self.assertIsNone(m) 73 74 result, m = xmlrpclib.loads(s, use_datetime=True) 75 (newdt,) = result 76 self.assertEqual(newdt, dt) 77 self.assertIs(type(newdt), datetime.datetime) 78 self.assertIsNone(m) 79 80 result, m = xmlrpclib.loads(s, use_datetime=False) 81 (newdt,) = result 82 self.assertEqual(newdt, dt) 83 self.assertIs(type(newdt), xmlrpclib.DateTime) 84 self.assertIsNone(m) 85 86 87 def test_datetime_before_1900(self): 88 # same as before but with a date before 1900 89 dt = datetime.datetime(1, 2, 10, 11, 41, 23) 90 self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23')) 91 s = xmlrpclib.dumps((dt,)) 92 93 result, m = xmlrpclib.loads(s, use_builtin_types=True) 94 (newdt,) = result 95 self.assertEqual(newdt, dt) 96 self.assertIs(type(newdt), datetime.datetime) 97 self.assertIsNone(m) 98 99 result, m = xmlrpclib.loads(s, use_builtin_types=False) 100 (newdt,) = result 101 self.assertEqual(newdt, dt) 102 self.assertIs(type(newdt), xmlrpclib.DateTime) 103 self.assertIsNone(m) 104 105 def test_bug_1164912 (self): 106 d = xmlrpclib.DateTime() 107 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), 108 methodresponse=True)) 109 self.assertIsInstance(new_d.value, str) 110 111 # Check that the output of dumps() is still an 8-bit string 112 s = xmlrpclib.dumps((new_d,), methodresponse=True) 113 self.assertIsInstance(s, str) 114 115 def test_newstyle_class(self): 116 class T(object): 117 pass 118 t = T() 119 t.x = 100 120 t.y = "Hello" 121 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) 122 self.assertEqual(t2, t.__dict__) 123 124 def test_dump_big_long(self): 125 self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,)) 126 127 def test_dump_bad_dict(self): 128 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) 129 130 def test_dump_recursive_seq(self): 131 l = [1,2,3] 132 t = [3,4,5,l] 133 l.append(t) 134 self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) 135 136 def test_dump_recursive_dict(self): 137 d = {'1':1, '2':1} 138 t = {'3':3, 'd':d} 139 d['t'] = t 140 self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) 141 142 def test_dump_big_int(self): 143 if sys.maxsize > 2**31-1: 144 self.assertRaises(OverflowError, xmlrpclib.dumps, 145 (int(2**34),)) 146 147 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) 148 self.assertRaises(OverflowError, xmlrpclib.dumps, 149 (xmlrpclib.MAXINT+1,)) 150 self.assertRaises(OverflowError, xmlrpclib.dumps, 151 (xmlrpclib.MININT-1,)) 152 153 def dummy_write(s): 154 pass 155 156 m = xmlrpclib.Marshaller() 157 m.dump_int(xmlrpclib.MAXINT, dummy_write) 158 m.dump_int(xmlrpclib.MININT, dummy_write) 159 self.assertRaises(OverflowError, m.dump_int, 160 xmlrpclib.MAXINT+1, dummy_write) 161 self.assertRaises(OverflowError, m.dump_int, 162 xmlrpclib.MININT-1, dummy_write) 163 164 def test_dump_double(self): 165 xmlrpclib.dumps((float(2 ** 34),)) 166 xmlrpclib.dumps((float(xmlrpclib.MAXINT), 167 float(xmlrpclib.MININT))) 168 xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42), 169 float(xmlrpclib.MININT - 42))) 170 171 def dummy_write(s): 172 pass 173 174 m = xmlrpclib.Marshaller() 175 m.dump_double(xmlrpclib.MAXINT, dummy_write) 176 m.dump_double(xmlrpclib.MININT, dummy_write) 177 m.dump_double(xmlrpclib.MAXINT + 42, dummy_write) 178 m.dump_double(xmlrpclib.MININT - 42, dummy_write) 179 180 def test_dump_none(self): 181 value = alist + [None] 182 arg1 = (alist + [None],) 183 strg = xmlrpclib.dumps(arg1, allow_none=True) 184 self.assertEqual(value, 185 xmlrpclib.loads(strg)[0][0]) 186 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,)) 187 188 def test_dump_encoding(self): 189 value = {'key\u20ac\xa4': 190 'value\u20ac\xa4'} 191 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15') 192 strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg 193 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 194 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 195 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 196 197 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 198 methodresponse=True) 199 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 200 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 201 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 202 203 methodname = 'method\u20ac\xa4' 204 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 205 methodname=methodname) 206 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 207 self.assertEqual(xmlrpclib.loads(strg)[1], methodname) 208 209 def test_dump_bytes(self): 210 sample = b"my dog has fleas" 211 self.assertEqual(sample, xmlrpclib.Binary(sample)) 212 for type_ in bytes, bytearray, xmlrpclib.Binary: 213 value = type_(sample) 214 s = xmlrpclib.dumps((value,)) 215 216 result, m = xmlrpclib.loads(s, use_builtin_types=True) 217 (newvalue,) = result 218 self.assertEqual(newvalue, sample) 219 self.assertIs(type(newvalue), bytes) 220 self.assertIsNone(m) 221 222 result, m = xmlrpclib.loads(s, use_builtin_types=False) 223 (newvalue,) = result 224 self.assertEqual(newvalue, sample) 225 self.assertIs(type(newvalue), xmlrpclib.Binary) 226 self.assertIsNone(m) 227 228 def test_loads_unsupported(self): 229 ResponseError = xmlrpclib.ResponseError 230 data = '<params><param><value><spam/></value></param></params>' 231 self.assertRaises(ResponseError, xmlrpclib.loads, data) 232 data = ('<params><param><value><array>' 233 '<value><spam/></value>' 234 '</array></value></param></params>') 235 self.assertRaises(ResponseError, xmlrpclib.loads, data) 236 data = ('<params><param><value><struct>' 237 '<member><name>a</name><value><spam/></value></member>' 238 '<member><name>b</name><value><spam/></value></member>' 239 '</struct></value></param></params>') 240 self.assertRaises(ResponseError, xmlrpclib.loads, data) 241 242 def check_loads(self, s, value, **kwargs): 243 dump = '<params><param><value>%s</value></param></params>' % s 244 result, m = xmlrpclib.loads(dump, **kwargs) 245 (newvalue,) = result 246 self.assertEqual(newvalue, value) 247 self.assertIs(type(newvalue), type(value)) 248 self.assertIsNone(m) 249 250 def test_load_standard_types(self): 251 check = self.check_loads 252 check('string', 'string') 253 check('<string>string</string>', 'string') 254 check('<string> string</string>', ' string') 255 check('<int>2056183947</int>', 2056183947) 256 check('<int>-2056183947</int>', -2056183947) 257 check('<i4>2056183947</i4>', 2056183947) 258 check('<double>46093.78125</double>', 46093.78125) 259 check('<boolean>0</boolean>', False) 260 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 261 xmlrpclib.Binary(b'\x00byte string\xff')) 262 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 263 b'\x00byte string\xff', use_builtin_types=True) 264 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 265 xmlrpclib.DateTime('20050210T11:41:23')) 266 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 267 datetime.datetime(2005, 2, 10, 11, 41, 23), 268 use_builtin_types=True) 269 check('<array><data>' 270 '<value><int>1</int></value><value><int>2</int></value>' 271 '</data></array>', [1, 2]) 272 check('<struct>' 273 '<member><name>b</name><value><int>2</int></value></member>' 274 '<member><name>a</name><value><int>1</int></value></member>' 275 '</struct>', {'a': 1, 'b': 2}) 276 277 def test_load_extension_types(self): 278 check = self.check_loads 279 check('<nil/>', None) 280 check('<ex:nil/>', None) 281 check('<i1>205</i1>', 205) 282 check('<i2>20561</i2>', 20561) 283 check('<i8>9876543210</i8>', 9876543210) 284 check('<biginteger>98765432100123456789</biginteger>', 285 98765432100123456789) 286 check('<float>93.78125</float>', 93.78125) 287 check('<bigdecimal>9876543210.0123456789</bigdecimal>', 288 decimal.Decimal('9876543210.0123456789')) 289 290 def test_get_host_info(self): 291 # see bug #3613, this raised a TypeError 292 transp = xmlrpc.client.Transport() 293 self.assertEqual(transp.get_host_info("user@host.tld"), 294 ('host.tld', 295 [('Authorization', 'Basic dXNlcg==')], {})) 296 297 def test_ssl_presence(self): 298 try: 299 import ssl 300 except ImportError: 301 has_ssl = False 302 else: 303 has_ssl = True 304 try: 305 xmlrpc.client.ServerProxy('https://localhost:9999').bad_function() 306 except NotImplementedError: 307 self.assertFalse(has_ssl, "xmlrpc client's error with SSL support") 308 except OSError: 309 self.assertTrue(has_ssl) 310 311 def test_keepalive_disconnect(self): 312 class RequestHandler(http.server.BaseHTTPRequestHandler): 313 protocol_version = "HTTP/1.1" 314 handled = False 315 316 def do_POST(self): 317 length = int(self.headers.get("Content-Length")) 318 self.rfile.read(length) 319 if self.handled: 320 self.close_connection = True 321 return 322 response = xmlrpclib.dumps((5,), methodresponse=True) 323 response = response.encode() 324 self.send_response(http.HTTPStatus.OK) 325 self.send_header("Content-Length", len(response)) 326 self.end_headers() 327 self.wfile.write(response) 328 self.handled = True 329 self.close_connection = False 330 331 def log_message(self, format, *args): 332 # don't clobber sys.stderr 333 pass 334 335 def run_server(): 336 server.socket.settimeout(float(1)) # Don't hang if client fails 337 server.handle_request() # First request and attempt at second 338 server.handle_request() # Retried second request 339 340 server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler) 341 self.addCleanup(server.server_close) 342 thread = threading.Thread(target=run_server) 343 thread.start() 344 self.addCleanup(thread.join) 345 url = "http://{}:{}/".format(*server.server_address) 346 with xmlrpclib.ServerProxy(url) as p: 347 self.assertEqual(p.method(), 5) 348 self.assertEqual(p.method(), 5) 349 350 351class SimpleXMLRPCDispatcherTestCase(unittest.TestCase): 352 class DispatchExc(Exception): 353 """Raised inside the dispatched functions when checking for 354 chained exceptions""" 355 356 def test_call_registered_func(self): 357 """Calls explicitly registered function""" 358 # Makes sure any exception raised inside the function has no other 359 # exception chained to it 360 361 exp_params = 1, 2, 3 362 363 def dispatched_func(*params): 364 raise self.DispatchExc(params) 365 366 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 367 dispatcher.register_function(dispatched_func) 368 with self.assertRaises(self.DispatchExc) as exc_ctx: 369 dispatcher._dispatch('dispatched_func', exp_params) 370 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 371 self.assertIsNone(exc_ctx.exception.__cause__) 372 self.assertIsNone(exc_ctx.exception.__context__) 373 374 def test_call_instance_func(self): 375 """Calls a registered instance attribute as a function""" 376 # Makes sure any exception raised inside the function has no other 377 # exception chained to it 378 379 exp_params = 1, 2, 3 380 381 class DispatchedClass: 382 def dispatched_func(self, *params): 383 raise SimpleXMLRPCDispatcherTestCase.DispatchExc(params) 384 385 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 386 dispatcher.register_instance(DispatchedClass()) 387 with self.assertRaises(self.DispatchExc) as exc_ctx: 388 dispatcher._dispatch('dispatched_func', exp_params) 389 self.assertEqual(exc_ctx.exception.args, (exp_params,)) 390 self.assertIsNone(exc_ctx.exception.__cause__) 391 self.assertIsNone(exc_ctx.exception.__context__) 392 393 def test_call_dispatch_func(self): 394 """Calls the registered instance's `_dispatch` function""" 395 # Makes sure any exception raised inside the function has no other 396 # exception chained to it 397 398 exp_method = 'method' 399 exp_params = 1, 2, 3 400 401 class TestInstance: 402 def _dispatch(self, method, params): 403 raise SimpleXMLRPCDispatcherTestCase.DispatchExc( 404 method, params) 405 406 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 407 dispatcher.register_instance(TestInstance()) 408 with self.assertRaises(self.DispatchExc) as exc_ctx: 409 dispatcher._dispatch(exp_method, exp_params) 410 self.assertEqual(exc_ctx.exception.args, (exp_method, exp_params)) 411 self.assertIsNone(exc_ctx.exception.__cause__) 412 self.assertIsNone(exc_ctx.exception.__context__) 413 414 def test_registered_func_is_none(self): 415 """Calls explicitly registered function which is None""" 416 417 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 418 dispatcher.register_function(None, name='method') 419 with self.assertRaisesRegex(Exception, 'method'): 420 dispatcher._dispatch('method', ('param',)) 421 422 def test_instance_has_no_func(self): 423 """Attempts to call nonexistent function on a registered instance""" 424 425 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 426 dispatcher.register_instance(object()) 427 with self.assertRaisesRegex(Exception, 'method'): 428 dispatcher._dispatch('method', ('param',)) 429 430 def test_cannot_locate_func(self): 431 """Calls a function that the dispatcher cannot locate""" 432 433 dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher() 434 with self.assertRaisesRegex(Exception, 'method'): 435 dispatcher._dispatch('method', ('param',)) 436 437 438class HelperTestCase(unittest.TestCase): 439 def test_escape(self): 440 self.assertEqual(xmlrpclib.escape("a&b"), "a&b") 441 self.assertEqual(xmlrpclib.escape("a<b"), "a<b") 442 self.assertEqual(xmlrpclib.escape("a>b"), "a>b") 443 444class FaultTestCase(unittest.TestCase): 445 def test_repr(self): 446 f = xmlrpclib.Fault(42, 'Test Fault') 447 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 448 self.assertEqual(repr(f), str(f)) 449 450 def test_dump_fault(self): 451 f = xmlrpclib.Fault(42, 'Test Fault') 452 s = xmlrpclib.dumps((f,)) 453 (newf,), m = xmlrpclib.loads(s) 454 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 455 self.assertEqual(m, None) 456 457 s = xmlrpclib.Marshaller().dumps(f) 458 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 459 460 def test_dotted_attribute(self): 461 # this will raise AttributeError because code don't want us to use 462 # private methods 463 self.assertRaises(AttributeError, 464 xmlrpc.server.resolve_dotted_attribute, str, '__add') 465 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 466 467class DateTimeTestCase(unittest.TestCase): 468 def test_default(self): 469 with mock.patch('time.localtime') as localtime_mock: 470 time_struct = time.struct_time( 471 [2013, 7, 15, 0, 24, 49, 0, 196, 0]) 472 localtime_mock.return_value = time_struct 473 localtime = time.localtime() 474 t = xmlrpclib.DateTime() 475 self.assertEqual(str(t), 476 time.strftime("%Y%m%dT%H:%M:%S", localtime)) 477 478 def test_time(self): 479 d = 1181399930.036952 480 t = xmlrpclib.DateTime(d) 481 self.assertEqual(str(t), 482 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 483 484 def test_time_tuple(self): 485 d = (2007,6,9,10,38,50,5,160,0) 486 t = xmlrpclib.DateTime(d) 487 self.assertEqual(str(t), '20070609T10:38:50') 488 489 def test_time_struct(self): 490 d = time.localtime(1181399930.036952) 491 t = xmlrpclib.DateTime(d) 492 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 493 494 def test_datetime_datetime(self): 495 d = datetime.datetime(2007,1,2,3,4,5) 496 t = xmlrpclib.DateTime(d) 497 self.assertEqual(str(t), '20070102T03:04:05') 498 499 def test_repr(self): 500 d = datetime.datetime(2007,1,2,3,4,5) 501 t = xmlrpclib.DateTime(d) 502 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t) 503 self.assertEqual(repr(t), val) 504 505 def test_decode(self): 506 d = ' 20070908T07:11:13 ' 507 t1 = xmlrpclib.DateTime() 508 t1.decode(d) 509 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 510 self.assertEqual(t1, tref) 511 512 t2 = xmlrpclib._datetime(d) 513 self.assertEqual(t2, tref) 514 515 def test_comparison(self): 516 now = datetime.datetime.now() 517 dtime = xmlrpclib.DateTime(now.timetuple()) 518 519 # datetime vs. DateTime 520 self.assertTrue(dtime == now) 521 self.assertTrue(now == dtime) 522 then = now + datetime.timedelta(seconds=4) 523 self.assertTrue(then >= dtime) 524 self.assertTrue(dtime < then) 525 526 # str vs. DateTime 527 dstr = now.strftime("%Y%m%dT%H:%M:%S") 528 self.assertTrue(dtime == dstr) 529 self.assertTrue(dstr == dtime) 530 dtime_then = xmlrpclib.DateTime(then.timetuple()) 531 self.assertTrue(dtime_then >= dstr) 532 self.assertTrue(dstr < dtime_then) 533 534 # some other types 535 dbytes = dstr.encode('ascii') 536 dtuple = now.timetuple() 537 self.assertFalse(dtime == 1970) 538 self.assertTrue(dtime != dbytes) 539 self.assertFalse(dtime == bytearray(dbytes)) 540 self.assertTrue(dtime != dtuple) 541 with self.assertRaises(TypeError): 542 dtime < float(1970) 543 with self.assertRaises(TypeError): 544 dtime > dbytes 545 with self.assertRaises(TypeError): 546 dtime <= bytearray(dbytes) 547 with self.assertRaises(TypeError): 548 dtime >= dtuple 549 550 self.assertTrue(dtime == ALWAYS_EQ) 551 self.assertFalse(dtime != ALWAYS_EQ) 552 self.assertTrue(dtime < LARGEST) 553 self.assertFalse(dtime > LARGEST) 554 self.assertTrue(dtime <= LARGEST) 555 self.assertFalse(dtime >= LARGEST) 556 self.assertFalse(dtime < SMALLEST) 557 self.assertTrue(dtime > SMALLEST) 558 self.assertFalse(dtime <= SMALLEST) 559 self.assertTrue(dtime >= SMALLEST) 560 561 562class BinaryTestCase(unittest.TestCase): 563 564 # XXX What should str(Binary(b"\xff")) return? I'm choosing "\xff" 565 # for now (i.e. interpreting the binary data as Latin-1-encoded 566 # text). But this feels very unsatisfactory. Perhaps we should 567 # only define repr(), and return r"Binary(b'\xff')" instead? 568 569 def test_default(self): 570 t = xmlrpclib.Binary() 571 self.assertEqual(str(t), '') 572 573 def test_string(self): 574 d = b'\x01\x02\x03abc123\xff\xfe' 575 t = xmlrpclib.Binary(d) 576 self.assertEqual(str(t), str(d, "latin-1")) 577 578 def test_decode(self): 579 d = b'\x01\x02\x03abc123\xff\xfe' 580 de = base64.encodebytes(d) 581 t1 = xmlrpclib.Binary() 582 t1.decode(de) 583 self.assertEqual(str(t1), str(d, "latin-1")) 584 585 t2 = xmlrpclib._binary(de) 586 self.assertEqual(str(t2), str(d, "latin-1")) 587 588 589ADDR = PORT = URL = None 590 591# The evt is set twice. First when the server is ready to serve. 592# Second when the server has been shutdown. The user must clear 593# the event after it has been set the first time to catch the second set. 594def http_server(evt, numrequests, requestHandler=None, encoding=None): 595 class TestInstanceClass: 596 def div(self, x, y): 597 return x // y 598 599 def _methodHelp(self, name): 600 if name == 'div': 601 return 'This is the div function' 602 603 class Fixture: 604 @staticmethod 605 def getData(): 606 return '42' 607 608 class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer): 609 def get_request(self): 610 # Ensure the socket is always non-blocking. On Linux, socket 611 # attributes are not inherited like they are on *BSD and Windows. 612 s, port = self.socket.accept() 613 s.setblocking(True) 614 return s, port 615 616 if not requestHandler: 617 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 618 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 619 encoding=encoding, 620 logRequests=False, bind_and_activate=False) 621 try: 622 serv.server_bind() 623 global ADDR, PORT, URL 624 ADDR, PORT = serv.socket.getsockname() 625 #connect to IP address directly. This avoids socket.create_connection() 626 #trying to connect to "localhost" using all address families, which 627 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 628 #on AF_INET only. 629 URL = "http://%s:%d"%(ADDR, PORT) 630 serv.server_activate() 631 serv.register_introspection_functions() 632 serv.register_multicall_functions() 633 serv.register_function(pow) 634 serv.register_function(lambda x: x, 'têšt') 635 @serv.register_function 636 def my_function(): 637 '''This is my function''' 638 return True 639 @serv.register_function(name='add') 640 def _(x, y): 641 return x + y 642 testInstance = TestInstanceClass() 643 serv.register_instance(testInstance, allow_dotted_names=True) 644 evt.set() 645 646 # handle up to 'numrequests' requests 647 while numrequests > 0: 648 serv.handle_request() 649 numrequests -= 1 650 651 except TimeoutError: 652 pass 653 finally: 654 serv.socket.close() 655 PORT = None 656 evt.set() 657 658def http_multi_server(evt, numrequests, requestHandler=None): 659 class TestInstanceClass: 660 def div(self, x, y): 661 return x // y 662 663 def _methodHelp(self, name): 664 if name == 'div': 665 return 'This is the div function' 666 667 def my_function(): 668 '''This is my function''' 669 return True 670 671 class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): 672 def get_request(self): 673 # Ensure the socket is always non-blocking. On Linux, socket 674 # attributes are not inherited like they are on *BSD and Windows. 675 s, port = self.socket.accept() 676 s.setblocking(True) 677 return s, port 678 679 if not requestHandler: 680 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 681 class MyRequestHandler(requestHandler): 682 rpc_paths = [] 683 684 class BrokenDispatcher: 685 def _marshaled_dispatch(self, data, dispatch_method=None, path=None): 686 raise RuntimeError("broken dispatcher") 687 688 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 689 logRequests=False, bind_and_activate=False) 690 serv.socket.settimeout(3) 691 serv.server_bind() 692 try: 693 global ADDR, PORT, URL 694 ADDR, PORT = serv.socket.getsockname() 695 #connect to IP address directly. This avoids socket.create_connection() 696 #trying to connect to "localhost" using all address families, which 697 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 698 #on AF_INET only. 699 URL = "http://%s:%d"%(ADDR, PORT) 700 serv.server_activate() 701 paths = [ 702 "/foo", "/foo/bar", 703 "/foo?k=v", "/foo#frag", "/foo?k=v#frag", 704 "", "/", "/RPC2", "?k=v", "#frag", 705 ] 706 for path in paths: 707 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) 708 d.register_introspection_functions() 709 d.register_multicall_functions() 710 d.register_function(lambda p=path: p, 'test') 711 serv.get_dispatcher(paths[0]).register_function(pow) 712 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 713 serv.add_dispatcher("/is/broken", BrokenDispatcher()) 714 evt.set() 715 716 # handle up to 'numrequests' requests 717 while numrequests > 0: 718 serv.handle_request() 719 numrequests -= 1 720 721 except TimeoutError: 722 pass 723 finally: 724 serv.socket.close() 725 PORT = None 726 evt.set() 727 728# This function prevents errors like: 729# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 730def is_unavailable_exception(e): 731 '''Returns True if the given ProtocolError is the product of a server-side 732 exception caused by the 'temporarily unavailable' response sometimes 733 given by operations on non-blocking sockets.''' 734 735 # sometimes we get a -1 error code and/or empty headers 736 try: 737 if e.errcode == -1 or e.headers is None: 738 return True 739 exc_mess = e.headers.get('X-exception') 740 except AttributeError: 741 # Ignore OSErrors here. 742 exc_mess = str(e) 743 744 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 745 return True 746 747def make_request_and_skipIf(condition, reason): 748 # If we skip the test, we have to make a request because 749 # the server created in setUp blocks expecting one to come in. 750 if not condition: 751 return lambda func: func 752 def decorator(func): 753 def make_request_and_skip(self): 754 try: 755 xmlrpclib.ServerProxy(URL).my_function() 756 except (xmlrpclib.ProtocolError, OSError) as e: 757 if not is_unavailable_exception(e): 758 raise 759 raise unittest.SkipTest(reason) 760 return make_request_and_skip 761 return decorator 762 763class BaseServerTestCase(unittest.TestCase): 764 requestHandler = None 765 request_count = 1 766 threadFunc = staticmethod(http_server) 767 768 def setUp(self): 769 # enable traceback reporting 770 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 771 772 self.evt = threading.Event() 773 # start server thread to handle requests 774 serv_args = (self.evt, self.request_count, self.requestHandler) 775 thread = threading.Thread(target=self.threadFunc, args=serv_args) 776 thread.start() 777 self.addCleanup(thread.join) 778 779 # wait for the server to be ready 780 self.evt.wait() 781 self.evt.clear() 782 783 def tearDown(self): 784 # wait on the server thread to terminate 785 self.evt.wait() 786 787 # disable traceback reporting 788 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 789 790class SimpleServerTestCase(BaseServerTestCase): 791 def test_simple1(self): 792 try: 793 p = xmlrpclib.ServerProxy(URL) 794 self.assertEqual(p.pow(6,8), 6**8) 795 except (xmlrpclib.ProtocolError, OSError) as e: 796 # ignore failures due to non-blocking socket 'unavailable' errors 797 if not is_unavailable_exception(e): 798 # protocol error; provide additional information in test output 799 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 800 801 def test_nonascii(self): 802 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 803 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 804 try: 805 p = xmlrpclib.ServerProxy(URL) 806 self.assertEqual(p.add(start_string, end_string), 807 start_string + end_string) 808 except (xmlrpclib.ProtocolError, OSError) as e: 809 # ignore failures due to non-blocking socket 'unavailable' errors 810 if not is_unavailable_exception(e): 811 # protocol error; provide additional information in test output 812 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 813 814 def test_client_encoding(self): 815 start_string = '\u20ac' 816 end_string = '\xa4' 817 818 try: 819 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15') 820 self.assertEqual(p.add(start_string, end_string), 821 start_string + end_string) 822 except (xmlrpclib.ProtocolError, socket.error) as e: 823 # ignore failures due to non-blocking socket unavailable errors. 824 if not is_unavailable_exception(e): 825 # protocol error; provide additional information in test output 826 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 827 828 def test_nonascii_methodname(self): 829 try: 830 p = xmlrpclib.ServerProxy(URL, encoding='ascii') 831 self.assertEqual(p.têšt(42), 42) 832 except (xmlrpclib.ProtocolError, socket.error) as e: 833 # ignore failures due to non-blocking socket unavailable errors. 834 if not is_unavailable_exception(e): 835 # protocol error; provide additional information in test output 836 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 837 838 def test_404(self): 839 # send POST with http.client, it should return 404 header and 840 # 'Not Found' message. 841 with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn: 842 conn.request('POST', '/this-is-not-valid') 843 response = conn.getresponse() 844 845 self.assertEqual(response.status, 404) 846 self.assertEqual(response.reason, 'Not Found') 847 848 def test_introspection1(self): 849 expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt', 850 'system.listMethods', 'system.methodHelp', 851 'system.methodSignature', 'system.multicall', 852 'Fixture']) 853 try: 854 p = xmlrpclib.ServerProxy(URL) 855 meth = p.system.listMethods() 856 self.assertEqual(set(meth), expected_methods) 857 except (xmlrpclib.ProtocolError, OSError) as e: 858 # ignore failures due to non-blocking socket 'unavailable' errors 859 if not is_unavailable_exception(e): 860 # protocol error; provide additional information in test output 861 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 862 863 864 def test_introspection2(self): 865 try: 866 # test _methodHelp() 867 p = xmlrpclib.ServerProxy(URL) 868 divhelp = p.system.methodHelp('div') 869 self.assertEqual(divhelp, 'This is the div function') 870 except (xmlrpclib.ProtocolError, OSError) as e: 871 # ignore failures due to non-blocking socket 'unavailable' errors 872 if not is_unavailable_exception(e): 873 # protocol error; provide additional information in test output 874 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 875 876 @make_request_and_skipIf(sys.flags.optimize >= 2, 877 "Docstrings are omitted with -O2 and above") 878 def test_introspection3(self): 879 try: 880 # test native doc 881 p = xmlrpclib.ServerProxy(URL) 882 myfunction = p.system.methodHelp('my_function') 883 self.assertEqual(myfunction, 'This is my function') 884 except (xmlrpclib.ProtocolError, OSError) as e: 885 # ignore failures due to non-blocking socket 'unavailable' errors 886 if not is_unavailable_exception(e): 887 # protocol error; provide additional information in test output 888 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 889 890 def test_introspection4(self): 891 # the SimpleXMLRPCServer doesn't support signatures, but 892 # at least check that we can try making the call 893 try: 894 p = xmlrpclib.ServerProxy(URL) 895 divsig = p.system.methodSignature('div') 896 self.assertEqual(divsig, 'signatures not supported') 897 except (xmlrpclib.ProtocolError, OSError) as e: 898 # ignore failures due to non-blocking socket 'unavailable' errors 899 if not is_unavailable_exception(e): 900 # protocol error; provide additional information in test output 901 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 902 903 def test_multicall(self): 904 try: 905 p = xmlrpclib.ServerProxy(URL) 906 multicall = xmlrpclib.MultiCall(p) 907 multicall.add(2,3) 908 multicall.pow(6,8) 909 multicall.div(127,42) 910 add_result, pow_result, div_result = multicall() 911 self.assertEqual(add_result, 2+3) 912 self.assertEqual(pow_result, 6**8) 913 self.assertEqual(div_result, 127//42) 914 except (xmlrpclib.ProtocolError, OSError) as e: 915 # ignore failures due to non-blocking socket 'unavailable' errors 916 if not is_unavailable_exception(e): 917 # protocol error; provide additional information in test output 918 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 919 920 def test_non_existing_multicall(self): 921 try: 922 p = xmlrpclib.ServerProxy(URL) 923 multicall = xmlrpclib.MultiCall(p) 924 multicall.this_is_not_exists() 925 result = multicall() 926 927 # result.results contains; 928 # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:' 929 # 'method "this_is_not_exists" is not supported'>}] 930 931 self.assertEqual(result.results[0]['faultCode'], 1) 932 self.assertEqual(result.results[0]['faultString'], 933 '<class \'Exception\'>:method "this_is_not_exists" ' 934 'is not supported') 935 except (xmlrpclib.ProtocolError, OSError) as e: 936 # ignore failures due to non-blocking socket 'unavailable' errors 937 if not is_unavailable_exception(e): 938 # protocol error; provide additional information in test output 939 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 940 941 def test_dotted_attribute(self): 942 # Raises an AttributeError because private methods are not allowed. 943 self.assertRaises(AttributeError, 944 xmlrpc.server.resolve_dotted_attribute, str, '__add') 945 946 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 947 # Get the test to run faster by sending a request with test_simple1. 948 # This avoids waiting for the socket timeout. 949 self.test_simple1() 950 951 def test_allow_dotted_names_true(self): 952 # XXX also need allow_dotted_names_false test. 953 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 954 data = server.Fixture.getData() 955 self.assertEqual(data, '42') 956 957 def test_unicode_host(self): 958 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 959 self.assertEqual(server.add("a", "\xe9"), "a\xe9") 960 961 def test_partial_post(self): 962 # Check that a partial POST doesn't make the server loop: issue #14001. 963 with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn: 964 conn.send('POST /RPC2 HTTP/1.0\r\n' 965 'Content-Length: 100\r\n\r\n' 966 'bye HTTP/1.1\r\n' 967 f'Host: {ADDR}:{PORT}\r\n' 968 'Accept-Encoding: identity\r\n' 969 'Content-Length: 0\r\n\r\n'.encode('ascii')) 970 971 def test_context_manager(self): 972 with xmlrpclib.ServerProxy(URL) as server: 973 server.add(2, 3) 974 self.assertNotEqual(server('transport')._connection, 975 (None, None)) 976 self.assertEqual(server('transport')._connection, 977 (None, None)) 978 979 def test_context_manager_method_error(self): 980 try: 981 with xmlrpclib.ServerProxy(URL) as server: 982 server.add(2, "a") 983 except xmlrpclib.Fault: 984 pass 985 self.assertEqual(server('transport')._connection, 986 (None, None)) 987 988 989class SimpleServerEncodingTestCase(BaseServerTestCase): 990 @staticmethod 991 def threadFunc(evt, numrequests, requestHandler=None, encoding=None): 992 http_server(evt, numrequests, requestHandler, 'iso-8859-15') 993 994 def test_server_encoding(self): 995 start_string = '\u20ac' 996 end_string = '\xa4' 997 998 try: 999 p = xmlrpclib.ServerProxy(URL) 1000 self.assertEqual(p.add(start_string, end_string), 1001 start_string + end_string) 1002 except (xmlrpclib.ProtocolError, socket.error) as e: 1003 # ignore failures due to non-blocking socket unavailable errors. 1004 if not is_unavailable_exception(e): 1005 # protocol error; provide additional information in test output 1006 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1007 1008 1009class MultiPathServerTestCase(BaseServerTestCase): 1010 threadFunc = staticmethod(http_multi_server) 1011 request_count = 2 1012 def test_path1(self): 1013 p = xmlrpclib.ServerProxy(URL+"/foo") 1014 self.assertEqual(p.pow(6,8), 6**8) 1015 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1016 1017 def test_path2(self): 1018 p = xmlrpclib.ServerProxy(URL+"/foo/bar") 1019 self.assertEqual(p.add(6,8), 6+8) 1020 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) 1021 1022 def test_path3(self): 1023 p = xmlrpclib.ServerProxy(URL+"/is/broken") 1024 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1025 1026 def test_invalid_path(self): 1027 p = xmlrpclib.ServerProxy(URL+"/invalid") 1028 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 1029 1030 def test_path_query_fragment(self): 1031 p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag") 1032 self.assertEqual(p.test(), "/foo?k=v#frag") 1033 1034 def test_path_fragment(self): 1035 p = xmlrpclib.ServerProxy(URL+"/foo#frag") 1036 self.assertEqual(p.test(), "/foo#frag") 1037 1038 def test_path_query(self): 1039 p = xmlrpclib.ServerProxy(URL+"/foo?k=v") 1040 self.assertEqual(p.test(), "/foo?k=v") 1041 1042 def test_empty_path(self): 1043 p = xmlrpclib.ServerProxy(URL) 1044 self.assertEqual(p.test(), "/RPC2") 1045 1046 def test_root_path(self): 1047 p = xmlrpclib.ServerProxy(URL + "/") 1048 self.assertEqual(p.test(), "/") 1049 1050 def test_empty_path_query(self): 1051 p = xmlrpclib.ServerProxy(URL + "?k=v") 1052 self.assertEqual(p.test(), "?k=v") 1053 1054 def test_empty_path_fragment(self): 1055 p = xmlrpclib.ServerProxy(URL + "#frag") 1056 self.assertEqual(p.test(), "#frag") 1057 1058 1059#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1060#does indeed serve subsequent requests on the same connection 1061class BaseKeepaliveServerTestCase(BaseServerTestCase): 1062 #a request handler that supports keep-alive and logs requests into a 1063 #class variable 1064 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1065 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1066 protocol_version = 'HTTP/1.1' 1067 myRequests = [] 1068 def handle(self): 1069 self.myRequests.append([]) 1070 self.reqidx = len(self.myRequests)-1 1071 return self.parentClass.handle(self) 1072 def handle_one_request(self): 1073 result = self.parentClass.handle_one_request(self) 1074 self.myRequests[self.reqidx].append(self.raw_requestline) 1075 return result 1076 1077 requestHandler = RequestHandler 1078 def setUp(self): 1079 #clear request log 1080 self.RequestHandler.myRequests = [] 1081 return BaseServerTestCase.setUp(self) 1082 1083#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 1084#does indeed serve subsequent requests on the same connection 1085class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): 1086 def test_two(self): 1087 p = xmlrpclib.ServerProxy(URL) 1088 #do three requests. 1089 self.assertEqual(p.pow(6,8), 6**8) 1090 self.assertEqual(p.pow(6,8), 6**8) 1091 self.assertEqual(p.pow(6,8), 6**8) 1092 p("close")() 1093 1094 #they should have all been handled by a single request handler 1095 self.assertEqual(len(self.RequestHandler.myRequests), 1) 1096 1097 #check that we did at least two (the third may be pending append 1098 #due to thread scheduling) 1099 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1100 1101 1102#test special attribute access on the serverproxy, through the __call__ 1103#function. 1104class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): 1105 #ask for two keepalive requests to be handled. 1106 request_count=2 1107 1108 def test_close(self): 1109 p = xmlrpclib.ServerProxy(URL) 1110 #do some requests with close. 1111 self.assertEqual(p.pow(6,8), 6**8) 1112 self.assertEqual(p.pow(6,8), 6**8) 1113 self.assertEqual(p.pow(6,8), 6**8) 1114 p("close")() #this should trigger a new keep-alive request 1115 self.assertEqual(p.pow(6,8), 6**8) 1116 self.assertEqual(p.pow(6,8), 6**8) 1117 self.assertEqual(p.pow(6,8), 6**8) 1118 p("close")() 1119 1120 #they should have all been two request handlers, each having logged at least 1121 #two complete requests 1122 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1123 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 1124 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) 1125 1126 1127 def test_transport(self): 1128 p = xmlrpclib.ServerProxy(URL) 1129 #do some requests with close. 1130 self.assertEqual(p.pow(6,8), 6**8) 1131 p("transport").close() #same as above, really. 1132 self.assertEqual(p.pow(6,8), 6**8) 1133 p("close")() 1134 self.assertEqual(len(self.RequestHandler.myRequests), 2) 1135 1136#A test case that verifies that gzip encoding works in both directions 1137#(for a request and the response) 1138@unittest.skipIf(gzip is None, 'requires gzip') 1139class GzipServerTestCase(BaseServerTestCase): 1140 #a request handler that supports keep-alive and logs requests into a 1141 #class variable 1142 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1143 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1144 protocol_version = 'HTTP/1.1' 1145 1146 def do_POST(self): 1147 #store content of last request in class 1148 self.__class__.content_length = int(self.headers["content-length"]) 1149 return self.parentClass.do_POST(self) 1150 requestHandler = RequestHandler 1151 1152 class Transport(xmlrpclib.Transport): 1153 #custom transport, stores the response length for our perusal 1154 fake_gzip = False 1155 def parse_response(self, response): 1156 self.response_length=int(response.getheader("content-length", 0)) 1157 return xmlrpclib.Transport.parse_response(self, response) 1158 1159 def send_content(self, connection, body): 1160 if self.fake_gzip: 1161 #add a lone gzip header to induce decode error remotely 1162 connection.putheader("Content-Encoding", "gzip") 1163 return xmlrpclib.Transport.send_content(self, connection, body) 1164 1165 def setUp(self): 1166 BaseServerTestCase.setUp(self) 1167 1168 def test_gzip_request(self): 1169 t = self.Transport() 1170 t.encode_threshold = None 1171 p = xmlrpclib.ServerProxy(URL, transport=t) 1172 self.assertEqual(p.pow(6,8), 6**8) 1173 a = self.RequestHandler.content_length 1174 t.encode_threshold = 0 #turn on request encoding 1175 self.assertEqual(p.pow(6,8), 6**8) 1176 b = self.RequestHandler.content_length 1177 self.assertTrue(a>b) 1178 p("close")() 1179 1180 def test_bad_gzip_request(self): 1181 t = self.Transport() 1182 t.encode_threshold = None 1183 t.fake_gzip = True 1184 p = xmlrpclib.ServerProxy(URL, transport=t) 1185 cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, 1186 re.compile(r"\b400\b")) 1187 with cm: 1188 p.pow(6, 8) 1189 p("close")() 1190 1191 def test_gzip_response(self): 1192 t = self.Transport() 1193 p = xmlrpclib.ServerProxy(URL, transport=t) 1194 old = self.requestHandler.encode_threshold 1195 self.requestHandler.encode_threshold = None #no encoding 1196 self.assertEqual(p.pow(6,8), 6**8) 1197 a = t.response_length 1198 self.requestHandler.encode_threshold = 0 #always encode 1199 self.assertEqual(p.pow(6,8), 6**8) 1200 p("close")() 1201 b = t.response_length 1202 self.requestHandler.encode_threshold = old 1203 self.assertTrue(a>b) 1204 1205 1206@unittest.skipIf(gzip is None, 'requires gzip') 1207class GzipUtilTestCase(unittest.TestCase): 1208 1209 def test_gzip_decode_limit(self): 1210 max_gzip_decode = 20 * 1024 * 1024 1211 data = b'\0' * max_gzip_decode 1212 encoded = xmlrpclib.gzip_encode(data) 1213 decoded = xmlrpclib.gzip_decode(encoded) 1214 self.assertEqual(len(decoded), max_gzip_decode) 1215 1216 data = b'\0' * (max_gzip_decode + 1) 1217 encoded = xmlrpclib.gzip_encode(data) 1218 1219 with self.assertRaisesRegex(ValueError, 1220 "max gzipped payload length exceeded"): 1221 xmlrpclib.gzip_decode(encoded) 1222 1223 xmlrpclib.gzip_decode(encoded, max_decode=-1) 1224 1225 1226class HeadersServerTestCase(BaseServerTestCase): 1227 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1228 test_headers = None 1229 1230 def do_POST(self): 1231 self.__class__.test_headers = self.headers 1232 return super().do_POST() 1233 requestHandler = RequestHandler 1234 standard_headers = [ 1235 'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent', 1236 'Content-Length'] 1237 1238 def setUp(self): 1239 self.RequestHandler.test_headers = None 1240 return super().setUp() 1241 1242 def assertContainsAdditionalHeaders(self, headers, additional): 1243 expected_keys = sorted(self.standard_headers + list(additional.keys())) 1244 self.assertListEqual(sorted(headers.keys()), expected_keys) 1245 1246 for key, value in additional.items(): 1247 self.assertEqual(headers.get(key), value) 1248 1249 def test_header(self): 1250 p = xmlrpclib.ServerProxy(URL, headers=[('X-Test', 'foo')]) 1251 self.assertEqual(p.pow(6, 8), 6**8) 1252 1253 headers = self.RequestHandler.test_headers 1254 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1255 1256 def test_header_many(self): 1257 p = xmlrpclib.ServerProxy( 1258 URL, headers=[('X-Test', 'foo'), ('X-Test-Second', 'bar')]) 1259 self.assertEqual(p.pow(6, 8), 6**8) 1260 1261 headers = self.RequestHandler.test_headers 1262 self.assertContainsAdditionalHeaders( 1263 headers, {'X-Test': 'foo', 'X-Test-Second': 'bar'}) 1264 1265 def test_header_empty(self): 1266 p = xmlrpclib.ServerProxy(URL, headers=[]) 1267 self.assertEqual(p.pow(6, 8), 6**8) 1268 1269 headers = self.RequestHandler.test_headers 1270 self.assertContainsAdditionalHeaders(headers, {}) 1271 1272 def test_header_tuple(self): 1273 p = xmlrpclib.ServerProxy(URL, headers=(('X-Test', 'foo'),)) 1274 self.assertEqual(p.pow(6, 8), 6**8) 1275 1276 headers = self.RequestHandler.test_headers 1277 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1278 1279 def test_header_items(self): 1280 p = xmlrpclib.ServerProxy(URL, headers={'X-Test': 'foo'}.items()) 1281 self.assertEqual(p.pow(6, 8), 6**8) 1282 1283 headers = self.RequestHandler.test_headers 1284 self.assertContainsAdditionalHeaders(headers, {'X-Test': 'foo'}) 1285 1286 1287#Test special attributes of the ServerProxy object 1288class ServerProxyTestCase(unittest.TestCase): 1289 def setUp(self): 1290 unittest.TestCase.setUp(self) 1291 # Actual value of the URL doesn't matter if it is a string in 1292 # the correct format. 1293 self.url = 'http://fake.localhost' 1294 1295 def test_close(self): 1296 p = xmlrpclib.ServerProxy(self.url) 1297 self.assertEqual(p('close')(), None) 1298 1299 def test_transport(self): 1300 t = xmlrpclib.Transport() 1301 p = xmlrpclib.ServerProxy(self.url, transport=t) 1302 self.assertEqual(p('transport'), t) 1303 1304 1305# This is a contrived way to make a failure occur on the server side 1306# in order to test the _send_traceback_header flag on the server 1307class FailingMessageClass(http.client.HTTPMessage): 1308 def get(self, key, failobj=None): 1309 key = key.lower() 1310 if key == 'content-length': 1311 return 'I am broken' 1312 return super().get(key, failobj) 1313 1314 1315class FailingServerTestCase(unittest.TestCase): 1316 def setUp(self): 1317 self.evt = threading.Event() 1318 # start server thread to handle requests 1319 serv_args = (self.evt, 1) 1320 thread = threading.Thread(target=http_server, args=serv_args) 1321 thread.start() 1322 self.addCleanup(thread.join) 1323 1324 # wait for the server to be ready 1325 self.evt.wait() 1326 self.evt.clear() 1327 1328 def tearDown(self): 1329 # wait on the server thread to terminate 1330 self.evt.wait() 1331 # reset flag 1332 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 1333 # reset message class 1334 default_class = http.client.HTTPMessage 1335 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class 1336 1337 def test_basic(self): 1338 # check that flag is false by default 1339 flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header 1340 self.assertEqual(flagval, False) 1341 1342 # enable traceback reporting 1343 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1344 1345 # test a call that shouldn't fail just as a smoke test 1346 try: 1347 p = xmlrpclib.ServerProxy(URL) 1348 self.assertEqual(p.pow(6,8), 6**8) 1349 except (xmlrpclib.ProtocolError, OSError) as e: 1350 # ignore failures due to non-blocking socket 'unavailable' errors 1351 if not is_unavailable_exception(e): 1352 # protocol error; provide additional information in test output 1353 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1354 1355 def test_fail_no_info(self): 1356 # use the broken message class 1357 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1358 1359 try: 1360 p = xmlrpclib.ServerProxy(URL) 1361 p.pow(6,8) 1362 except (xmlrpclib.ProtocolError, OSError) as e: 1363 # ignore failures due to non-blocking socket 'unavailable' errors 1364 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1365 # The two server-side error headers shouldn't be sent back in this case 1366 self.assertTrue(e.headers.get("X-exception") is None) 1367 self.assertTrue(e.headers.get("X-traceback") is None) 1368 else: 1369 self.fail('ProtocolError not raised') 1370 1371 def test_fail_with_info(self): 1372 # use the broken message class 1373 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1374 1375 # Check that errors in the server send back exception/traceback 1376 # info when flag is set 1377 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1378 1379 try: 1380 p = xmlrpclib.ServerProxy(URL) 1381 p.pow(6,8) 1382 except (xmlrpclib.ProtocolError, OSError) as e: 1383 # ignore failures due to non-blocking socket 'unavailable' errors 1384 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1385 # We should get error info in the response 1386 expected_err = "invalid literal for int() with base 10: 'I am broken'" 1387 self.assertEqual(e.headers.get("X-exception"), expected_err) 1388 self.assertTrue(e.headers.get("X-traceback") is not None) 1389 else: 1390 self.fail('ProtocolError not raised') 1391 1392 1393@contextlib.contextmanager 1394def captured_stdout(encoding='utf-8'): 1395 """A variation on support.captured_stdout() which gives a text stream 1396 having a `buffer` attribute. 1397 """ 1398 orig_stdout = sys.stdout 1399 sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding) 1400 try: 1401 yield sys.stdout 1402 finally: 1403 sys.stdout = orig_stdout 1404 1405 1406class CGIHandlerTestCase(unittest.TestCase): 1407 def setUp(self): 1408 self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler() 1409 1410 def tearDown(self): 1411 self.cgi = None 1412 1413 def test_cgi_get(self): 1414 with os_helper.EnvironmentVarGuard() as env: 1415 env['REQUEST_METHOD'] = 'GET' 1416 # if the method is GET and no request_text is given, it runs handle_get 1417 # get sysout output 1418 with captured_stdout(encoding=self.cgi.encoding) as data_out: 1419 self.cgi.handle_request() 1420 1421 # parse Status header 1422 data_out.seek(0) 1423 handle = data_out.read() 1424 status = handle.split()[1] 1425 message = ' '.join(handle.split()[2:4]) 1426 1427 self.assertEqual(status, '400') 1428 self.assertEqual(message, 'Bad Request') 1429 1430 1431 def test_cgi_xmlrpc_response(self): 1432 data = """<?xml version='1.0'?> 1433 <methodCall> 1434 <methodName>test_method</methodName> 1435 <params> 1436 <param> 1437 <value><string>foo</string></value> 1438 </param> 1439 <param> 1440 <value><string>bar</string></value> 1441 </param> 1442 </params> 1443 </methodCall> 1444 """ 1445 1446 with os_helper.EnvironmentVarGuard() as env, \ 1447 captured_stdout(encoding=self.cgi.encoding) as data_out, \ 1448 support.captured_stdin() as data_in: 1449 data_in.write(data) 1450 data_in.seek(0) 1451 env['CONTENT_LENGTH'] = str(len(data)) 1452 self.cgi.handle_request() 1453 data_out.seek(0) 1454 1455 # will respond exception, if so, our goal is achieved ;) 1456 handle = data_out.read() 1457 1458 # start with 44th char so as not to get http header, we just 1459 # need only xml 1460 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:]) 1461 1462 # Also test the content-length returned by handle_request 1463 # Using the same test method inorder to avoid all the datapassing 1464 # boilerplate code. 1465 # Test for bug: http://bugs.python.org/issue5040 1466 1467 content = handle[handle.find("<?xml"):] 1468 1469 self.assertEqual( 1470 int(re.search(r'Content-Length: (\d+)', handle).group(1)), 1471 len(content)) 1472 1473 1474class UseBuiltinTypesTestCase(unittest.TestCase): 1475 1476 def test_use_builtin_types(self): 1477 # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which 1478 # makes all dispatch of binary data as bytes instances, and all 1479 # dispatch of datetime argument as datetime.datetime instances. 1480 self.log = [] 1481 expected_bytes = b"my dog has fleas" 1482 expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12) 1483 marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar') 1484 def foobar(*args): 1485 self.log.extend(args) 1486 handler = xmlrpc.server.SimpleXMLRPCDispatcher( 1487 allow_none=True, encoding=None, use_builtin_types=True) 1488 handler.register_function(foobar) 1489 handler._marshaled_dispatch(marshaled) 1490 self.assertEqual(len(self.log), 2) 1491 mybytes, mydate = self.log 1492 self.assertEqual(self.log, [expected_bytes, expected_date]) 1493 self.assertIs(type(mydate), datetime.datetime) 1494 self.assertIs(type(mybytes), bytes) 1495 1496 def test_cgihandler_has_use_builtin_types_flag(self): 1497 handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True) 1498 self.assertTrue(handler.use_builtin_types) 1499 1500 def test_xmlrpcserver_has_use_builtin_types_flag(self): 1501 server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0), 1502 use_builtin_types=True) 1503 server.server_close() 1504 self.assertTrue(server.use_builtin_types) 1505 1506 1507def setUpModule(): 1508 thread_info = threading_helper.threading_setup() 1509 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 1510 1511 1512if __name__ == "__main__": 1513 unittest.main() 1514