# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit test for django_util views"""

import copy
import json

import django
from django import http
import django.conf
from django.contrib.auth.models import AnonymousUser, User
import mock
from six.moves import reload_module

from tests.contrib.django_util import TestWithDjangoEnvironment
from tests.contrib.django_util.models import CredentialsModel

from oauth2client.client import FlowExchangeError, OAuth2WebServerFlow
import oauth2client.contrib.django_util
from oauth2client.contrib.django_util import views
from oauth2client.contrib.django_util.models import CredentialsField


class OAuth2AuthorizeTest(TestWithDjangoEnvironment):
    """Tests for ``views.oauth2_authorize`` with the settings as found."""

    def setUp(self):
        """Snapshot the settings, reload django_util and create a user."""
        super(OAuth2AuthorizeTest, self).setUp()
        self.save_settings = copy.deepcopy(django.conf.settings)
        reload_module(oauth2client.contrib.django_util)
        self.user = User.objects.create_user(
            username='bill', email='bill@example.com', password='hunter2')

    def tearDown(self):
        """Put back a copy of the settings captured in ``setUp``."""
        django.conf.settings = copy.deepcopy(self.save_settings)

    def test_authorize_works(self):
        """A request from the created user is answered with a redirect."""
        request = self.factory.get('oauth2/oauth2authorize')
        request.session = self.session
        request.user = self.user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)

    def test_authorize_anonymous_user(self):
        """A request from ``AnonymousUser`` is answered with a redirect."""
        request = self.factory.get('oauth2/oauth2authorize')
        request.session = self.session
        request.user = AnonymousUser()
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)

    def test_authorize_works_explicit_return_url(self):
        """A request carrying ``return_url`` is answered with a redirect."""
        request = self.factory.get('oauth2/oauth2authorize',
                                   data={'return_url': '/return_endpoint'})
        request.session = self.session
        request.user = self.user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)


class Oauth2AuthorizeStorageModelTest(TestWithDjangoEnvironment):
    """Tests for ``views.oauth2_authorize`` with a storage model set."""

    def setUp(self):
        """Set ``GOOGLE_OAUTH2_STORAGE_MODEL``, reload, and create a user."""
        super(Oauth2AuthorizeStorageModelTest, self).setUp()
        self.save_settings = copy.deepcopy(django.conf.settings)

        STORAGE_MODEL = {
            'model': 'tests.contrib.django_util.models.CredentialsModel',
            'user_property': 'user_id',
            'credentials_property': 'credentials'
        }
        django.conf.settings.GOOGLE_OAUTH2_STORAGE_MODEL = STORAGE_MODEL

        # OAuth2 Settings gets configured based on Django settings
        # at import time, so in order for us to reload the settings
        # we need to reload the module
        reload_module(oauth2client.contrib.django_util)
        self.user = User.objects.create_user(
            username='bill', email='bill@example.com', password='hunter2')

    def tearDown(self):
        """Put back a copy of the settings captured in ``setUp``."""
        django.conf.settings = copy.deepcopy(self.save_settings)

    def test_authorize_works(self):
        """The created user is redirected to a Google accounts URL."""
        request = self.factory.get('oauth2/oauth2authorize')
        request.session = self.session
        request.user = self.user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        # redirects to Google oauth
        self.assertIn('accounts.google.com', response.url)

    def test_authorize_anonymous_user_redirects_login(self):
        """``AnonymousUser`` is redirected to a URL containing LOGIN_URL."""
        request = self.factory.get('oauth2/oauth2authorize')
        request.session = self.session
        request.user = AnonymousUser()
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        # redirects to Django login
        self.assertIn(django.conf.settings.LOGIN_URL, response.url)

    def test_authorize_works_explicit_return_url(self):
        """A request carrying ``return_url`` is answered with a redirect."""
        request = self.factory.get('oauth2/oauth2authorize',
                                   data={'return_url': '/return_endpoint'})
        request.session = self.session
        request.user = self.user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)

    def test_authorized_user_not_logged_in_redirects(self):
        """A user that has a ``CredentialsModel`` row gets a redirect."""
        request = self.factory.get('oauth2/oauth2authorize',
                                   data={'return_url': '/return_endpoint'})
        request.session = self.session

        authorized_user = User.objects.create_user(
            username='bill2', email='bill@example.com', password='hunter2')
        credentials = CredentialsField()

        CredentialsModel.objects.create(
            user_id=authorized_user,
            credentials=credentials)

        request.user = authorized_user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)


class Oauth2CallbackTest(TestWithDjangoEnvironment):
    """Tests for ``views.oauth2_callback``."""

    def setUp(self):
        """Reload django_util, build the fake state and create a user."""
        super(Oauth2CallbackTest, self).setUp()
        self.save_settings = copy.deepcopy(django.conf.settings)
        reload_module(oauth2client.contrib.django_util)

        self.CSRF_TOKEN = 'token'
        self.RETURN_URL = 'http://return-url.com'
        self.fake_state = {
            'csrf_token': self.CSRF_TOKEN,
            'return_url': self.RETURN_URL,
            'scopes': django.conf.settings.GOOGLE_OAUTH2_SCOPES
        }
        self.user = User.objects.create_user(
            username='bill', email='bill@example.com', password='hunter2')

    def tearDown(self):
        """Put back a copy of the settings captured in ``setUp``.

        Mirrors the other test classes in this module, which restore the
        snapshot they take in ``setUp``.
        """
        django.conf.settings = copy.deepcopy(self.save_settings)

    def _make_flow(self, request):
        """Return an ``OAuth2WebServerFlow`` carrying ``self.fake_state``."""
        return OAuth2WebServerFlow(
            client_id='clientid',
            client_secret='clientsecret',
            scope=['email'],
            state=json.dumps(self.fake_state),
            redirect_uri=request.build_absolute_uri('oauth2/oauth2callback'))

    @mock.patch('oauth2client.contrib.django_util.views.pickle')
    def test_callback_works(self, pickle):
        """A matching state and stored flow redirect to ``RETURN_URL``."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'state': json.dumps(self.fake_state),
            'code': 123
        })

        self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN

        flow = self._make_flow(request)

        name = 'google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)
        self.session[name] = pickle.dumps(flow)
        # The view's pickle module is mocked, so hand the flow back
        # directly and stub out the token exchange.
        flow.step2_exchange = mock.Mock()
        pickle.loads.return_value = flow

        request.session = self.session
        request.user = self.user
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        self.assertEqual(
            response.status_code, http.HttpResponseRedirect.status_code)
        self.assertEqual(response['Location'], self.RETURN_URL)

    @mock.patch('oauth2client.contrib.django_util.views.pickle')
    def test_callback_handles_bad_flow_exchange(self, pickle):
        """A ``FlowExchangeError`` from the flow gives a bad request."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'state': json.dumps(self.fake_state),
            'code': 123
        })

        self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN

        flow = self._make_flow(request)

        name = 'google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)
        self.session[name] = pickle.dumps(flow)

        def local_throws(code):
            raise FlowExchangeError('test')

        flow.step2_exchange = local_throws
        pickle.loads.return_value = flow

        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)

    def test_error_returns_bad_request(self):
        """An ``error`` query parameter gives a bad request response."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'error': 'There was an error in your authorization.',
        })
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertIn(b'Authorization failed', response.content)

    def test_no_session(self):
        """A session without a stored CSRF token gives a bad request."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'code': 123,
            'state': json.dumps(self.fake_state)
        })

        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertEqual(
            response.content, b'No existing session for this flow.')

    def test_missing_state_returns_bad_request(self):
        """A callback without a ``state`` parameter gives a bad request."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'code': 123
        })
        self.session['google_oauth2_csrf_token'] = 'token'
        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)

    def test_bad_state(self):
        """A ``state`` lacking the expected keys gives a bad request."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'code': 123,
            'state': json.dumps({'wrong': 'state'})
        })
        self.session['google_oauth2_csrf_token'] = 'token'
        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertEqual(response.content, b'Invalid state parameter.')

    def test_bad_csrf(self):
        """A session CSRF token differing from the state's is rejected."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'state': json.dumps(self.fake_state),
            'code': 123
        })
        self.session['google_oauth2_csrf_token'] = 'WRONG TOKEN'
        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertEqual(response.content, b'Invalid CSRF token.')

    def test_no_saved_flow(self):
        """A ``None`` flow stored in the session gives a bad request."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'state': json.dumps(self.fake_state),
            'code': 123
        })
        self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
        self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] = None
        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertEqual(response.content, b'Missing Oauth2 flow.')