• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# (c) 2005 Ian Bicking and contributors; written for Paste
2# (http://pythonpaste.org)
3# Licensed under the MIT license:
4# http://www.opensource.org/licenses/mit-license.php
6Routines for testing WSGI applications.
8Most interesting is TestApp
10from __future__ import unicode_literals
12import os
13import re
14import json
15import random
16import fnmatch
17import mimetypes
19from base64 import b64encode
21from six import StringIO
22from six import BytesIO
23from six import string_types
24from six import binary_type
25from six import text_type
26from six.moves import http_cookiejar
28from webtest.compat import urlparse
29from webtest.compat import urlencode
30from webtest.compat import to_bytes
31from webtest.compat import escape_cookie_value
32from webtest.response import TestResponse
33from webtest import forms
34from webtest import lint
35from webtest import utils
37import webob
40__all__ = ['TestApp', 'TestRequest']
43class AppError(Exception):
45    def __init__(self, message, *args):
46        if isinstance(message, binary_type):
47            message = message.decode('utf8')
48        str_args = ()
49        for arg in args:
50            if isinstance(arg, webob.Response):
51                body = arg.body
52                if isinstance(body, binary_type):
53                    if arg.charset:
54                        arg = body.decode(arg.charset)
55                    else:
56                        arg = repr(body)
57            elif isinstance(arg, binary_type):
58                try:
59                    arg = arg.decode('utf8')
60                except UnicodeDecodeError:
61                    arg = repr(arg)
62            str_args += (arg,)
63        message = message % str_args
64        Exception.__init__(self, message)
67class CookiePolicy(http_cookiejar.DefaultCookiePolicy):
68    """A subclass of DefaultCookiePolicy to allow cookie set for
69    Domain=localhost."""
71    def return_ok_domain(self, cookie, request):
72        if cookie.domain == '.localhost':
73            return True
74        return http_cookiejar.DefaultCookiePolicy.return_ok_domain(
75            self, cookie, request)
77    def set_ok_domain(self, cookie, request):
78        if cookie.domain == '.localhost':
79            return True
80        return http_cookiejar.DefaultCookiePolicy.set_ok_domain(
81            self, cookie, request)
84class TestRequest(webob.BaseRequest):
85    """A subclass of webob.Request"""
86    ResponseClass = TestResponse
89class TestApp(object):
90    """
91    Wraps a WSGI application in a more convenient interface for
92    testing. It uses extended version of :class:`webob.BaseRequest`
93    and :class:`webob.Response`.
95    :param app:
96        May be an WSGI application or Paste Deploy app,
97        like ``'config:filename.ini#test'``.
99        .. versionadded:: 2.0
101        It can also be an actual full URL to an http server and webtest
102        will proxy requests with `WSGIProxy2
103        <https://pypi.python.org/pypi/WSGIProxy2/>`_.
104    :type app:
105        WSGI application
106    :param extra_environ:
107        A dictionary of values that should go
108        into the environment for each request. These can provide a
109        communication channel with the application.
110    :type extra_environ:
111        dict
112    :param relative_to:
113        A directory used for file
114        uploads are calculated relative to this.  Also ``config:``
115        URIs that aren't absolute.
116    :type relative_to:
117        string
118    :param cookiejar:
119        :class:`cookielib.CookieJar` alike API that keeps cookies
120        across requets.
121    :type cookiejar:
122        CookieJar instance
124    .. attribute:: cookies
126        A convenient shortcut for a dict of all cookies in
127        ``cookiejar``.
129    :param parser_features:
130        Passed to BeautifulSoup when parsing responses.
131    :type parser_features:
132        string or list
133    :param json_encoder:
134        Passed to json.dumps when encoding json
135    :type json_encoder:
136        A subclass of json.JSONEncoder
137    :param lint:
138        If True (default) then check that the application is WSGI compliant
139    :type lint:
140        A boolean
141    """
143    RequestClass = TestRequest
145    def __init__(self, app, extra_environ=None, relative_to=None,
146                 use_unicode=True, cookiejar=None, parser_features=None,
147                 json_encoder=None, lint=True):
149        if 'WEBTEST_TARGET_URL' in os.environ:
150            app = os.environ['WEBTEST_TARGET_URL']
151        if isinstance(app, string_types):
152            if app.startswith('http'):
153                try:
154                    from wsgiproxy import HostProxy
155                except ImportError:  # pragma: no cover
156                    raise ImportError((
157                        'Using webtest with a real url requires WSGIProxy2. '
158                        'Please install it with: '
159                        'pip install WSGIProxy2'))
160                if '#' not in app:
161                    app += '#httplib'
162                url, client = app.split('#', 1)
163                app = HostProxy(url, client=client)
164            else:
165                from paste.deploy import loadapp
166                # @@: Should pick up relative_to from calling module's
167                # __file__
168                app = loadapp(app, relative_to=relative_to)
169        self.app = app
170        self.lint = lint
171        self.relative_to = relative_to
172        if extra_environ is None:
173            extra_environ = {}
174        self.extra_environ = extra_environ
175        self.use_unicode = use_unicode
176        if cookiejar is None:
177            cookiejar = http_cookiejar.CookieJar(policy=CookiePolicy())
178        self.cookiejar = cookiejar
179        if parser_features is None:
180            parser_features = 'html.parser'
181        self.RequestClass.ResponseClass.parser_features = parser_features
182        if json_encoder is None:
183            json_encoder = json.JSONEncoder
184        self.JSONEncoder = json_encoder
186    def get_authorization(self):
187        """Allow to set the HTTP_AUTHORIZATION environ key. Value should looks
188        like ``('Basic', ('user', 'password'))``
190        If value is None the the HTTP_AUTHORIZATION is removed
191        """
192        return self.authorization_value
194    def set_authorization(self, value):
195        self.authorization_value = value
196        if value is not None:
197            invalid_value = (
198                "You should use a value like ('Basic', ('user', 'password'))"
199            )
200            if isinstance(value, (list, tuple)) and len(value) == 2:
201                authtype, val = value
202                if authtype == 'Basic' and val and \
203                   isinstance(val, (list, tuple)):
204                    val = ':'.join(list(val))
205                    val = b64encode(to_bytes(val)).strip()
206                    val = val.decode('latin1')
207                else:
208                    raise ValueError(invalid_value)
209                value = str('%s %s' % (authtype, val))
210            else:
211                raise ValueError(invalid_value)
212            self.extra_environ.update({
213                'HTTP_AUTHORIZATION': value,
214            })
215        else:
216            if 'HTTP_AUTHORIZATION' in self.extra_environ:
217                del self.extra_environ['HTTP_AUTHORIZATION']
219    authorization = property(get_authorization, set_authorization)
221    @property
222    def cookies(self):
223        return dict([(cookie.name, cookie.value) for cookie in self.cookiejar])
225    def set_cookie(self, name, value):
226        """
227        Sets a cookie to be passed through with requests.
229        """
230        value = escape_cookie_value(value)
231        cookie = http_cookiejar.Cookie(
232            version=0,
233            name=name,
234            value=value,
235            port=None,
236            port_specified=False,
237            domain='.localhost',
238            domain_specified=True,
239            domain_initial_dot=False,
240            path='/',
241            path_specified=True,
242            secure=False,
243            expires=None,
244            discard=False,
245            comment=None,
246            comment_url=None,
247            rest=None
248        )
249        self.cookiejar.set_cookie(cookie)
251    def reset(self):
252        """
253        Resets the state of the application; currently just clears
254        saved cookies.
255        """
256        self.cookiejar.clear()
258    def set_parser_features(self, parser_features):
259        """
260        Changes the parser used by BeautifulSoup. See its documentation to
261        know the supported parsers.
262        """
263        self.RequestClass.ResponseClass.parser_features = parser_features
265    def get(self, url, params=None, headers=None, extra_environ=None,
266            status=None, expect_errors=False, xhr=False):
267        """
268        Do a GET request given the url path.
270        :param params:
271            A query string, or a dictionary that will be encoded
272            into a query string.  You may also include a URL query
273            string on the ``url``.
274        :param headers:
275            Extra headers to send.
276        :type headers:
277            dictionary
278        :param extra_environ:
279            Environmental variables that should be added to the request.
280        :type extra_environ:
281            dictionary
282        :param status:
283            The HTTP status code you expect in response (if not 200 or 3xx).
284            You can also use a wildcard, like ``'3*'`` or ``'*'``.
285        :type status:
286            integer or string
287        :param expect_errors:
288            If this is False, then if anything is written to
289            environ ``wsgi.errors`` it will be an error.
290            If it is True, then non-200/3xx responses are also okay.
291        :type expect_errors:
292            boolean
293        :param xhr:
294            If this is true, then marks response as ajax. The same as
295            headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
296        :type xhr:
297            boolean
299        :returns: :class:`webtest.TestResponse` instance.
301        """
302        environ = self._make_environ(extra_environ)
303        url = str(url)
304        url = self._remove_fragment(url)
305        if params:
306            if not isinstance(params, string_types):
307                params = urlencode(params, doseq=True)
308            if str('?') in url:
309                url += str('&')
310            else:
311                url += str('?')
312            url += params
313        if str('?') in url:
314            url, environ['QUERY_STRING'] = url.split(str('?'), 1)
315        else:
316            environ['QUERY_STRING'] = str('')
317        req = self.RequestClass.blank(url, environ)
318        if xhr:
319            headers = self._add_xhr_header(headers)
320        if headers:
321            req.headers.update(headers)
322        return self.do_request(req, status=status,
323                               expect_errors=expect_errors)
325    def post(self, url, params='', headers=None, extra_environ=None,
326             status=None, upload_files=None, expect_errors=False,
327             content_type=None, xhr=False):
328        """
329        Do a POST request. Similar to :meth:`~webtest.TestApp.get`.
331        :param params:
332            Are put in the body of the request. If params is a
333            iterator it will be urlencoded, if it is string it will not
334            be encoded, but placed in the body directly.
336            Can be a collections.OrderedDict with
337            :class:`webtest.forms.Upload` fields included::
340            app.post('/myurl', collections.OrderedDict([
341                ('textfield1', 'value1'),
342                ('uploadfield', webapp.Upload('filename.txt', 'contents'),
343                ('textfield2', 'value2')])))
345        :param upload_files:
346            It should be a list of ``(fieldname, filename, file_content)``.
347            You can also use just ``(fieldname, filename)`` and the file
348            contents will be read from disk.
349        :type upload_files:
350            list
351        :param content_type:
352            HTTP content type, for example `application/json`.
353        :type content_type:
354            string
356        :param xhr:
357            If this is true, then marks response as ajax. The same as
358            headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
359        :type xhr:
360            boolean
362        :returns: :class:`webtest.TestResponse` instance.
364        """
365        if xhr:
366            headers = self._add_xhr_header(headers)
367        return self._gen_request('POST', url, params=params, headers=headers,
368                                 extra_environ=extra_environ, status=status,
369                                 upload_files=upload_files,
370                                 expect_errors=expect_errors,
371                                 content_type=content_type)
373    def put(self, url, params='', headers=None, extra_environ=None,
374            status=None, upload_files=None, expect_errors=False,
375            content_type=None, xhr=False):
376        """
377        Do a PUT request. Similar to :meth:`~webtest.TestApp.post`.
379        :returns: :class:`webtest.TestResponse` instance.
381        """
382        if xhr:
383            headers = self._add_xhr_header(headers)
384        return self._gen_request('PUT', url, params=params, headers=headers,
385                                 extra_environ=extra_environ, status=status,
386                                 upload_files=upload_files,
387                                 expect_errors=expect_errors,
388                                 content_type=content_type,
389                                 )
391    def patch(self, url, params='', headers=None, extra_environ=None,
392              status=None, upload_files=None, expect_errors=False,
393              content_type=None, xhr=False):
394        """
395        Do a PATCH request. Similar to :meth:`~webtest.TestApp.post`.
397        :returns: :class:`webtest.TestResponse` instance.
399        """
400        if xhr:
401            headers = self._add_xhr_header(headers)
402        return self._gen_request('PATCH', url, params=params, headers=headers,
403                                 extra_environ=extra_environ, status=status,
404                                 upload_files=upload_files,
405                                 expect_errors=expect_errors,
406                                 content_type=content_type)
408    def delete(self, url, params='', headers=None,
409               extra_environ=None, status=None, expect_errors=False,
410               content_type=None, xhr=False):
411        """
412        Do a DELETE request. Similar to :meth:`~webtest.TestApp.get`.
414        :returns: :class:`webtest.TestResponse` instance.
416        """
417        if xhr:
418            headers = self._add_xhr_header(headers)
419        return self._gen_request('DELETE', url, params=params, headers=headers,
420                                 extra_environ=extra_environ, status=status,
421                                 upload_files=None,
422                                 expect_errors=expect_errors,
423                                 content_type=content_type)
425    def options(self, url, headers=None, extra_environ=None,
426                status=None, expect_errors=False, xhr=False):
427        """
428        Do a OPTIONS request. Similar to :meth:`~webtest.TestApp.get`.
430        :returns: :class:`webtest.TestResponse` instance.
432        """
433        if xhr:
434            headers = self._add_xhr_header(headers)
435        return self._gen_request('OPTIONS', url, headers=headers,
436                                 extra_environ=extra_environ, status=status,
437                                 upload_files=None,
438                                 expect_errors=expect_errors)
440    def head(self, url, headers=None, extra_environ=None,
441             status=None, expect_errors=False, xhr=False):
442        """
443        Do a HEAD request. Similar to :meth:`~webtest.TestApp.get`.
445        :returns: :class:`webtest.TestResponse` instance.
447        """
448        if xhr:
449            headers = self._add_xhr_header(headers)
450        return self._gen_request('HEAD', url, headers=headers,
451                                 extra_environ=extra_environ, status=status,
452                                 upload_files=None,
453                                 expect_errors=expect_errors)
455    post_json = utils.json_method('POST')
456    put_json = utils.json_method('PUT')
457    patch_json = utils.json_method('PATCH')
458    delete_json = utils.json_method('DELETE')
460    def encode_multipart(self, params, files):
461        """
462        Encodes a set of parameters (typically a name/value list) and
463        a set of files (a list of (name, filename, file_body, mimetype)) into a
464        typical POST body, returning the (content_type, body).
466        """
467        boundary = to_bytes(str(random.random()))[2:]
468        boundary = b'----------a_BoUnDaRy' + boundary + b'$'
469        lines = []
471        def _append_file(file_info):
472            key, filename, value, fcontent = self._get_file_info(file_info)
473            if isinstance(key, text_type):
474                try:
475                    key = key.encode('ascii')
476                except:  # pragma: no cover
477                    raise  # file name must be ascii
478            if isinstance(filename, text_type):
479                try:
480                    filename = filename.encode('utf8')
481                except:  # pragma: no cover
482                    raise  # file name must be ascii or utf8
483            if not fcontent:
484                fcontent = mimetypes.guess_type(filename.decode('utf8'))[0]
485            fcontent = to_bytes(fcontent)
486            fcontent = fcontent or b'application/octet-stream'
487            lines.extend([
488                b'--' + boundary,
489                b'Content-Disposition: form-data; ' +
490                b'name="' + key + b'"; filename="' + filename + b'"',
491                b'Content-Type: ' + fcontent, b'', value])
493        for key, value in params:
494            if isinstance(key, text_type):
495                try:
496                    key = key.encode('ascii')
497                except:  # pragma: no cover
498                    raise  # field name are always ascii
499            if isinstance(value, forms.File):
500                if value.value:
501                    _append_file([key] + list(value.value))
502            elif isinstance(value, forms.Upload):
503                file_info = [key, value.filename]
504                if value.content is not None:
505                    file_info.append(value.content)
506                    if value.content_type is not None:
507                        file_info.append(value.content_type)
508                _append_file(file_info)
509            else:
510                if isinstance(value, text_type):
511                    value = value.encode('utf8')
512                lines.extend([
513                    b'--' + boundary,
514                    b'Content-Disposition: form-data; name="' + key + b'"',
515                    b'', value])
517        for file_info in files:
518            _append_file(file_info)
520        lines.extend([b'--' + boundary + b'--', b''])
521        body = b'\r\n'.join(lines)
522        boundary = boundary.decode('ascii')
523        content_type = 'multipart/form-data; boundary=%s' % boundary
524        return content_type, body
526    def request(self, url_or_req, status=None, expect_errors=False,
527                **req_params):
528        """
529        Creates and executes a request. You may either pass in an
530        instantiated :class:`TestRequest` object, or you may pass in a
531        URL and keyword arguments to be passed to
532        :meth:`TestRequest.blank`.
534        You can use this to run a request without the intermediary
535        functioning of :meth:`TestApp.get` etc.  For instance, to
536        test a WebDAV method::
538            resp = app.request('/new-col', method='MKCOL')
540        Note that the request won't have a body unless you specify it,
541        like::
543            resp = app.request('/test.txt', method='PUT', body='test')
545        You can use :class:`webtest.TestRequest`::
547            req = webtest.TestRequest.blank('/url/', method='GET')
548            resp = app.do_request(req)
550        """
551        if isinstance(url_or_req, text_type):
552            url_or_req = str(url_or_req)
553        for (k, v) in req_params.items():
554            if isinstance(v, text_type):
555                req_params[k] = str(v)
556        if isinstance(url_or_req, string_types):
557            req = self.RequestClass.blank(url_or_req, **req_params)
558        else:
559            req = url_or_req.copy()
560            for name, value in req_params.items():
561                setattr(req, name, value)
562        req.environ['paste.throw_errors'] = True
563        for name, value in self.extra_environ.items():
564            req.environ.setdefault(name, value)
565        return self.do_request(req,
566                               status=status,
567                               expect_errors=expect_errors,
568                               )
570    def do_request(self, req, status=None, expect_errors=None):
571        """
572        Executes the given webob Request (``req``), with the expected
573        ``status``.  Generally :meth:`~webtest.TestApp.get` and
574        :meth:`~webtest.TestApp.post` are used instead.
576        To use this::
578            req = webtest.TestRequest.blank('url', ...args...)
579            resp = app.do_request(req)
581        .. note::
583            You can pass any keyword arguments to
584            ``TestRequest.blank()``, which will be set on the request.
585            These can be arguments like ``content_type``, ``accept``, etc.
587        """
589        errors = StringIO()
590        req.environ['wsgi.errors'] = errors
591        script_name = req.environ.get('SCRIPT_NAME', '')
592        if script_name and req.path_info.startswith(script_name):
593            req.path_info = req.path_info[len(script_name):]
595        # set framework hooks
596        req.environ['paste.testing'] = True
597        req.environ['paste.testing_variables'] = {}
599        # set request cookies
600        self.cookiejar.add_cookie_header(utils._RequestCookieAdapter(req))
602        # verify wsgi compatibility
603        app = lint.middleware(self.app) if self.lint else self.app
605        ## FIXME: should it be an option to not catch exc_info?
606        res = req.get_response(app, catch_exc_info=True)
608        # be sure to decode the content
609        res.decode_content()
611        # set a few handy attributes
612        res._use_unicode = self.use_unicode
613        res.request = req
614        res.app = app
615        res.test_app = self
617        # We do this to make sure the app_iter is exausted:
618        try:
619            res.body
620        except TypeError:  # pragma: no cover
621            pass
622        res.errors = errors.getvalue()
624        for name, value in req.environ['paste.testing_variables'].items():
625            if hasattr(res, name):
626                raise ValueError(
627                    "paste.testing_variables contains the variable %r, but "
628                    "the response object already has an attribute by that "
629                    "name" % name)
630            setattr(res, name, value)
631        if not expect_errors:
632            self._check_status(status, res)
633            self._check_errors(res)
635        # merge cookies back in
636        self.cookiejar.extract_cookies(utils._ResponseCookieAdapter(res),
637                                       utils._RequestCookieAdapter(req))
639        return res
641    def _check_status(self, status, res):
642        if status == '*':
643            return
644        res_status = res.status
645        if (isinstance(status, string_types) and '*' in status):
646            if re.match(fnmatch.translate(status), res_status, re.I):
647                return
648        if isinstance(status, string_types):
649            if status == res_status:
650                return
651        if isinstance(status, (list, tuple)):
652            if res.status_int not in status:
653                raise AppError(
654                    "Bad response: %s (not one of %s for %s)\n%s",
655                    res_status, ', '.join(map(str, status)),
656                    res.request.url, res)
657            return
658        if status is None:
659            if res.status_int >= 200 and res.status_int < 400:
660                return
661            raise AppError(
662                "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s",
663                res_status, res.request.url,
664                res)
665        if status != res.status_int:
666            raise AppError(
667                "Bad response: %s (not %s)", res_status, status)
669    def _check_errors(self, res):
670        errors = res.errors
671        if errors:
672            raise AppError(
673                "Application had errors logged:\n%s", errors)
675    def _make_environ(self, extra_environ=None):
676        environ = self.extra_environ.copy()
677        environ['paste.throw_errors'] = True
678        if extra_environ:
679            environ.update(extra_environ)
680        return environ
682    def _remove_fragment(self, url):
683        scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
684        return urlparse.urlunsplit((scheme, netloc, path, query, ""))
686    def _gen_request(self, method, url, params=utils.NoDefault,
687                     headers=None, extra_environ=None, status=None,
688                     upload_files=None, expect_errors=False,
689                     content_type=None):
690        """
691        Do a generic request.
692        """
694        environ = self._make_environ(extra_environ)
696        inline_uploads = []
698        # this supports OrderedDict
699        if isinstance(params, dict) or hasattr(params, 'items'):
700            params = list(params.items())
702        if isinstance(params, (list, tuple)):
703            inline_uploads = [v for (k, v) in params
704                              if isinstance(v, (forms.File, forms.Upload))]
706        if len(inline_uploads) > 0:
707            content_type, params = self.encode_multipart(
708                params, upload_files or ())
709            environ['CONTENT_TYPE'] = content_type
710        else:
711            params = utils.encode_params(params, content_type)
712            if upload_files or \
713                (content_type and
714                 to_bytes(content_type).startswith(b'multipart')):
715                params = urlparse.parse_qsl(params, keep_blank_values=True)
716                content_type, params = self.encode_multipart(
717                    params, upload_files or ())
718                environ['CONTENT_TYPE'] = content_type
719            elif params:
720                environ.setdefault('CONTENT_TYPE',
721                                   str('application/x-www-form-urlencoded'))
723        if content_type is not None:
724            environ['CONTENT_TYPE'] = content_type
725        environ['REQUEST_METHOD'] = str(method)
726        url = str(url)
727        url = self._remove_fragment(url)
728        req = self.RequestClass.blank(url, environ)
729        if isinstance(params, text_type):
730            params = params.encode(req.charset or 'utf8')
731        req.environ['wsgi.input'] = BytesIO(params)
732        req.content_length = len(params)
733        if headers:
734            req.headers.update(headers)
735        return self.do_request(req, status=status,
736                               expect_errors=expect_errors)
738    def _get_file_info(self, file_info):
739        if len(file_info) == 2:
740            # It only has a filename
741            filename = file_info[1]
742            if self.relative_to:
743                filename = os.path.join(self.relative_to, filename)
744            f = open(filename, 'rb')
745            content = f.read()
746            f.close()
747            return (file_info[0], filename, content, None)
748        elif 3 <= len(file_info) <= 4:
749            content = file_info[2]
750            if not isinstance(content, binary_type):
751                raise ValueError('File content must be %s not %s'
752                                 % (binary_type, type(content)))
753            if len(file_info) == 3:
754                return tuple(file_info) + (None,)
755            else:
756                return file_info
757        else:
758            raise ValueError(
759                "upload_files need to be a list of tuples of (fieldname, "
760                "filename, filecontent, mimetype) or (fieldname, "
761                "filename, filecontent) or (fieldname, filename); "
762                "you gave: %r"
763                % repr(file_info)[:100])
765    @staticmethod
766    def _add_xhr_header(headers):
767        headers = headers or {}
768        # if remove str we will be have an error in lint.middleware
769        headers.update({'X-REQUESTED-WITH': str('XMLHttpRequest')})
770        return headers