# (c) 2005 Ben Bangert
# This module is part of the Python Paste Project and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""Tests for paste.registry stacked proxies, RegistryManager and restorer."""
from nose.tools import assert_raises

from paste.fixture import *
from paste.registry import *
from paste.registry import Registry
from paste.evalexception.middleware import EvalException

# NOTE(review): ``six`` is used below but never imported by name in this
# module; it is presumably made available by one of the star imports above
# -- confirm before tightening those imports.

# Proxy created without a default; test_registry_no_object_error expects a
# TypeError when it is used with nothing registered.
regobj = StackedObjectProxy()
# Proxy created with a default dict, used by test_with_default_object.
secondobj = StackedObjectProxy(default=dict(hi='people'))


def simpleapp(environ, start_response):
    """Minimal WSGI app that does not touch any stacked proxy."""
    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return [b'Hello world!\n']


def simpleapp_withregistry(environ, start_response):
    """WSGI app that reads ``regobj.keys()`` without registering anything."""
    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    body = 'Hello world!Value is %s\n' % regobj.keys()
    if six.PY3:
        body = body.encode('utf8')
    return [body]


def simpleapp_withregistry_default(environ, start_response):
    """WSGI app that formats ``secondobj`` without registering anything."""
    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    body = 'Hello world!Value is %s\n' % secondobj
    if six.PY3:
        body = body.encode('utf8')
    return [body]


class RegistryUsingApp(object):
    """WSGI app that registers ``value`` for ``var`` and returns a list body.

    If ``raise_exc`` is truthy it is raised right after registration, before
    ``start_response`` is called.
    """

    def __init__(self, var, value, raise_exc=False):
        self.var = var
        self.value = value
        self.raise_exc = raise_exc

    def __call__(self, environ, start_response):
        if 'paste.registry' in environ:
            environ['paste.registry'].register(self.var, self.value)
        if self.raise_exc:
            raise self.raise_exc
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        # The body always reports the module-level ``regobj``, whichever
        # proxy was passed in as ``var``.
        body = 'Hello world!\nThe variable is %s' % str(regobj)
        if six.PY3:
            body = body.encode('utf8')
        return [body]


class RegistryUsingIteratorApp(object):
    """Like RegistryUsingApp, but returns an iterator instead of a list."""

    def __init__(self, var, value):
        self.var = var
        self.value = value

    def __call__(self, environ, start_response):
        if 'paste.registry' in environ:
            environ['paste.registry'].register(self.var, self.value)
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        body = 'Hello world!\nThe variable is %s' % str(regobj)
        if six.PY3:
            body = body.encode('utf8')
        return iter([body])


class RegistryMiddleMan(object):
    """Middleware that registers ``value`` for ``var`` and brackets the
    wrapped app's output with lines describing ``regobj`` before and after
    the wrapped app has run.
    """

    def __init__(self, app, var, value, depth):
        self.app = app
        self.var = var
        self.value = value
        self.depth = depth

    def __call__(self, environ, start_response):
        if 'paste.registry' in environ:
            environ['paste.registry'].register(self.var, self.value)
        line = ('\nInserted by middleware!\nInsertValue at depth %s is %s'
                % (self.depth, str(regobj)))
        if six.PY3:
            line = line.encode('utf8')
        app_response = [line]
        app_iter = self.app(environ, start_response)
        if isinstance(app_iter, (list, tuple)):
            app_response.extend(app_iter)
        else:
            # Drain the iterable, and close it even if iteration raises, as
            # the WSGI spec requires of whoever consumes the iterable.
            try:
                app_response.extend(app_iter)
            finally:
                if hasattr(app_iter, 'close'):
                    app_iter.close()
        # Adjacent literals rather than a backslash continuation inside the
        # string, so source indentation never leaks into the response body.
        line = ('\nAppended by middleware!\nAppendValue at '
                'depth %s is %s' % (self.depth, str(regobj)))
        if six.PY3:
            line = line.encode('utf8')
        app_response.append(line)
        return app_response


def test_simple():
    # Plain app, no registry involved.
    app = TestApp(simpleapp)
    response = app.get('/')
    assert 'Hello world' in response


def test_solo_registry():
    # A single RegistryManager around an app that registers ``regobj``.
    obj = {'hi': 'people'}
    wsgiapp = RegistryUsingApp(regobj, obj)
    wsgiapp = RegistryManager(wsgiapp)
    app = TestApp(wsgiapp)
    res = app.get('/')
    assert 'Hello world' in res
    assert 'The variable is' in res
    assert "{'hi': 'people'}" in res


def test_registry_no_object_error():
    # Nothing is registered for ``regobj``, so using it must raise TypeError.
    app = TestApp(simpleapp_withregistry)
    assert_raises(TypeError, app.get, '/')


def test_with_default_object():
    # ``secondobj`` was built with a default, so it is usable unregistered.
    app = TestApp(simpleapp_withregistry_default)
    res = app.get('/')
    print(res)
    assert 'Hello world' in res
    assert "Value is {'hi': 'people'}" in res


def test_double_registry():
    # Two RegistryManager layers, each registering a different object for
    # ``regobj``. (The local ``secondobj`` shadows the module-level proxy.)
    obj = {'hi': 'people'}
    secondobj = {'bye': 'friends'}
    wsgiapp = RegistryUsingApp(regobj, obj)
    wsgiapp = RegistryManager(wsgiapp)
    wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0)
    wsgiapp = RegistryManager(wsgiapp)
    app = TestApp(wsgiapp)
    res = app.get('/')
    assert 'Hello world' in res
    assert 'The variable is' in res
    assert "{'hi': 'people'}" in res
    assert "InsertValue at depth 0 is {'bye': 'friends'}" in res
    assert "AppendValue at depth 0 is {'bye': 'friends'}" in res


def test_really_deep_registry():
    # One middleman + RegistryManager layer per key; every layer must report
    # its own object both before and after calling the layer beneath it.
    keylist = ['fred', 'wilma', 'barney', 'homer', 'marge', 'bart', 'lisa',
               'maggie']
    valuelist = range(0, len(keylist))
    obj = {'hi': 'people'}
    wsgiapp = RegistryUsingApp(regobj, obj)
    wsgiapp = RegistryManager(wsgiapp)
    for depth in valuelist:
        newobj = {keylist[depth]: depth}
        wsgiapp = RegistryMiddleMan(wsgiapp, regobj, newobj, depth)
        wsgiapp = RegistryManager(wsgiapp)
    app = TestApp(wsgiapp)
    res = app.get('/')
    assert 'Hello world' in res
    assert 'The variable is' in res
    assert "{'hi': 'people'}" in res
    for depth in valuelist:
        assert "InsertValue at depth %s is {'%s': %s}" % \
            (depth, keylist[depth], depth) in res
    for depth in valuelist:
        assert "AppendValue at depth %s is {'%s': %s}" % \
            (depth, keylist[depth], depth) in res


def test_iterating_response():
    # Same stack as test_double_registry, but the innermost app returns an
    # iterator, exercising the middleman's non-list branch.
    obj = {'hi': 'people'}
    secondobj = {'bye': 'friends'}
    wsgiapp = RegistryUsingIteratorApp(regobj, obj)
    wsgiapp = RegistryManager(wsgiapp)
    wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0)
    wsgiapp = RegistryManager(wsgiapp)
    app = TestApp(wsgiapp)
    res = app.get('/')
    assert 'Hello world' in res
    assert 'The variable is' in res
    assert "{'hi': 'people'}" in res
    assert "InsertValue at depth 0 is {'bye': 'friends'}" in res
    assert "AppendValue at depth 0 is {'bye': 'friends'}" in res


def _test_restorer(stack, data):
    """Run ``stack`` once, then check proxy state outside and inside a
    restoration context.

    ``stack`` is the WSGI app to request. ``data`` is a sequence of
    ``[stacked_proxy, proxied_obj, test_cleanup]`` entries; when
    ``test_cleanup`` is true the proxy is also checked to be empty after a
    final ``registry.cleanup()``.
    """
    # We need to test the request's specific Registry. Initialize it here so we
    # can use it later (RegistryManager will re-use one preexisting in the
    # environ)
    registry = Registry()
    extra_environ = {'paste.throw_errors': False,
                     'paste.registry': registry}
    request_id = restorer.get_request_id(extra_environ)
    app = TestApp(stack)
    # Only the side effects of the request matter; the response is unused.
    app.get('/', extra_environ=extra_environ, expect_errors=True)

    # Ensure all the StackedObjectProxies are empty after the RegistryUsingApp
    # raises an Exception
    for stacked, proxied_obj, test_cleanup in data:
        only_key = list(proxied_obj.keys())[0]
        try:
            # Touching the proxy is expected to raise TypeError; reaching
            # ``assert False`` means an object is still registered.
            assert only_key not in stacked
            assert False
        except TypeError:
            # Definitely empty
            pass

    # Ensure the StackedObjectProxies & Registry 'work' in the simulated
    # EvalException context
    replace = {'replace': 'dict'}
    new = {'new': 'object'}
    restorer.restoration_begin(request_id)
    try:
        for stacked, proxied_obj, test_cleanup in data:
            # Ensure our original data magically re-appears in this context
            only_key, only_val = list(proxied_obj.items())[0]
            assert only_key in stacked and stacked[only_key] == only_val

            # Ensure the Registry still works
            registry.prepare()
            registry.register(stacked, new)
            assert 'new' in stacked and stacked['new'] == 'object'
            registry.cleanup()

            # Back to the original (pre-prepare())
            assert only_key in stacked and stacked[only_key] == only_val

            registry.replace(stacked, replace)
            assert 'replace' in stacked and stacked['replace'] == 'dict'

            if test_cleanup:
                registry.cleanup()
                try:
                    stacked._current_obj()
                    assert False
                except TypeError:
                    # Definitely empty
                    pass
    finally:
        restorer.restoration_end()


def _restorer_data():
    """Return three fresh ``[proxy, proxied_obj, test_cleanup]`` entries,
    each with ``test_cleanup`` initially False.
    """
    S = StackedObjectProxy
    d = [[S(name='first'), dict(top='of the registry stack'), False],
         [S(name='second'), dict(middle='of the stack'), False],
         [S(name='third'), dict(bottom='of the STACK.'), False]]
    return d


def _set_cleanup_test(data):
    """Instruct _test_restorer to check registry cleanup at this level of the stack
    """
    data[2] = True


def test_restorer_basic():
    # EvalException outside a single RegistryManager.
    data = _restorer_data()[0]
    wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception())
    wsgiapp = RegistryManager(wsgiapp)
    _set_cleanup_test(data)
    wsgiapp = EvalException(wsgiapp)
    _test_restorer(wsgiapp, [data])


def test_restorer_basic_manager_outside():
    # RegistryManager outside EvalException.
    data = _restorer_data()[0]
    wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception())
    wsgiapp = EvalException(wsgiapp)
    wsgiapp = RegistryManager(wsgiapp)
    _set_cleanup_test(data)
    _test_restorer(wsgiapp, [data])


def test_restorer_middleman_nested_evalexception():
    # EvalException sits between the raising app and a registering middleman.
    data = _restorer_data()[:2]
    wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
    wsgiapp = EvalException(wsgiapp)
    wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
    wsgiapp = RegistryManager(wsgiapp)
    _set_cleanup_test(data[1])
    _test_restorer(wsgiapp, data)


def test_restorer_nested_middleman():
    # Two RegistryManager layers with EvalException between them, outside
    # the middleman.
    data = _restorer_data()[:2]
    wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
    wsgiapp = RegistryManager(wsgiapp)
    _set_cleanup_test(data[0])
    wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
    wsgiapp = EvalException(wsgiapp)
    wsgiapp = RegistryManager(wsgiapp)
    _set_cleanup_test(data[1])
    _test_restorer(wsgiapp, data)


def test_restorer_middlemen_nested_evalexception():
    # Three RegistryManager layers; EvalException wraps only the innermost.
    data = _restorer_data()
    wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
    wsgiapp = RegistryManager(wsgiapp)
    _set_cleanup_test(data[0])
    wsgiapp = EvalException(wsgiapp)
    wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
    wsgiapp = RegistryManager(wsgiapp)
    _set_cleanup_test(data[1])
    wsgiapp = RegistryMiddleMan(wsgiapp, data[2][0], data[2][1], 1)
    wsgiapp = RegistryManager(wsgiapp)
    _set_cleanup_test(data[2])
    _test_restorer(wsgiapp, data)


def test_restorer_disabled():
    # Ensure restoration_begin/end work safely when there's no Registry
    wsgiapp = TestApp(simpleapp)
    wsgiapp.get('/')
    try:
        restorer.restoration_begin(1)
    finally:
        restorer.restoration_end()
    # A second call should do nothing
    restorer.restoration_end()