# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for the Flask utilities"""

import datetime
import json
import logging

import flask
import httplib2
import mock
import six.moves.http_client as httplib
import six.moves.urllib.parse as urlparse
import unittest2

import oauth2client
from oauth2client import client
from oauth2client import clientsecrets
from oauth2client.contrib import flask_util


__author__ = 'jonwayne@google.com (Jon Wayne Parrott)'


class Http2Mock(object):
    """Mock httplib2.Http for code exchange / refresh

    A single instance plays three roles:

    * the ``Http`` factory (calling it returns the instance itself),
    * the ``Http`` object (it exposes ``request``), and
    * the response object returned by ``request`` (it carries ``status``).

    Used as a context manager it swaps itself in for ``httplib2.Http`` and
    restores the original on exit.
    """

    def __init__(self, status=httplib.OK, **kwargs):
        """Set the canned response.

        Args:
            status: HTTP status exposed through the ``status`` attribute.
            **kwargs: Extra or overriding keys for the JSON token payload.
        """
        self.status = status
        self.content = {
            'access_token': 'foo_access_token',
            'refresh_token': 'foo_refresh_token',
            'expires_in': 3600,
            'extra': 'value',
        }
        self.content.update(kwargs)

    def request(self, token_uri, method, body, headers, *args, **kwargs):
        """Record the request body/headers and return the canned payload.

        Returns:
            A ``(response, content)`` tuple where ``response`` is this
            mock and ``content`` is the UTF-8 encoded JSON payload.
        """
        # Kept on the instance so tests can inspect what was sent.
        self.body = body
        self.headers = headers
        return (self, json.dumps(self.content).encode('utf-8'))

    def __enter__(self):
        """Replace ``httplib2.Http`` with this mock."""
        self.httplib2_orig = httplib2.Http
        httplib2.Http = self
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Restore the original ``httplib2.Http``."""
        httplib2.Http = self.httplib2_orig

    def __call__(self, *args, **kwargs):
        """Stand in for ``httplib2.Http(...)`` by returning this mock."""
        return self


class FlaskOAuth2Tests(unittest2.TestCase):
    """Tests for ``flask_util.UserOAuth2``.

    Test methods are described with ``#`` comments rather than docstrings
    so that verbose test output keeps showing the test names.
    """

    def setUp(self):
        self.app = flask.Flask(__name__)
        self.app.testing = True
        self.app.config['SECRET_KEY'] = 'notasecert'
        self.app.logger.setLevel(logging.CRITICAL)
        self.oauth2 = flask_util.UserOAuth2(
            self.app,
            client_id='client_idz',
            client_secret='client_secretz')

    def _generate_credentials(self, scopes=None):
        """Build OAuth2Credentials that expire one hour from now.

        Args:
            scopes: Scopes passed through to the credentials (optional).

        Returns:
            A ``client.OAuth2Credentials`` whose id_token carries
            ``sub`` '123' and ``email`` 'user@example.com'.
        """
        return client.OAuth2Credentials(
            'access_tokenz',
            'client_idz',
            'client_secretz',
            'refresh_tokenz',
            datetime.datetime.utcnow() + datetime.timedelta(seconds=3600),
            oauth2client.GOOGLE_TOKEN_URI,
            'Test',
            id_token={
                'sub': '123',
                'email': 'user@example.com'
            },
            scopes=scopes)

    def test_explicit_configuration(self):
        # Client id/secret given directly to the constructor.
        oauth2 = flask_util.UserOAuth2(
            flask.Flask(__name__), client_id='id', client_secret='secret')

        self.assertEqual(oauth2.client_id, 'id')
        self.assertEqual(oauth2.client_secret, 'secret')

        # Client id/secret loaded from a (mocked) client secrets file.
        return_val = (
            clientsecrets.TYPE_WEB,
            {'client_id': 'id', 'client_secret': 'secret'})

        with mock.patch('oauth2client.clientsecrets.loadfile',
                        return_value=return_val):

            oauth2 = flask_util.UserOAuth2(
                flask.Flask(__name__), client_secrets_file='file.json')

            self.assertEqual(oauth2.client_id, 'id')
            self.assertEqual(oauth2.client_secret, 'secret')

    def test_delayed_configuration(self):
        # init_app() after a bare constructor should bind the app.
        app = flask.Flask(__name__)
        oauth2 = flask_util.UserOAuth2()
        oauth2.init_app(app, client_id='id', client_secret='secret')
        self.assertEqual(oauth2.app, app)

    def test_explicit_storage(self):
        storage_mock = mock.Mock()
        oauth2 = flask_util.UserOAuth2(
            flask.Flask(__name__), storage=storage_mock, client_id='id',
            client_secret='secret')
        self.assertEqual(oauth2.storage, storage_mock)

    def test_explicit_scopes(self):
        oauth2 = flask_util.UserOAuth2(
            flask.Flask(__name__), scopes=['1', '2'], client_id='id',
            client_secret='secret')
        self.assertEqual(oauth2.scopes, ['1', '2'])

    def test_bad_client_secrets(self):
        # A secrets file whose type is not TYPE_WEB must be rejected.
        return_val = (
            'other',
            {'client_id': 'id', 'client_secret': 'secret'})

        with mock.patch('oauth2client.clientsecrets.loadfile',
                        return_value=return_val):
            with self.assertRaises(ValueError):
                flask_util.UserOAuth2(flask.Flask(__name__),
                                      client_secrets_file='file.json')

    def test_app_configuration(self):
        # Client id/secret taken from the Flask app config.
        app = flask.Flask(__name__)
        app.config['GOOGLE_OAUTH2_CLIENT_ID'] = 'id'
        app.config['GOOGLE_OAUTH2_CLIENT_SECRET'] = 'secret'

        oauth2 = flask_util.UserOAuth2(app)

        self.assertEqual(oauth2.client_id, 'id')
        self.assertEqual(oauth2.client_secret, 'secret')

        # Secrets file path taken from the Flask app config.
        return_val = (
            clientsecrets.TYPE_WEB,
            {'client_id': 'id2', 'client_secret': 'secret2'})

        with mock.patch('oauth2client.clientsecrets.loadfile',
                        return_value=return_val):

            app = flask.Flask(__name__)
            app.config['GOOGLE_OAUTH2_CLIENT_SECRETS_FILE'] = 'file.json'
            oauth2 = flask_util.UserOAuth2(app)

            self.assertEqual(oauth2.client_id, 'id2')
            self.assertEqual(oauth2.client_secret, 'secret2')

    def test_no_configuration(self):
        with self.assertRaises(ValueError):
            flask_util.UserOAuth2(flask.Flask(__name__))

    def test_create_flow(self):
        with self.app.test_request_context():
            flow = self.oauth2._make_flow()
            state = json.loads(flow.params['state'])
            self.assertIn('google_oauth2_csrf_token', flask.session)
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'],
                state['csrf_token'])
            self.assertEqual(flow.client_id, self.oauth2.client_id)
            self.assertEqual(flow.client_secret, self.oauth2.client_secret)
            self.assertIn('http', flow.redirect_uri)
            self.assertIn('oauth2callback', flow.redirect_uri)

            flow = self.oauth2._make_flow(return_url='/return_url')
            state = json.loads(flow.params['state'])
            self.assertEqual(state['return_url'], '/return_url')

            flow = self.oauth2._make_flow(extra_arg='test')
            self.assertEqual(flow.params['extra_arg'], 'test')

        # Test extra args specified in the constructor.
        app = flask.Flask(__name__)
        app.config['SECRET_KEY'] = 'notasecert'
        oauth2 = flask_util.UserOAuth2(
            app, client_id='client_id', client_secret='secret',
            extra_arg='test')

        with app.test_request_context():
            flow = oauth2._make_flow()
            self.assertEqual(flow.params['extra_arg'], 'test')

    def test_authorize_view(self):
        with self.app.test_client() as client:
            response = client.get('/oauth2authorize')
            location = response.headers['Location']
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])

            self.assertIn(oauth2client.GOOGLE_AUTH_URI, location)
            self.assertNotIn(self.oauth2.client_secret, location)
            self.assertIn(self.oauth2.client_id, q['client_id'])
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'],
                state['csrf_token'])
            self.assertEqual(state['return_url'], '/')

        with self.app.test_client() as client:
            response = client.get('/oauth2authorize?return_url=/test')
            location = response.headers['Location']
            q = urlparse.parse_qs(location.split('?', 1)[1])
            state = json.loads(q['state'][0])
            self.assertEqual(state['return_url'], '/test')

        with self.app.test_client() as client:
            response = client.get('/oauth2authorize?extra_param=test')
            location = response.headers['Location']
            self.assertIn('extra_param=test', location)

    def _setup_callback_state(self, client, **kwargs):
        """Create a flow and mirror its session data into ``client``.

        Args:
            client: Flask test client whose session should receive the
                CSRF token produced while making the flow.
            **kwargs: Passed to ``_make_flow``; when empty the flow is
                made with ``return_url='/return_url'``.

        Returns:
            The ``state`` parameter of the flow registered for the
            session's CSRF token.
        """
        with self.app.test_request_context():
            # Flask doesn't create a request context with a session
            # transaction for some reason, so, set up the flow here,
            # then apply it to the session in the transaction.
            if not kwargs:
                self.oauth2._make_flow(return_url='/return_url')
            else:
                self.oauth2._make_flow(**kwargs)

            with client.session_transaction() as session:
                session.update(flask.session)
                csrf_token = session['google_oauth2_csrf_token']
                flow = flask_util._get_flow_for_token(csrf_token)
                state = flow.params['state']

        return state

    def test_callback_view(self):
        self.oauth2.storage = mock.Mock()
        with self.app.test_client() as client:
            with Http2Mock() as http:
                state = self._setup_callback_state(client)

                response = client.get(
                    '/oauth2callback?state={0}&code=codez'.format(state))

                self.assertEqual(response.status_code, httplib.FOUND)
                self.assertIn('/return_url', response.headers['Location'])
                self.assertIn(self.oauth2.client_secret, http.body)
                self.assertIn('codez', http.body)
                self.assertTrue(self.oauth2.storage.put.called)

    def test_authorize_callback(self):
        # Re-runs the callback flow with authorize_callback mocked out and
        # checks that it was invoked.
        self.oauth2.authorize_callback = mock.Mock()
        self.test_callback_view()
        self.assertTrue(self.oauth2.authorize_callback.called)

    def test_callback_view_errors(self):
        # Error supplied to callback
        with self.app.test_client() as client:
            with client.session_transaction() as session:
                session['google_oauth2_csrf_token'] = 'tokenz'

            response = client.get('/oauth2callback?state={}&error=something')
            self.assertEqual(response.status_code, httplib.BAD_REQUEST)
            self.assertIn('something', response.data.decode('utf-8'))

        # CSRF mismatch
        with self.app.test_client() as client:
            with client.session_transaction() as session:
                session['google_oauth2_csrf_token'] = 'goodstate'

            state = json.dumps({
                'csrf_token': 'badstate',
                'return_url': '/return_url'
            })

            response = client.get(
                '/oauth2callback?state={0}&code=codez'.format(state))
            self.assertEqual(response.status_code, httplib.BAD_REQUEST)

        # KeyError, no CSRF state.
        with self.app.test_client() as client:
            response = client.get('/oauth2callback?state={}&code=codez')
            self.assertEqual(response.status_code, httplib.BAD_REQUEST)

        # Code exchange error
        with self.app.test_client() as client:
            state = self._setup_callback_state(client)

            with Http2Mock(status=httplib.INTERNAL_SERVER_ERROR):
                response = client.get(
                    '/oauth2callback?state={0}&code=codez'.format(state))
                self.assertEqual(response.status_code, httplib.BAD_REQUEST)

        # Invalid state json
        with self.app.test_client() as client:
            with client.session_transaction() as session:
                session['google_oauth2_csrf_token'] = 'tokenz'

            state = '[{'
            response = client.get(
                '/oauth2callback?state={0}&code=codez'.format(state))
            self.assertEqual(response.status_code, httplib.BAD_REQUEST)

        # Missing flow.
        with self.app.test_client() as client:
            with client.session_transaction() as session:
                session['google_oauth2_csrf_token'] = 'tokenz'

            state = json.dumps({
                'csrf_token': 'tokenz',
                'return_url': '/return_url'
            })

            response = client.get(
                '/oauth2callback?state={0}&code=codez'.format(state))
            self.assertEqual(response.status_code, httplib.BAD_REQUEST)

    def test_no_credentials(self):
        with self.app.test_request_context():
            self.assertFalse(self.oauth2.has_credentials())
            # assertIsNone reports the offending value on failure, unlike
            # assertTrue(x is None).
            self.assertIsNone(self.oauth2.credentials)
            self.assertIsNone(self.oauth2.user_id)
            self.assertIsNone(self.oauth2.email)
            with self.assertRaises(ValueError):
                self.oauth2.http()
            self.assertFalse(self.oauth2.storage.get())
            self.oauth2.storage.delete()

    def test_with_credentials(self):
        credentials = self._generate_credentials()
        with self.app.test_request_context():
            self.oauth2.storage.put(credentials)
            self.assertEqual(
                self.oauth2.credentials.access_token,
                credentials.access_token)
            self.assertEqual(
                self.oauth2.credentials.refresh_token,
                credentials.refresh_token)
            self.assertEqual(self.oauth2.user_id, '123')
            self.assertEqual(self.oauth2.email, 'user@example.com')
            self.assertTrue(self.oauth2.http())

    @mock.patch('oauth2client.client._UTCNOW')
    def test_with_expired_credentials(self, utcnow):
        # "Now" is pinned one day after the credentials' expiry.
        utcnow.return_value = datetime.datetime(1990, 5, 29)

        credentials = self._generate_credentials()
        credentials.token_expiry = datetime.datetime(1990, 5, 28)

        # Has a refresh token, so this should be fine.
        with self.app.test_request_context():
            self.oauth2.storage.put(credentials)
            self.assertTrue(self.oauth2.has_credentials())

        # Without a refresh token this should return false.
        credentials.refresh_token = None
        with self.app.test_request_context():
            self.oauth2.storage.put(credentials)
            self.assertFalse(self.oauth2.has_credentials())

    def test_bad_id_token(self):
        credentials = self._generate_credentials()
        credentials.id_token = {}
        with self.app.test_request_context():
            self.oauth2.storage.put(credentials)
            self.assertIsNone(self.oauth2.user_id)
            self.assertIsNone(self.oauth2.email)

    def test_required(self):
        @self.app.route('/protected')
        @self.oauth2.required
        def index():
            return 'Hello'

        # No credentials, should redirect
        with self.app.test_client() as client:
            response = client.get('/protected')
            self.assertEqual(response.status_code, httplib.FOUND)
            self.assertIn('oauth2authorize', response.headers['Location'])
            self.assertIn('protected', response.headers['Location'])

        credentials = self._generate_credentials(scopes=self.oauth2.scopes)

        # With credentials, should allow
        with self.app.test_client() as client:
            with client.session_transaction() as session:
                session['google_oauth2_credentials'] = credentials.to_json()

            response = client.get('/protected')
            self.assertEqual(response.status_code, httplib.OK)
            self.assertIn('Hello', response.data.decode('utf-8'))

        # Expired credentials with refresh token, should allow.
        credentials.token_expiry = datetime.datetime(1990, 5, 28)
        with mock.patch('oauth2client.client._UTCNOW') as utcnow:
            utcnow.return_value = datetime.datetime(1990, 5, 29)

            with self.app.test_client() as client:
                with client.session_transaction() as session:
                    session['google_oauth2_credentials'] = (
                        credentials.to_json())

                response = client.get('/protected')
                self.assertEqual(response.status_code, httplib.OK)
                self.assertIn('Hello', response.data.decode('utf-8'))

        # Expired credentials without a refresh token, should redirect.
        credentials.refresh_token = None
        with mock.patch('oauth2client.client._UTCNOW') as utcnow:
            utcnow.return_value = datetime.datetime(1990, 5, 29)

            with self.app.test_client() as client:
                with client.session_transaction() as session:
                    session['google_oauth2_credentials'] = (
                        credentials.to_json())

                response = client.get('/protected')
                self.assertEqual(response.status_code, httplib.FOUND)
                self.assertIn(
                    'oauth2authorize', response.headers['Location'])
                self.assertIn('protected', response.headers['Location'])

    def _create_incremental_auth_app(self):
        """Replace ``self.app``/``self.oauth2`` with an incremental-auth app.

        The new app enables ``include_granted_scopes`` and registers two
        protected routes: ``/one`` (scope 'one') and ``/two`` (scopes
        'two' and 'three').
        """
        self.app = flask.Flask(__name__)
        self.app.testing = True
        self.app.config['SECRET_KEY'] = 'notasecert'
        self.oauth2 = flask_util.UserOAuth2(
            self.app,
            client_id='client_idz',
            client_secret='client_secretz',
            include_granted_scopes=True)

        @self.app.route('/one')
        @self.oauth2.required(scopes=['one'])
        def one():
            return 'Hello'

        @self.app.route('/two')
        @self.oauth2.required(scopes=['two', 'three'])
        def two():
            return 'Hello'

    def test_incremental_auth(self):
        self._create_incremental_auth_app()

        # No credentials, should redirect
        with self.app.test_client() as client:
            response = client.get('/one')
            self.assertIn('one', response.headers['Location'])
            self.assertEqual(response.status_code, httplib.FOUND)

        # Credentials for one. /one should allow, /two should redirect.
        credentials = self._generate_credentials(scopes=['email', 'one'])

        with self.app.test_client() as client:
            with client.session_transaction() as session:
                session['google_oauth2_credentials'] = credentials.to_json()

            response = client.get('/one')
            self.assertEqual(response.status_code, httplib.OK)

            response = client.get('/two')
            self.assertIn('two', response.headers['Location'])
            self.assertEqual(response.status_code, httplib.FOUND)

            # Starting the authorization flow should include the
            # include_granted_scopes parameter as well as the scopes.
            #
            # Follow the redirect using only its path and query. Slicing
            # off a fixed-length 'http://localhost/' prefix breaks when
            # the Location header is relative or the host differs.
            redirect = urlparse.urlsplit(response.headers['Location'])
            response = client.get(
                '{0}?{1}'.format(redirect.path, redirect.query))
            q = urlparse.parse_qs(
                response.headers['Location'].split('?', 1)[1])
            self.assertIn('include_granted_scopes', q)
            self.assertEqual(
                set(q['scope'][0].split(' ')),
                set(['one', 'email', 'two', 'three']))

        # Actually call two() without a redirect.
        credentials2 = self._generate_credentials(
            scopes=['email', 'two', 'three'])

        with self.app.test_client() as client:
            with client.session_transaction() as session:
                session['google_oauth2_credentials'] = credentials2.to_json()

            response = client.get('/two')
            self.assertEqual(response.status_code, httplib.OK)

    def test_incremental_auth_exchange(self):
        self._create_incremental_auth_app()

        with Http2Mock():
            with self.app.test_client() as client:
                state = self._setup_callback_state(
                    client,
                    return_url='/return_url',
                    # Incremental auth scopes.
                    scopes=['one', 'two'])

                response = client.get(
                    '/oauth2callback?state={0}&code=codez'.format(state))
                self.assertEqual(response.status_code, httplib.FOUND)

                credentials = self.oauth2.credentials
                self.assertTrue(
                    credentials.has_scopes(['email', 'one', 'two']))

    def test_refresh(self):
        with self.app.test_request_context():
            with mock.patch('flask.session'):
                self.oauth2.storage.put(self._generate_credentials())

                self.oauth2.credentials.refresh(
                    Http2Mock(access_token='new_token'))

                self.assertEqual(
                    self.oauth2.storage.get().access_token, 'new_token')

    def test_delete(self):
        with self.app.test_request_context():

            self.oauth2.storage.put(self._generate_credentials())
            self.oauth2.storage.delete()

            self.assertNotIn('google_oauth2_credentials', flask.session)