1# (c) 2005 Ian Bicking and contributors; written for Paste 2# (http://pythonpaste.org) 3# Licensed under the MIT license: 4# http://www.opensource.org/licenses/mit-license.php 5""" 6Routines for testing WSGI applications. 7 8Most interesting is TestApp 9""" 10from __future__ import unicode_literals 11 12import os 13import re 14import json 15import random 16import fnmatch 17import mimetypes 18 19from base64 import b64encode 20 21from six import StringIO 22from six import BytesIO 23from six import string_types 24from six import binary_type 25from six import text_type 26from six.moves import http_cookiejar 27 28from webtest.compat import urlparse 29from webtest.compat import urlencode 30from webtest.compat import to_bytes 31from webtest.compat import escape_cookie_value 32from webtest.response import TestResponse 33from webtest import forms 34from webtest import lint 35from webtest import utils 36 37import webob 38 39 40__all__ = ['TestApp', 'TestRequest'] 41 42 43class AppError(Exception): 44 45 def __init__(self, message, *args): 46 if isinstance(message, binary_type): 47 message = message.decode('utf8') 48 str_args = () 49 for arg in args: 50 if isinstance(arg, webob.Response): 51 body = arg.body 52 if isinstance(body, binary_type): 53 if arg.charset: 54 arg = body.decode(arg.charset) 55 else: 56 arg = repr(body) 57 elif isinstance(arg, binary_type): 58 try: 59 arg = arg.decode('utf8') 60 except UnicodeDecodeError: 61 arg = repr(arg) 62 str_args += (arg,) 63 message = message % str_args 64 Exception.__init__(self, message) 65 66 67class CookiePolicy(http_cookiejar.DefaultCookiePolicy): 68 """A subclass of DefaultCookiePolicy to allow cookie set for 69 Domain=localhost.""" 70 71 def return_ok_domain(self, cookie, request): 72 if cookie.domain == '.localhost': 73 return True 74 return http_cookiejar.DefaultCookiePolicy.return_ok_domain( 75 self, cookie, request) 76 77 def set_ok_domain(self, cookie, request): 78 if cookie.domain == '.localhost': 79 return True 80 return http_cookiejar.DefaultCookiePolicy.set_ok_domain( 81 self, cookie, request) 82 83 84class TestRequest(webob.BaseRequest): 85 """A subclass of webob.Request""" 86 ResponseClass = TestResponse 87 88 89class TestApp(object): 90 """ 91 Wraps a WSGI application in a more convenient interface for 92 testing. It uses extended version of :class:`webob.BaseRequest` 93 and :class:`webob.Response`. 94 95 :param app: 96 May be an WSGI application or Paste Deploy app, 97 like ``'config:filename.ini#test'``. 98 99 .. versionadded:: 2.0 100 101 It can also be an actual full URL to an http server and webtest 102 will proxy requests with `WSGIProxy2 103 <https://pypi.python.org/pypi/WSGIProxy2/>`_. 104 :type app: 105 WSGI application 106 :param extra_environ: 107 A dictionary of values that should go 108 into the environment for each request. These can provide a 109 communication channel with the application. 110 :type extra_environ: 111 dict 112 :param relative_to: 113 A directory used for file 114 uploads are calculated relative to this. Also ``config:`` 115 URIs that aren't absolute. 116 :type relative_to: 117 string 118 :param cookiejar: 119 :class:`cookielib.CookieJar` alike API that keeps cookies 120 across requets. 121 :type cookiejar: 122 CookieJar instance 123 124 .. attribute:: cookies 125 126 A convenient shortcut for a dict of all cookies in 127 ``cookiejar``. 128 129 :param parser_features: 130 Passed to BeautifulSoup when parsing responses. 131 :type parser_features: 132 string or list 133 :param json_encoder: 134 Passed to json.dumps when encoding json 135 :type json_encoder: 136 A subclass of json.JSONEncoder 137 :param lint: 138 If True (default) then check that the application is WSGI compliant 139 :type lint: 140 A boolean 141 """ 142 143 RequestClass = TestRequest 144 145 def __init__(self, app, extra_environ=None, relative_to=None, 146 use_unicode=True, cookiejar=None, parser_features=None, 147 json_encoder=None, lint=True): 148 149 if 'WEBTEST_TARGET_URL' in os.environ: 150 app = os.environ['WEBTEST_TARGET_URL'] 151 if isinstance(app, string_types): 152 if app.startswith('http'): 153 try: 154 from wsgiproxy import HostProxy 155 except ImportError: # pragma: no cover 156 raise ImportError(( 157 'Using webtest with a real url requires WSGIProxy2. ' 158 'Please install it with: ' 159 'pip install WSGIProxy2')) 160 if '#' not in app: 161 app += '#httplib' 162 url, client = app.split('#', 1) 163 app = HostProxy(url, client=client) 164 else: 165 from paste.deploy import loadapp 166 # @@: Should pick up relative_to from calling module's 167 # __file__ 168 app = loadapp(app, relative_to=relative_to) 169 self.app = app 170 self.lint = lint 171 self.relative_to = relative_to 172 if extra_environ is None: 173 extra_environ = {} 174 self.extra_environ = extra_environ 175 self.use_unicode = use_unicode 176 if cookiejar is None: 177 cookiejar = http_cookiejar.CookieJar(policy=CookiePolicy()) 178 self.cookiejar = cookiejar 179 if parser_features is None: 180 parser_features = 'html.parser' 181 self.RequestClass.ResponseClass.parser_features = parser_features 182 if json_encoder is None: 183 json_encoder = json.JSONEncoder 184 self.JSONEncoder = json_encoder 185 186 def get_authorization(self): 187 """Allow to set the HTTP_AUTHORIZATION environ key. Value should looks 188 like ``('Basic', ('user', 'password'))`` 189 190 If value is None the the HTTP_AUTHORIZATION is removed 191 """ 192 return self.authorization_value 193 194 def set_authorization(self, value): 195 self.authorization_value = value 196 if value is not None: 197 invalid_value = ( 198 "You should use a value like ('Basic', ('user', 'password'))" 199 ) 200 if isinstance(value, (list, tuple)) and len(value) == 2: 201 authtype, val = value 202 if authtype == 'Basic' and val and \ 203 isinstance(val, (list, tuple)): 204 val = ':'.join(list(val)) 205 val = b64encode(to_bytes(val)).strip() 206 val = val.decode('latin1') 207 else: 208 raise ValueError(invalid_value) 209 value = str('%s %s' % (authtype, val)) 210 else: 211 raise ValueError(invalid_value) 212 self.extra_environ.update({ 213 'HTTP_AUTHORIZATION': value, 214 }) 215 else: 216 if 'HTTP_AUTHORIZATION' in self.extra_environ: 217 del self.extra_environ['HTTP_AUTHORIZATION'] 218 219 authorization = property(get_authorization, set_authorization) 220 221 @property 222 def cookies(self): 223 return dict([(cookie.name, cookie.value) for cookie in self.cookiejar]) 224 225 def set_cookie(self, name, value): 226 """ 227 Sets a cookie to be passed through with requests. 228 229 """ 230 value = escape_cookie_value(value) 231 cookie = http_cookiejar.Cookie( 232 version=0, 233 name=name, 234 value=value, 235 port=None, 236 port_specified=False, 237 domain='.localhost', 238 domain_specified=True, 239 domain_initial_dot=False, 240 path='/', 241 path_specified=True, 242 secure=False, 243 expires=None, 244 discard=False, 245 comment=None, 246 comment_url=None, 247 rest=None 248 ) 249 self.cookiejar.set_cookie(cookie) 250 251 def reset(self): 252 """ 253 Resets the state of the application; currently just clears 254 saved cookies. 255 """ 256 self.cookiejar.clear() 257 258 def set_parser_features(self, parser_features): 259 """ 260 Changes the parser used by BeautifulSoup. See its documentation to 261 know the supported parsers. 262 """ 263 self.RequestClass.ResponseClass.parser_features = parser_features 264 265 def get(self, url, params=None, headers=None, extra_environ=None, 266 status=None, expect_errors=False, xhr=False): 267 """ 268 Do a GET request given the url path. 269 270 :param params: 271 A query string, or a dictionary that will be encoded 272 into a query string. You may also include a URL query 273 string on the ``url``. 274 :param headers: 275 Extra headers to send. 276 :type headers: 277 dictionary 278 :param extra_environ: 279 Environmental variables that should be added to the request. 280 :type extra_environ: 281 dictionary 282 :param status: 283 The HTTP status code you expect in response (if not 200 or 3xx). 284 You can also use a wildcard, like ``'3*'`` or ``'*'``. 285 :type status: 286 integer or string 287 :param expect_errors: 288 If this is False, then if anything is written to 289 environ ``wsgi.errors`` it will be an error. 290 If it is True, then non-200/3xx responses are also okay. 291 :type expect_errors: 292 boolean 293 :param xhr: 294 If this is true, then marks response as ajax. The same as 295 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', } 296 :type xhr: 297 boolean 298 299 :returns: :class:`webtest.TestResponse` instance. 300 301 """ 302 environ = self._make_environ(extra_environ) 303 url = str(url) 304 url = self._remove_fragment(url) 305 if params: 306 if not isinstance(params, string_types): 307 params = urlencode(params, doseq=True) 308 if str('?') in url: 309 url += str('&') 310 else: 311 url += str('?') 312 url += params 313 if str('?') in url: 314 url, environ['QUERY_STRING'] = url.split(str('?'), 1) 315 else: 316 environ['QUERY_STRING'] = str('') 317 req = self.RequestClass.blank(url, environ) 318 if xhr: 319 headers = self._add_xhr_header(headers) 320 if headers: 321 req.headers.update(headers) 322 return self.do_request(req, status=status, 323 expect_errors=expect_errors) 324 325 def post(self, url, params='', headers=None, extra_environ=None, 326 status=None, upload_files=None, expect_errors=False, 327 content_type=None, xhr=False): 328 """ 329 Do a POST request. Similar to :meth:`~webtest.TestApp.get`. 330 331 :param params: 332 Are put in the body of the request. If params is a 333 iterator it will be urlencoded, if it is string it will not 334 be encoded, but placed in the body directly. 335 336 Can be a collections.OrderedDict with 337 :class:`webtest.forms.Upload` fields included:: 338 339 340 app.post('/myurl', collections.OrderedDict([ 341 ('textfield1', 'value1'), 342 ('uploadfield', webapp.Upload('filename.txt', 'contents'), 343 ('textfield2', 'value2')]))) 344 345 :param upload_files: 346 It should be a list of ``(fieldname, filename, file_content)``. 347 You can also use just ``(fieldname, filename)`` and the file 348 contents will be read from disk. 349 :type upload_files: 350 list 351 :param content_type: 352 HTTP content type, for example `application/json`. 353 :type content_type: 354 string 355 356 :param xhr: 357 If this is true, then marks response as ajax. The same as 358 headers={'X-REQUESTED-WITH': 'XMLHttpRequest', } 359 :type xhr: 360 boolean 361 362 :returns: :class:`webtest.TestResponse` instance. 363 364 """ 365 if xhr: 366 headers = self._add_xhr_header(headers) 367 return self._gen_request('POST', url, params=params, headers=headers, 368 extra_environ=extra_environ, status=status, 369 upload_files=upload_files, 370 expect_errors=expect_errors, 371 content_type=content_type) 372 373 def put(self, url, params='', headers=None, extra_environ=None, 374 status=None, upload_files=None, expect_errors=False, 375 content_type=None, xhr=False): 376 """ 377 Do a PUT request. Similar to :meth:`~webtest.TestApp.post`. 378 379 :returns: :class:`webtest.TestResponse` instance. 380 381 """ 382 if xhr: 383 headers = self._add_xhr_header(headers) 384 return self._gen_request('PUT', url, params=params, headers=headers, 385 extra_environ=extra_environ, status=status, 386 upload_files=upload_files, 387 expect_errors=expect_errors, 388 content_type=content_type, 389 ) 390 391 def patch(self, url, params='', headers=None, extra_environ=None, 392 status=None, upload_files=None, expect_errors=False, 393 content_type=None, xhr=False): 394 """ 395 Do a PATCH request. Similar to :meth:`~webtest.TestApp.post`. 396 397 :returns: :class:`webtest.TestResponse` instance. 398 399 """ 400 if xhr: 401 headers = self._add_xhr_header(headers) 402 return self._gen_request('PATCH', url, params=params, headers=headers, 403 extra_environ=extra_environ, status=status, 404 upload_files=upload_files, 405 expect_errors=expect_errors, 406 content_type=content_type) 407 408 def delete(self, url, params='', headers=None, 409 extra_environ=None, status=None, expect_errors=False, 410 content_type=None, xhr=False): 411 """ 412 Do a DELETE request. Similar to :meth:`~webtest.TestApp.get`. 413 414 :returns: :class:`webtest.TestResponse` instance. 415 416 """ 417 if xhr: 418 headers = self._add_xhr_header(headers) 419 return self._gen_request('DELETE', url, params=params, headers=headers, 420 extra_environ=extra_environ, status=status, 421 upload_files=None, 422 expect_errors=expect_errors, 423 content_type=content_type) 424 425 def options(self, url, headers=None, extra_environ=None, 426 status=None, expect_errors=False, xhr=False): 427 """ 428 Do a OPTIONS request. Similar to :meth:`~webtest.TestApp.get`. 429 430 :returns: :class:`webtest.TestResponse` instance. 431 432 """ 433 if xhr: 434 headers = self._add_xhr_header(headers) 435 return self._gen_request('OPTIONS', url, headers=headers, 436 extra_environ=extra_environ, status=status, 437 upload_files=None, 438 expect_errors=expect_errors) 439 440 def head(self, url, headers=None, extra_environ=None, 441 status=None, expect_errors=False, xhr=False): 442 """ 443 Do a HEAD request. Similar to :meth:`~webtest.TestApp.get`. 444 445 :returns: :class:`webtest.TestResponse` instance. 446 447 """ 448 if xhr: 449 headers = self._add_xhr_header(headers) 450 return self._gen_request('HEAD', url, headers=headers, 451 extra_environ=extra_environ, status=status, 452 upload_files=None, 453 expect_errors=expect_errors) 454 455 post_json = utils.json_method('POST') 456 put_json = utils.json_method('PUT') 457 patch_json = utils.json_method('PATCH') 458 delete_json = utils.json_method('DELETE') 459 460 def encode_multipart(self, params, files): 461 """ 462 Encodes a set of parameters (typically a name/value list) and 463 a set of files (a list of (name, filename, file_body, mimetype)) into a 464 typical POST body, returning the (content_type, body). 465 466 """ 467 boundary = to_bytes(str(random.random()))[2:] 468 boundary = b'----------a_BoUnDaRy' + boundary + b'$' 469 lines = [] 470 471 def _append_file(file_info): 472 key, filename, value, fcontent = self._get_file_info(file_info) 473 if isinstance(key, text_type): 474 try: 475 key = key.encode('ascii') 476 except: # pragma: no cover 477 raise # file name must be ascii 478 if isinstance(filename, text_type): 479 try: 480 filename = filename.encode('utf8') 481 except: # pragma: no cover 482 raise # file name must be ascii or utf8 483 if not fcontent: 484 fcontent = mimetypes.guess_type(filename.decode('utf8'))[0] 485 fcontent = to_bytes(fcontent) 486 fcontent = fcontent or b'application/octet-stream' 487 lines.extend([ 488 b'--' + boundary, 489 b'Content-Disposition: form-data; ' + 490 b'name="' + key + b'"; filename="' + filename + b'"', 491 b'Content-Type: ' + fcontent, b'', value]) 492 493 for key, value in params: 494 if isinstance(key, text_type): 495 try: 496 key = key.encode('ascii') 497 except: # pragma: no cover 498 raise # field name are always ascii 499 if isinstance(value, forms.File): 500 if value.value: 501 _append_file([key] + list(value.value)) 502 elif isinstance(value, forms.Upload): 503 file_info = [key, value.filename] 504 if value.content is not None: 505 file_info.append(value.content) 506 if value.content_type is not None: 507 file_info.append(value.content_type) 508 _append_file(file_info) 509 else: 510 if isinstance(value, text_type): 511 value = value.encode('utf8') 512 lines.extend([ 513 b'--' + boundary, 514 b'Content-Disposition: form-data; name="' + key + b'"', 515 b'', value]) 516 517 for file_info in files: 518 _append_file(file_info) 519 520 lines.extend([b'--' + boundary + b'--', b'']) 521 body = b'\r\n'.join(lines) 522 boundary = boundary.decode('ascii') 523 content_type = 'multipart/form-data; boundary=%s' % boundary 524 return content_type, body 525 526 def request(self, url_or_req, status=None, expect_errors=False, 527 **req_params): 528 """ 529 Creates and executes a request. You may either pass in an 530 instantiated :class:`TestRequest` object, or you may pass in a 531 URL and keyword arguments to be passed to 532 :meth:`TestRequest.blank`. 533 534 You can use this to run a request without the intermediary 535 functioning of :meth:`TestApp.get` etc. For instance, to 536 test a WebDAV method:: 537 538 resp = app.request('/new-col', method='MKCOL') 539 540 Note that the request won't have a body unless you specify it, 541 like:: 542 543 resp = app.request('/test.txt', method='PUT', body='test') 544 545 You can use :class:`webtest.TestRequest`:: 546 547 req = webtest.TestRequest.blank('/url/', method='GET') 548 resp = app.do_request(req) 549 550 """ 551 if isinstance(url_or_req, text_type): 552 url_or_req = str(url_or_req) 553 for (k, v) in req_params.items(): 554 if isinstance(v, text_type): 555 req_params[k] = str(v) 556 if isinstance(url_or_req, string_types): 557 req = self.RequestClass.blank(url_or_req, **req_params) 558 else: 559 req = url_or_req.copy() 560 for name, value in req_params.items(): 561 setattr(req, name, value) 562 req.environ['paste.throw_errors'] = True 563 for name, value in self.extra_environ.items(): 564 req.environ.setdefault(name, value) 565 return self.do_request(req, 566 status=status, 567 expect_errors=expect_errors, 568 ) 569 570 def do_request(self, req, status=None, expect_errors=None): 571 """ 572 Executes the given webob Request (``req``), with the expected 573 ``status``. Generally :meth:`~webtest.TestApp.get` and 574 :meth:`~webtest.TestApp.post` are used instead. 575 576 To use this:: 577 578 req = webtest.TestRequest.blank('url', ...args...) 579 resp = app.do_request(req) 580 581 .. note:: 582 583 You can pass any keyword arguments to 584 ``TestRequest.blank()``, which will be set on the request. 585 These can be arguments like ``content_type``, ``accept``, etc. 586 587 """ 588 589 errors = StringIO() 590 req.environ['wsgi.errors'] = errors 591 script_name = req.environ.get('SCRIPT_NAME', '') 592 if script_name and req.path_info.startswith(script_name): 593 req.path_info = req.path_info[len(script_name):] 594 595 # set framework hooks 596 req.environ['paste.testing'] = True 597 req.environ['paste.testing_variables'] = {} 598 599 # set request cookies 600 self.cookiejar.add_cookie_header(utils._RequestCookieAdapter(req)) 601 602 # verify wsgi compatibility 603 app = lint.middleware(self.app) if self.lint else self.app 604 605 ## FIXME: should it be an option to not catch exc_info? 606 res = req.get_response(app, catch_exc_info=True) 607 608 # be sure to decode the content 609 res.decode_content() 610 611 # set a few handy attributes 612 res._use_unicode = self.use_unicode 613 res.request = req 614 res.app = app 615 res.test_app = self 616 617 # We do this to make sure the app_iter is exausted: 618 try: 619 res.body 620 except TypeError: # pragma: no cover 621 pass 622 res.errors = errors.getvalue() 623 624 for name, value in req.environ['paste.testing_variables'].items(): 625 if hasattr(res, name): 626 raise ValueError( 627 "paste.testing_variables contains the variable %r, but " 628 "the response object already has an attribute by that " 629 "name" % name) 630 setattr(res, name, value) 631 if not expect_errors: 632 self._check_status(status, res) 633 self._check_errors(res) 634 635 # merge cookies back in 636 self.cookiejar.extract_cookies(utils._ResponseCookieAdapter(res), 637 utils._RequestCookieAdapter(req)) 638 639 return res 640 641 def _check_status(self, status, res): 642 if status == '*': 643 return 644 res_status = res.status 645 if (isinstance(status, string_types) and '*' in status): 646 if re.match(fnmatch.translate(status), res_status, re.I): 647 return 648 if isinstance(status, string_types): 649 if status == res_status: 650 return 651 if isinstance(status, (list, tuple)): 652 if res.status_int not in status: 653 raise AppError( 654 "Bad response: %s (not one of %s for %s)\n%s", 655 res_status, ', '.join(map(str, status)), 656 res.request.url, res) 657 return 658 if status is None: 659 if res.status_int >= 200 and res.status_int < 400: 660 return 661 raise AppError( 662 "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s", 663 res_status, res.request.url, 664 res) 665 if status != res.status_int: 666 raise AppError( 667 "Bad response: %s (not %s)", res_status, status) 668 669 def _check_errors(self, res): 670 errors = res.errors 671 if errors: 672 raise AppError( 673 "Application had errors logged:\n%s", errors) 674 675 def _make_environ(self, extra_environ=None): 676 environ = self.extra_environ.copy() 677 environ['paste.throw_errors'] = True 678 if extra_environ: 679 environ.update(extra_environ) 680 return environ 681 682 def _remove_fragment(self, url): 683 scheme, netloc, path, query, fragment = urlparse.urlsplit(url) 684 return urlparse.urlunsplit((scheme, netloc, path, query, "")) 685 686 def _gen_request(self, method, url, params=utils.NoDefault, 687 headers=None, extra_environ=None, status=None, 688 upload_files=None, expect_errors=False, 689 content_type=None): 690 """ 691 Do a generic request. 692 """ 693 694 environ = self._make_environ(extra_environ) 695 696 inline_uploads = [] 697 698 # this supports OrderedDict 699 if isinstance(params, dict) or hasattr(params, 'items'): 700 params = list(params.items()) 701 702 if isinstance(params, (list, tuple)): 703 inline_uploads = [v for (k, v) in params 704 if isinstance(v, (forms.File, forms.Upload))] 705 706 if len(inline_uploads) > 0: 707 content_type, params = self.encode_multipart( 708 params, upload_files or ()) 709 environ['CONTENT_TYPE'] = content_type 710 else: 711 params = utils.encode_params(params, content_type) 712 if upload_files or \ 713 (content_type and 714 to_bytes(content_type).startswith(b'multipart')): 715 params = urlparse.parse_qsl(params, keep_blank_values=True) 716 content_type, params = self.encode_multipart( 717 params, upload_files or ()) 718 environ['CONTENT_TYPE'] = content_type 719 elif params: 720 environ.setdefault('CONTENT_TYPE', 721 str('application/x-www-form-urlencoded')) 722 723 if content_type is not None: 724 environ['CONTENT_TYPE'] = content_type 725 environ['REQUEST_METHOD'] = str(method) 726 url = str(url) 727 url = self._remove_fragment(url) 728 req = self.RequestClass.blank(url, environ) 729 if isinstance(params, text_type): 730 params = params.encode(req.charset or 'utf8') 731 req.environ['wsgi.input'] = BytesIO(params) 732 req.content_length = len(params) 733 if headers: 734 req.headers.update(headers) 735 return self.do_request(req, status=status, 736 expect_errors=expect_errors) 737 738 def _get_file_info(self, file_info): 739 if len(file_info) == 2: 740 # It only has a filename 741 filename = file_info[1] 742 if self.relative_to: 743 filename = os.path.join(self.relative_to, filename) 744 f = open(filename, 'rb') 745 content = f.read() 746 f.close() 747 return (file_info[0], filename, content, None) 748 elif 3 <= len(file_info) <= 4: 749 content = file_info[2] 750 if not isinstance(content, binary_type): 751 raise ValueError('File content must be %s not %s' 752 % (binary_type, type(content))) 753 if len(file_info) == 3: 754 return tuple(file_info) + (None,) 755 else: 756 return file_info 757 else: 758 raise ValueError( 759 "upload_files need to be a list of tuples of (fieldname, " 760 "filename, filecontent, mimetype) or (fieldname, " 761 "filename, filecontent) or (fieldname, filename); " 762 "you gave: %r" 763 % repr(file_info)[:100]) 764 765 @staticmethod 766 def _add_xhr_header(headers): 767 headers = headers or {} 768 # if remove str we will be have an error in lint.middleware 769 headers.update({'X-REQUESTED-WITH': str('XMLHttpRequest')}) 770 return headers 771