• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# (c) 2005 Ben Bangert
2# This module is part of the Python Paste Project and is released under
3# the MIT License: http://www.opensource.org/licenses/mit-license.php
4from nose.tools import assert_raises
5
6from paste.fixture import *
7from paste.registry import *
8from paste.registry import Registry
9from paste.evalexception.middleware import EvalException
10
# Proxy created without a default; test_registry_no_object_error expects a
# TypeError when it is used outside of any registry.
regobj = StackedObjectProxy()
# Proxy created with a default dict; test_with_default_object expects this
# default to be rendered when nothing has been registered.
secondobj = StackedObjectProxy(default={'hi': 'people'})
13
def simpleapp(environ, start_response):
    """Minimal WSGI app: answer every request with a plain-text greeting.

    ``environ`` is not inspected. ``start_response`` is called once with a
    ``200 OK`` status and a ``Content-type: text/plain`` header, and the
    body is returned as a one-element list of bytes.
    """
    start_response('200 OK', [('Content-type', 'text/plain')])
    return [b'Hello world!\n']
19
def simpleapp_withregistry(environ, start_response):
    """WSGI app that renders ``regobj.keys()`` into its response body.

    Nothing is registered here, so the ``regobj.keys()`` call is made
    against whatever the module-level proxy currently holds. The body is
    encoded to UTF-8 bytes on Python 3 and left as ``str`` otherwise.
    """
    start_response('200 OK', [('Content-type', 'text/plain')])
    text = 'Hello world!Value is %s\n' % regobj.keys()
    return [text.encode('utf8') if six.PY3 else text]
28
def simpleapp_withregistry_default(environ, start_response):
    """WSGI app that renders the module-level ``secondobj`` proxy.

    Nothing is registered here; the body is built by formatting
    ``secondobj`` with ``%s``. The body is encoded to UTF-8 bytes on
    Python 3 and left as ``str`` otherwise.
    """
    start_response('200 OK', [('Content-type', 'text/plain')])
    text = 'Hello world!Value is %s\n' % secondobj
    return [text.encode('utf8') if six.PY3 else text]
37
38
class RegistryUsingApp(object):
    """WSGI app that registers ``value`` for ``var`` and reports ``regobj``.

    When ``raise_exc`` is truthy it is raised right after registration,
    before ``start_response`` is called.
    """

    def __init__(self, var, value, raise_exc=False):
        self.var, self.value = var, value
        self.raise_exc = raise_exc

    def __call__(self, environ, start_response):
        """Register with ``environ['paste.registry']`` (if present) and respond.

        Returns a one-element list holding the body, which shows
        ``str(regobj)``; the body is UTF-8 bytes on Python 3.
        """
        if 'paste.registry' in environ:
            environ['paste.registry'].register(self.var, self.value)
        if self.raise_exc:
            raise self.raise_exc
        start_response('200 OK', [('Content-type', 'text/plain')])
        shown = str(regobj)
        text = 'Hello world!\nThe variable is %s' % shown
        return [text.encode('utf8') if six.PY3 else text]
57
class RegistryUsingIteratorApp(object):
    """WSGI app like ``RegistryUsingApp`` that returns an iterator, not a list."""

    def __init__(self, var, value):
        self.var, self.value = var, value

    def __call__(self, environ, start_response):
        """Register with ``environ['paste.registry']`` (if present) and respond.

        Returns ``iter([body])`` where the body shows ``str(regobj)``; the
        body is UTF-8 bytes on Python 3.
        """
        if 'paste.registry' in environ:
            environ['paste.registry'].register(self.var, self.value)
        start_response('200 OK', [('Content-type', 'text/plain')])
        shown = str(regobj)
        text = 'Hello world!\nThe variable is %s' % shown
        chunks = [text.encode('utf8') if six.PY3 else text]
        return iter(chunks)
73
class RegistryMiddleMan(object):
    """WSGI middleware that registers a value and brackets the wrapped response.

    On each request it registers ``value`` for ``var`` (when the environ
    carries a ``paste.registry``), then returns a list made of:

    1. an "Inserted by middleware" chunk showing ``str(regobj)`` *before*
       the wrapped app is called,
    2. every chunk produced by the wrapped app,
    3. an "Appended by middleware" chunk showing ``str(regobj)`` *after*
       the wrapped app's response has been consumed.

    ``depth`` is only used to label the two marker chunks.
    """

    def __init__(self, app, var, value, depth):
        self.app = app
        self.var = var
        self.value = value
        self.depth = depth

    @staticmethod
    def _as_body(text):
        """Return ``text`` as a body chunk: UTF-8 bytes on Python 3."""
        if six.PY3:
            return text.encode('utf8')
        return text

    def __call__(self, environ, start_response):
        """Call the wrapped app and return its chunks between two markers.

        Returns a list of body chunks. If the wrapped app's result has a
        ``close()`` method it is always called, even when iterating the
        result raises.
        """
        if 'paste.registry' in environ:
            environ['paste.registry'].register(self.var, self.value)
        # Rendered before the wrapped app runs, so it reflects the object
        # this middleware just registered.
        inserted = ('\nInserted by middleware!\nInsertValue at depth %s is %s'
                    % (self.depth, str(regobj)))
        app_response = [self._as_body(inserted)]

        app_iter = self.app(environ, start_response)
        try:
            # extend() consumes lists, tuples and arbitrary iterators alike.
            app_response.extend(app_iter)
        finally:
            close = getattr(app_iter, 'close', None)
            if close is not None:
                close()

        # Built with implicit concatenation rather than a backslash inside
        # the literal, so source indentation does not leak into the body.
        appended = ('\nAppended by middleware!\n'
                    'AppendValue at depth %s is %s'
                    % (self.depth, str(regobj)))
        app_response.append(self._as_body(appended))
        return app_response
106
107
def test_simple():
    # The plain app, with no registry involved, serves its greeting.
    response = TestApp(simpleapp).get('/')
    assert 'Hello world' in response
112
def test_solo_registry():
    # One RegistryManager around RegistryUsingApp: the dict registered for
    # regobj must show up in the rendered response.
    registered = {'hi': 'people'}
    stack = RegistryManager(RegistryUsingApp(regobj, registered))
    response = TestApp(stack).get('/')
    for fragment in ('Hello world', 'The variable is', "{'hi': 'people'}"):
        assert fragment in response
122
def test_registry_no_object_error():
    # simpleapp_withregistry touches regobj with nothing registered; the
    # request is expected to fail with TypeError.
    client = TestApp(simpleapp_withregistry)
    assert_raises(TypeError, client.get, '/')
126
def test_with_default_object():
    # secondobj was created with a default, so rendering it without any
    # registration must show that default dict.
    response = TestApp(simpleapp_withregistry_default).get('/')
    print(response)
    for fragment in ('Hello world', "Value is {'hi': 'people'}"):
        assert fragment in response
133
def test_double_registry():
    # Two RegistryManager layers: the inner app registers one dict for
    # regobj, the middleware around it registers another. Each layer must
    # render its own value.
    inner_value = {'hi': 'people'}
    outer_value = {'bye': 'friends'}
    stack = RegistryManager(RegistryUsingApp(regobj, inner_value))
    stack = RegistryManager(RegistryMiddleMan(stack, regobj, outer_value, 0))
    response = TestApp(stack).get('/')
    expected_fragments = (
        'Hello world',
        'The variable is',
        "{'hi': 'people'}",
        "InsertValue at depth 0 is {'bye': 'friends'}",
        "AppendValue at depth 0 is {'bye': 'friends'}",
    )
    for fragment in expected_fragments:
        assert fragment in response
148
def test_really_deep_registry():
    # Stack one RegistryMiddleMan + RegistryManager pair per name, each
    # registering {name: depth} for regobj, and check that every layer
    # reports its own dict both before and after the wrapped response.
    names = ['fred', 'wilma', 'barney', 'homer', 'marge', 'bart', 'lisa',
             'maggie']
    stack = RegistryManager(RegistryUsingApp(regobj, {'hi': 'people'}))
    for level, name in enumerate(names):
        middle = RegistryMiddleMan(stack, regobj, {name: level}, level)
        stack = RegistryManager(middle)
    response = TestApp(stack).get('/')
    for fragment in ('Hello world', 'The variable is', "{'hi': 'people'}"):
        assert fragment in response
    for level, name in enumerate(names):
        inserted = "InsertValue at depth %s is {'%s': %s}" % (
            level, name, level)
        assert inserted in response
    for level, name in enumerate(names):
        appended = "AppendValue at depth %s is {'%s': %s}" % (
            level, name, level)
        assert appended in response
171
def test_iterating_response():
    # Same layering as test_double_registry, but the innermost app returns
    # an iterator instead of a list.
    inner_value = {'hi': 'people'}
    outer_value = {'bye': 'friends'}
    stack = RegistryManager(RegistryUsingIteratorApp(regobj, inner_value))
    stack = RegistryManager(RegistryMiddleMan(stack, regobj, outer_value, 0))
    response = TestApp(stack).get('/')
    expected_fragments = (
        'Hello world',
        'The variable is',
        "{'hi': 'people'}",
        "InsertValue at depth 0 is {'bye': 'friends'}",
        "AppendValue at depth 0 is {'bye': 'friends'}",
    )
    for fragment in expected_fragments:
        assert fragment in response
186
def _test_restorer(stack, data):
    """Run one request through ``stack`` and check restoration for ``data``.

    ``data`` is a sequence of ``[stacked, proxied_obj, test_cleanup]``
    entries: a proxy, the single-item dict that was registered for it, and
    a flag asking for the extra registry-cleanup check on that entry.
    """
    # The request-specific Registry is created up front so it can be used
    # after the request (RegistryManager will re-use one preexisting in the
    # environ).
    registry = Registry()
    extra_environ = {
        'paste.throw_errors': False,
        'paste.registry': registry,
    }
    request_id = restorer.get_request_id(extra_environ)
    TestApp(stack).get('/', extra_environ=extra_environ, expect_errors=True)

    # After the RegistryUsingApp raised, every proxy must be empty: any use
    # of it has to raise TypeError.
    for stacked, proxied_obj, _ in data:
        only_key = list(proxied_obj)[0]
        try:
            assert only_key not in stacked
        except TypeError:
            # Definitely empty
            continue
        assert False

    # Inside the simulated EvalException context the proxies and the
    # Registry must 'work' again.
    replace = {'replace': 'dict'}
    new = {'new': 'object'}
    restorer.restoration_begin(request_id)
    try:
        for stacked, proxied_obj, test_cleanup in data:
            # The originally registered data re-appears in this context
            only_key, only_val = list(proxied_obj.items())[0]
            assert only_key in stacked
            assert stacked[only_key] == only_val

            # The Registry still works
            registry.prepare()
            registry.register(stacked, new)
            assert 'new' in stacked
            assert stacked['new'] == 'object'
            registry.cleanup()

            # Back to the original (pre-prepare()) object
            assert only_key in stacked
            assert stacked[only_key] == only_val

            registry.replace(stacked, replace)
            assert 'replace' in stacked
            assert stacked['replace'] == 'dict'

            if not test_cleanup:
                continue
            registry.cleanup()
            try:
                stacked._current_obj()
            except TypeError:
                # Definitely empty
                continue
            assert False
    finally:
        restorer.restoration_end()
242
def _restorer_data():
    """Build fresh ``[proxy, registered dict, test_cleanup]`` entries.

    Returns a list of three mutable lists; each holds a newly created
    named ``StackedObjectProxy``, a single-item dict, and ``False`` as the
    initial cleanup flag.
    """
    payloads = (
        ('first', dict(top='of the registry stack')),
        ('second', dict(middle='of the stack')),
        ('third', dict(bottom='of the STACK.')),
    )
    return [[StackedObjectProxy(name=name), payload, False]
            for name, payload in payloads]
249
250def _set_cleanup_test(data):
251    """Instruct _test_restorer to check registry cleanup at this level of the stack
252    """
253    data[2] = True
254
def test_restorer_basic():
    # Raising app -> RegistryManager -> EvalException (outermost).
    entry = _restorer_data()[0]
    proxy, value = entry[0], entry[1]
    stack = RegistryManager(
        RegistryUsingApp(proxy, value, raise_exc=Exception()))
    _set_cleanup_test(entry)
    _test_restorer(EvalException(stack), [entry])
262
def test_restorer_basic_manager_outside():
    # Raising app -> EvalException -> RegistryManager (outermost).
    entry = _restorer_data()[0]
    proxy, value = entry[0], entry[1]
    stack = EvalException(
        RegistryUsingApp(proxy, value, raise_exc=Exception()))
    stack = RegistryManager(stack)
    _set_cleanup_test(entry)
    _test_restorer(stack, [entry])
270
def test_restorer_middleman_nested_evalexception():
    # Raising app -> EvalException -> RegistryMiddleMan -> RegistryManager.
    # Only the middleware's entry is flagged for the cleanup check.
    entries = _restorer_data()[:2]
    inner, outer = entries
    stack = EvalException(
        RegistryUsingApp(inner[0], inner[1], raise_exc=Exception()))
    stack = RegistryManager(RegistryMiddleMan(stack, outer[0], outer[1], 0))
    _set_cleanup_test(outer)
    _test_restorer(stack, entries)
279
def test_restorer_nested_middleman():
    # Raising app -> RegistryManager -> RegistryMiddleMan -> EvalException
    # -> RegistryManager. Both entries are flagged for the cleanup check.
    entries = _restorer_data()[:2]
    inner, outer = entries
    stack = RegistryManager(
        RegistryUsingApp(inner[0], inner[1], raise_exc=Exception()))
    _set_cleanup_test(inner)
    stack = RegistryMiddleMan(stack, outer[0], outer[1], 0)
    stack = RegistryManager(EvalException(stack))
    _set_cleanup_test(outer)
    _test_restorer(stack, entries)
290
def test_restorer_middlemen_nested_evalexception():
    # Raising app -> RegistryManager -> EvalException -> two
    # RegistryMiddleMan/RegistryManager pairs. All three entries are
    # flagged for the cleanup check.
    entries = _restorer_data()
    innermost, middle, outermost = entries
    stack = RegistryManager(
        RegistryUsingApp(innermost[0], innermost[1], raise_exc=Exception()))
    _set_cleanup_test(innermost)
    stack = EvalException(stack)
    stack = RegistryManager(
        RegistryMiddleMan(stack, middle[0], middle[1], 0))
    _set_cleanup_test(middle)
    stack = RegistryManager(
        RegistryMiddleMan(stack, outermost[0], outermost[1], 1))
    _set_cleanup_test(outermost)
    _test_restorer(stack, entries)
304
def test_restorer_disabled():
    # Ensure restoration_begin/end work safely when there's no Registry
    TestApp(simpleapp).get('/')
    try:
        restorer.restoration_begin(1)
    finally:
        # Called twice on purpose: the second call should do nothing
        for _ in range(2):
            restorer.restoration_end()
315