• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# (c) 2005 Ian Bicking and contributors; written for Paste
2# (http://pythonpaste.org)
3# Licensed under the MIT license:
4# http://www.opensource.org/licenses/mit-license.php
5"""
6Routines for testing WSGI applications.
7
8Most interesting is TestApp
9"""
10from __future__ import unicode_literals
11
12import os
13import re
14import json
15import random
16import fnmatch
17import mimetypes
18
19from base64 import b64encode
20
21from six import StringIO
22from six import BytesIO
23from six import string_types
24from six import binary_type
25from six import text_type
26from six.moves import http_cookiejar
27
28from webtest.compat import urlparse
29from webtest.compat import urlencode
30from webtest.compat import to_bytes
31from webtest.compat import escape_cookie_value
32from webtest.response import TestResponse
33from webtest import forms
34from webtest import lint
35from webtest import utils
36
37import webob
38
39
40__all__ = ['TestApp', 'TestRequest']
41
42
class AppError(Exception):
    """Error raised by :class:`TestApp` when a response has an unexpected
    status or when the application wrote to ``wsgi.errors``.

    ``message`` is a ``%``-style format string and ``args`` are its
    arguments. Byte strings are decoded as UTF-8 (falling back to their
    ``repr``), and a :class:`webob.Response` argument is replaced by its
    body, decoded with the response charset when one is set.
    """

    def __init__(self, message, *args):
        if isinstance(message, binary_type):
            message = message.decode('utf8')
        printable = []
        for item in args:
            if isinstance(item, webob.Response):
                payload = item.body
                if isinstance(payload, binary_type):
                    # without a charset we cannot decode, show the raw repr
                    item = (payload.decode(item.charset) if item.charset
                            else repr(payload))
            elif isinstance(item, binary_type):
                try:
                    item = item.decode('utf8')
                except UnicodeDecodeError:
                    item = repr(item)
            printable.append(item)
        Exception.__init__(self, message % tuple(printable))
65
66
class CookiePolicy(http_cookiejar.DefaultCookiePolicy):
    """Cookie policy that always accepts and returns cookies whose domain
    is ``.localhost``; any other domain is handled by
    ``DefaultCookiePolicy``."""

    def return_ok_domain(self, cookie, request):
        # ``.localhost`` short-circuits the default domain checks
        default_policy = http_cookiejar.DefaultCookiePolicy
        return (cookie.domain == '.localhost' or
                default_policy.return_ok_domain(self, cookie, request))

    def set_ok_domain(self, cookie, request):
        # ``.localhost`` short-circuits the default domain checks
        default_policy = http_cookiejar.DefaultCookiePolicy
        return (cookie.domain == '.localhost' or
                default_policy.set_ok_domain(self, cookie, request))
82
83
class TestRequest(webob.BaseRequest):
    """A subclass of :class:`webob.BaseRequest` that uses
    :class:`TestResponse` as its ``ResponseClass``."""
    # TestApp also stores ``parser_features`` on this response class.
    ResponseClass = TestResponse
87
88
89class TestApp(object):
90    """
91    Wraps a WSGI application in a more convenient interface for
92    testing. It uses extended version of :class:`webob.BaseRequest`
93    and :class:`webob.Response`.
94
95    :param app:
96        May be an WSGI application or Paste Deploy app,
97        like ``'config:filename.ini#test'``.
98
99        .. versionadded:: 2.0
100
101        It can also be an actual full URL to an http server and webtest
102        will proxy requests with `WSGIProxy2
103        <https://pypi.python.org/pypi/WSGIProxy2/>`_.
104    :type app:
105        WSGI application
106    :param extra_environ:
107        A dictionary of values that should go
108        into the environment for each request. These can provide a
109        communication channel with the application.
110    :type extra_environ:
111        dict
112    :param relative_to:
113        A directory used for file
114        uploads are calculated relative to this.  Also ``config:``
115        URIs that aren't absolute.
116    :type relative_to:
117        string
118    :param cookiejar:
119        :class:`cookielib.CookieJar` alike API that keeps cookies
120        across requets.
121    :type cookiejar:
122        CookieJar instance
123
124    .. attribute:: cookies
125
126        A convenient shortcut for a dict of all cookies in
127        ``cookiejar``.
128
129    :param parser_features:
130        Passed to BeautifulSoup when parsing responses.
131    :type parser_features:
132        string or list
133    :param json_encoder:
134        Passed to json.dumps when encoding json
135    :type json_encoder:
136        A subclass of json.JSONEncoder
137    :param lint:
138        If True (default) then check that the application is WSGI compliant
139    :type lint:
140        A boolean
141    """
142
143    RequestClass = TestRequest
144
145    def __init__(self, app, extra_environ=None, relative_to=None,
146                 use_unicode=True, cookiejar=None, parser_features=None,
147                 json_encoder=None, lint=True):
148
149        if 'WEBTEST_TARGET_URL' in os.environ:
150            app = os.environ['WEBTEST_TARGET_URL']
151        if isinstance(app, string_types):
152            if app.startswith('http'):
153                try:
154                    from wsgiproxy import HostProxy
155                except ImportError:  # pragma: no cover
156                    raise ImportError((
157                        'Using webtest with a real url requires WSGIProxy2. '
158                        'Please install it with: '
159                        'pip install WSGIProxy2'))
160                if '#' not in app:
161                    app += '#httplib'
162                url, client = app.split('#', 1)
163                app = HostProxy(url, client=client)
164            else:
165                from paste.deploy import loadapp
166                # @@: Should pick up relative_to from calling module's
167                # __file__
168                app = loadapp(app, relative_to=relative_to)
169        self.app = app
170        self.lint = lint
171        self.relative_to = relative_to
172        if extra_environ is None:
173            extra_environ = {}
174        self.extra_environ = extra_environ
175        self.use_unicode = use_unicode
176        if cookiejar is None:
177            cookiejar = http_cookiejar.CookieJar(policy=CookiePolicy())
178        self.cookiejar = cookiejar
179        if parser_features is None:
180            parser_features = 'html.parser'
181        self.RequestClass.ResponseClass.parser_features = parser_features
182        if json_encoder is None:
183            json_encoder = json.JSONEncoder
184        self.JSONEncoder = json_encoder
185
186    def get_authorization(self):
187        """Allow to set the HTTP_AUTHORIZATION environ key. Value should looks
188        like ``('Basic', ('user', 'password'))``
189
190        If value is None the the HTTP_AUTHORIZATION is removed
191        """
192        return self.authorization_value
193
194    def set_authorization(self, value):
195        self.authorization_value = value
196        if value is not None:
197            invalid_value = (
198                "You should use a value like ('Basic', ('user', 'password'))"
199            )
200            if isinstance(value, (list, tuple)) and len(value) == 2:
201                authtype, val = value
202                if authtype == 'Basic' and val and \
203                   isinstance(val, (list, tuple)):
204                    val = ':'.join(list(val))
205                    val = b64encode(to_bytes(val)).strip()
206                    val = val.decode('latin1')
207                else:
208                    raise ValueError(invalid_value)
209                value = str('%s %s' % (authtype, val))
210            else:
211                raise ValueError(invalid_value)
212            self.extra_environ.update({
213                'HTTP_AUTHORIZATION': value,
214            })
215        else:
216            if 'HTTP_AUTHORIZATION' in self.extra_environ:
217                del self.extra_environ['HTTP_AUTHORIZATION']
218
219    authorization = property(get_authorization, set_authorization)
220
221    @property
222    def cookies(self):
223        return dict([(cookie.name, cookie.value) for cookie in self.cookiejar])
224
225    def set_cookie(self, name, value):
226        """
227        Sets a cookie to be passed through with requests.
228
229        """
230        value = escape_cookie_value(value)
231        cookie = http_cookiejar.Cookie(
232            version=0,
233            name=name,
234            value=value,
235            port=None,
236            port_specified=False,
237            domain='.localhost',
238            domain_specified=True,
239            domain_initial_dot=False,
240            path='/',
241            path_specified=True,
242            secure=False,
243            expires=None,
244            discard=False,
245            comment=None,
246            comment_url=None,
247            rest=None
248        )
249        self.cookiejar.set_cookie(cookie)
250
251    def reset(self):
252        """
253        Resets the state of the application; currently just clears
254        saved cookies.
255        """
256        self.cookiejar.clear()
257
258    def set_parser_features(self, parser_features):
259        """
260        Changes the parser used by BeautifulSoup. See its documentation to
261        know the supported parsers.
262        """
263        self.RequestClass.ResponseClass.parser_features = parser_features
264
265    def get(self, url, params=None, headers=None, extra_environ=None,
266            status=None, expect_errors=False, xhr=False):
267        """
268        Do a GET request given the url path.
269
270        :param params:
271            A query string, or a dictionary that will be encoded
272            into a query string.  You may also include a URL query
273            string on the ``url``.
274        :param headers:
275            Extra headers to send.
276        :type headers:
277            dictionary
278        :param extra_environ:
279            Environmental variables that should be added to the request.
280        :type extra_environ:
281            dictionary
282        :param status:
283            The HTTP status code you expect in response (if not 200 or 3xx).
284            You can also use a wildcard, like ``'3*'`` or ``'*'``.
285        :type status:
286            integer or string
287        :param expect_errors:
288            If this is False, then if anything is written to
289            environ ``wsgi.errors`` it will be an error.
290            If it is True, then non-200/3xx responses are also okay.
291        :type expect_errors:
292            boolean
293        :param xhr:
294            If this is true, then marks response as ajax. The same as
295            headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
296        :type xhr:
297            boolean
298
299        :returns: :class:`webtest.TestResponse` instance.
300
301        """
302        environ = self._make_environ(extra_environ)
303        url = str(url)
304        url = self._remove_fragment(url)
305        if params:
306            if not isinstance(params, string_types):
307                params = urlencode(params, doseq=True)
308            if str('?') in url:
309                url += str('&')
310            else:
311                url += str('?')
312            url += params
313        if str('?') in url:
314            url, environ['QUERY_STRING'] = url.split(str('?'), 1)
315        else:
316            environ['QUERY_STRING'] = str('')
317        req = self.RequestClass.blank(url, environ)
318        if xhr:
319            headers = self._add_xhr_header(headers)
320        if headers:
321            req.headers.update(headers)
322        return self.do_request(req, status=status,
323                               expect_errors=expect_errors)
324
325    def post(self, url, params='', headers=None, extra_environ=None,
326             status=None, upload_files=None, expect_errors=False,
327             content_type=None, xhr=False):
328        """
329        Do a POST request. Similar to :meth:`~webtest.TestApp.get`.
330
331        :param params:
332            Are put in the body of the request. If params is a
333            iterator it will be urlencoded, if it is string it will not
334            be encoded, but placed in the body directly.
335
336            Can be a collections.OrderedDict with
337            :class:`webtest.forms.Upload` fields included::
338
339
340            app.post('/myurl', collections.OrderedDict([
341                ('textfield1', 'value1'),
342                ('uploadfield', webapp.Upload('filename.txt', 'contents'),
343                ('textfield2', 'value2')])))
344
345        :param upload_files:
346            It should be a list of ``(fieldname, filename, file_content)``.
347            You can also use just ``(fieldname, filename)`` and the file
348            contents will be read from disk.
349        :type upload_files:
350            list
351        :param content_type:
352            HTTP content type, for example `application/json`.
353        :type content_type:
354            string
355
356        :param xhr:
357            If this is true, then marks response as ajax. The same as
358            headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
359        :type xhr:
360            boolean
361
362        :returns: :class:`webtest.TestResponse` instance.
363
364        """
365        if xhr:
366            headers = self._add_xhr_header(headers)
367        return self._gen_request('POST', url, params=params, headers=headers,
368                                 extra_environ=extra_environ, status=status,
369                                 upload_files=upload_files,
370                                 expect_errors=expect_errors,
371                                 content_type=content_type)
372
373    def put(self, url, params='', headers=None, extra_environ=None,
374            status=None, upload_files=None, expect_errors=False,
375            content_type=None, xhr=False):
376        """
377        Do a PUT request. Similar to :meth:`~webtest.TestApp.post`.
378
379        :returns: :class:`webtest.TestResponse` instance.
380
381        """
382        if xhr:
383            headers = self._add_xhr_header(headers)
384        return self._gen_request('PUT', url, params=params, headers=headers,
385                                 extra_environ=extra_environ, status=status,
386                                 upload_files=upload_files,
387                                 expect_errors=expect_errors,
388                                 content_type=content_type,
389                                 )
390
391    def patch(self, url, params='', headers=None, extra_environ=None,
392              status=None, upload_files=None, expect_errors=False,
393              content_type=None, xhr=False):
394        """
395        Do a PATCH request. Similar to :meth:`~webtest.TestApp.post`.
396
397        :returns: :class:`webtest.TestResponse` instance.
398
399        """
400        if xhr:
401            headers = self._add_xhr_header(headers)
402        return self._gen_request('PATCH', url, params=params, headers=headers,
403                                 extra_environ=extra_environ, status=status,
404                                 upload_files=upload_files,
405                                 expect_errors=expect_errors,
406                                 content_type=content_type)
407
408    def delete(self, url, params='', headers=None,
409               extra_environ=None, status=None, expect_errors=False,
410               content_type=None, xhr=False):
411        """
412        Do a DELETE request. Similar to :meth:`~webtest.TestApp.get`.
413
414        :returns: :class:`webtest.TestResponse` instance.
415
416        """
417        if xhr:
418            headers = self._add_xhr_header(headers)
419        return self._gen_request('DELETE', url, params=params, headers=headers,
420                                 extra_environ=extra_environ, status=status,
421                                 upload_files=None,
422                                 expect_errors=expect_errors,
423                                 content_type=content_type)
424
425    def options(self, url, headers=None, extra_environ=None,
426                status=None, expect_errors=False, xhr=False):
427        """
428        Do a OPTIONS request. Similar to :meth:`~webtest.TestApp.get`.
429
430        :returns: :class:`webtest.TestResponse` instance.
431
432        """
433        if xhr:
434            headers = self._add_xhr_header(headers)
435        return self._gen_request('OPTIONS', url, headers=headers,
436                                 extra_environ=extra_environ, status=status,
437                                 upload_files=None,
438                                 expect_errors=expect_errors)
439
440    def head(self, url, headers=None, extra_environ=None,
441             status=None, expect_errors=False, xhr=False):
442        """
443        Do a HEAD request. Similar to :meth:`~webtest.TestApp.get`.
444
445        :returns: :class:`webtest.TestResponse` instance.
446
447        """
448        if xhr:
449            headers = self._add_xhr_header(headers)
450        return self._gen_request('HEAD', url, headers=headers,
451                                 extra_environ=extra_environ, status=status,
452                                 upload_files=None,
453                                 expect_errors=expect_errors)
454
    # ``*_json`` variants of the body-carrying verbs, generated by
    # ``utils.json_method`` (defined in webtest.utils, not in this file).
    # NOTE(review): presumably each one serialises ``params`` as JSON
    # using ``self.JSONEncoder`` -- confirm against webtest.utils.
    post_json = utils.json_method('POST')
    put_json = utils.json_method('PUT')
    patch_json = utils.json_method('PATCH')
    delete_json = utils.json_method('DELETE')
459
460    def encode_multipart(self, params, files):
461        """
462        Encodes a set of parameters (typically a name/value list) and
463        a set of files (a list of (name, filename, file_body, mimetype)) into a
464        typical POST body, returning the (content_type, body).
465
466        """
467        boundary = to_bytes(str(random.random()))[2:]
468        boundary = b'----------a_BoUnDaRy' + boundary + b'$'
469        lines = []
470
471        def _append_file(file_info):
472            key, filename, value, fcontent = self._get_file_info(file_info)
473            if isinstance(key, text_type):
474                try:
475                    key = key.encode('ascii')
476                except:  # pragma: no cover
477                    raise  # file name must be ascii
478            if isinstance(filename, text_type):
479                try:
480                    filename = filename.encode('utf8')
481                except:  # pragma: no cover
482                    raise  # file name must be ascii or utf8
483            if not fcontent:
484                fcontent = mimetypes.guess_type(filename.decode('utf8'))[0]
485            fcontent = to_bytes(fcontent)
486            fcontent = fcontent or b'application/octet-stream'
487            lines.extend([
488                b'--' + boundary,
489                b'Content-Disposition: form-data; ' +
490                b'name="' + key + b'"; filename="' + filename + b'"',
491                b'Content-Type: ' + fcontent, b'', value])
492
493        for key, value in params:
494            if isinstance(key, text_type):
495                try:
496                    key = key.encode('ascii')
497                except:  # pragma: no cover
498                    raise  # field name are always ascii
499            if isinstance(value, forms.File):
500                if value.value:
501                    _append_file([key] + list(value.value))
502            elif isinstance(value, forms.Upload):
503                file_info = [key, value.filename]
504                if value.content is not None:
505                    file_info.append(value.content)
506                    if value.content_type is not None:
507                        file_info.append(value.content_type)
508                _append_file(file_info)
509            else:
510                if isinstance(value, text_type):
511                    value = value.encode('utf8')
512                lines.extend([
513                    b'--' + boundary,
514                    b'Content-Disposition: form-data; name="' + key + b'"',
515                    b'', value])
516
517        for file_info in files:
518            _append_file(file_info)
519
520        lines.extend([b'--' + boundary + b'--', b''])
521        body = b'\r\n'.join(lines)
522        boundary = boundary.decode('ascii')
523        content_type = 'multipart/form-data; boundary=%s' % boundary
524        return content_type, body
525
526    def request(self, url_or_req, status=None, expect_errors=False,
527                **req_params):
528        """
529        Creates and executes a request. You may either pass in an
530        instantiated :class:`TestRequest` object, or you may pass in a
531        URL and keyword arguments to be passed to
532        :meth:`TestRequest.blank`.
533
534        You can use this to run a request without the intermediary
535        functioning of :meth:`TestApp.get` etc.  For instance, to
536        test a WebDAV method::
537
538            resp = app.request('/new-col', method='MKCOL')
539
540        Note that the request won't have a body unless you specify it,
541        like::
542
543            resp = app.request('/test.txt', method='PUT', body='test')
544
545        You can use :class:`webtest.TestRequest`::
546
547            req = webtest.TestRequest.blank('/url/', method='GET')
548            resp = app.do_request(req)
549
550        """
551        if isinstance(url_or_req, text_type):
552            url_or_req = str(url_or_req)
553        for (k, v) in req_params.items():
554            if isinstance(v, text_type):
555                req_params[k] = str(v)
556        if isinstance(url_or_req, string_types):
557            req = self.RequestClass.blank(url_or_req, **req_params)
558        else:
559            req = url_or_req.copy()
560            for name, value in req_params.items():
561                setattr(req, name, value)
562        req.environ['paste.throw_errors'] = True
563        for name, value in self.extra_environ.items():
564            req.environ.setdefault(name, value)
565        return self.do_request(req,
566                               status=status,
567                               expect_errors=expect_errors,
568                               )
569
570    def do_request(self, req, status=None, expect_errors=None):
571        """
572        Executes the given webob Request (``req``), with the expected
573        ``status``.  Generally :meth:`~webtest.TestApp.get` and
574        :meth:`~webtest.TestApp.post` are used instead.
575
576        To use this::
577
578            req = webtest.TestRequest.blank('url', ...args...)
579            resp = app.do_request(req)
580
581        .. note::
582
583            You can pass any keyword arguments to
584            ``TestRequest.blank()``, which will be set on the request.
585            These can be arguments like ``content_type``, ``accept``, etc.
586
587        """
588
589        errors = StringIO()
590        req.environ['wsgi.errors'] = errors
591        script_name = req.environ.get('SCRIPT_NAME', '')
592        if script_name and req.path_info.startswith(script_name):
593            req.path_info = req.path_info[len(script_name):]
594
595        # set framework hooks
596        req.environ['paste.testing'] = True
597        req.environ['paste.testing_variables'] = {}
598
599        # set request cookies
600        self.cookiejar.add_cookie_header(utils._RequestCookieAdapter(req))
601
602        # verify wsgi compatibility
603        app = lint.middleware(self.app) if self.lint else self.app
604
605        ## FIXME: should it be an option to not catch exc_info?
606        res = req.get_response(app, catch_exc_info=True)
607
608        # be sure to decode the content
609        res.decode_content()
610
611        # set a few handy attributes
612        res._use_unicode = self.use_unicode
613        res.request = req
614        res.app = app
615        res.test_app = self
616
617        # We do this to make sure the app_iter is exausted:
618        try:
619            res.body
620        except TypeError:  # pragma: no cover
621            pass
622        res.errors = errors.getvalue()
623
624        for name, value in req.environ['paste.testing_variables'].items():
625            if hasattr(res, name):
626                raise ValueError(
627                    "paste.testing_variables contains the variable %r, but "
628                    "the response object already has an attribute by that "
629                    "name" % name)
630            setattr(res, name, value)
631        if not expect_errors:
632            self._check_status(status, res)
633            self._check_errors(res)
634
635        # merge cookies back in
636        self.cookiejar.extract_cookies(utils._ResponseCookieAdapter(res),
637                                       utils._RequestCookieAdapter(req))
638
639        return res
640
641    def _check_status(self, status, res):
642        if status == '*':
643            return
644        res_status = res.status
645        if (isinstance(status, string_types) and '*' in status):
646            if re.match(fnmatch.translate(status), res_status, re.I):
647                return
648        if isinstance(status, string_types):
649            if status == res_status:
650                return
651        if isinstance(status, (list, tuple)):
652            if res.status_int not in status:
653                raise AppError(
654                    "Bad response: %s (not one of %s for %s)\n%s",
655                    res_status, ', '.join(map(str, status)),
656                    res.request.url, res)
657            return
658        if status is None:
659            if res.status_int >= 200 and res.status_int < 400:
660                return
661            raise AppError(
662                "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s",
663                res_status, res.request.url,
664                res)
665        if status != res.status_int:
666            raise AppError(
667                "Bad response: %s (not %s)", res_status, status)
668
669    def _check_errors(self, res):
670        errors = res.errors
671        if errors:
672            raise AppError(
673                "Application had errors logged:\n%s", errors)
674
675    def _make_environ(self, extra_environ=None):
676        environ = self.extra_environ.copy()
677        environ['paste.throw_errors'] = True
678        if extra_environ:
679            environ.update(extra_environ)
680        return environ
681
682    def _remove_fragment(self, url):
683        scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
684        return urlparse.urlunsplit((scheme, netloc, path, query, ""))
685
686    def _gen_request(self, method, url, params=utils.NoDefault,
687                     headers=None, extra_environ=None, status=None,
688                     upload_files=None, expect_errors=False,
689                     content_type=None):
690        """
691        Do a generic request.
692        """
693
694        environ = self._make_environ(extra_environ)
695
696        inline_uploads = []
697
698        # this supports OrderedDict
699        if isinstance(params, dict) or hasattr(params, 'items'):
700            params = list(params.items())
701
702        if isinstance(params, (list, tuple)):
703            inline_uploads = [v for (k, v) in params
704                              if isinstance(v, (forms.File, forms.Upload))]
705
706        if len(inline_uploads) > 0:
707            content_type, params = self.encode_multipart(
708                params, upload_files or ())
709            environ['CONTENT_TYPE'] = content_type
710        else:
711            params = utils.encode_params(params, content_type)
712            if upload_files or \
713                (content_type and
714                 to_bytes(content_type).startswith(b'multipart')):
715                params = urlparse.parse_qsl(params, keep_blank_values=True)
716                content_type, params = self.encode_multipart(
717                    params, upload_files or ())
718                environ['CONTENT_TYPE'] = content_type
719            elif params:
720                environ.setdefault('CONTENT_TYPE',
721                                   str('application/x-www-form-urlencoded'))
722
723        if content_type is not None:
724            environ['CONTENT_TYPE'] = content_type
725        environ['REQUEST_METHOD'] = str(method)
726        url = str(url)
727        url = self._remove_fragment(url)
728        req = self.RequestClass.blank(url, environ)
729        if isinstance(params, text_type):
730            params = params.encode(req.charset or 'utf8')
731        req.environ['wsgi.input'] = BytesIO(params)
732        req.content_length = len(params)
733        if headers:
734            req.headers.update(headers)
735        return self.do_request(req, status=status,
736                               expect_errors=expect_errors)
737
738    def _get_file_info(self, file_info):
739        if len(file_info) == 2:
740            # It only has a filename
741            filename = file_info[1]
742            if self.relative_to:
743                filename = os.path.join(self.relative_to, filename)
744            f = open(filename, 'rb')
745            content = f.read()
746            f.close()
747            return (file_info[0], filename, content, None)
748        elif 3 <= len(file_info) <= 4:
749            content = file_info[2]
750            if not isinstance(content, binary_type):
751                raise ValueError('File content must be %s not %s'
752                                 % (binary_type, type(content)))
753            if len(file_info) == 3:
754                return tuple(file_info) + (None,)
755            else:
756                return file_info
757        else:
758            raise ValueError(
759                "upload_files need to be a list of tuples of (fieldname, "
760                "filename, filecontent, mimetype) or (fieldname, "
761                "filename, filecontent) or (fieldname, filename); "
762                "you gave: %r"
763                % repr(file_info)[:100])
764
765    @staticmethod
766    def _add_xhr_header(headers):
767        headers = headers or {}
768        # if remove str we will be have an error in lint.middleware
769        headers.update({'X-REQUESTED-WITH': str('XMLHttpRequest')})
770        return headers
771