• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Unit test for django_util views"""
16
17import copy
18import json
19
20import django
21from django import http
22import django.conf
23from django.contrib.auth.models import AnonymousUser, User
24import mock
25from six.moves import reload_module
26
27from tests.contrib.django_util import TestWithDjangoEnvironment
28from tests.contrib.django_util.models import CredentialsModel
29
30from oauth2client.client import FlowExchangeError, OAuth2WebServerFlow
31import oauth2client.contrib.django_util
32from oauth2client.contrib.django_util import views
33from oauth2client.contrib.django_util.models import CredentialsField
34
35
class OAuth2AuthorizeTest(TestWithDjangoEnvironment):
    """Exercise ``views.oauth2_authorize`` without overriding any settings.

    Every test builds a GET request for the authorize endpoint, attaches
    a session and a user to it, and checks that the view answers with a
    redirect. ``self.factory`` and ``self.session`` are not created here;
    they are expected to come from the base class.
    """

    def setUp(self):
        """Snapshot the Django settings, reload django_util, add a user."""
        super(OAuth2AuthorizeTest, self).setUp()
        self.save_settings = copy.deepcopy(django.conf.settings)
        reload_module(oauth2client.contrib.django_util)
        self.user = User.objects.create_user(
            username='bill', email='bill@example.com', password='hunter2')

    def tearDown(self):
        """Put back the settings snapshot taken in ``setUp``."""
        django.conf.settings = copy.deepcopy(self.save_settings)
        # Chain to the base class so any cleanup it defines still runs.
        super(OAuth2AuthorizeTest, self).tearDown()

    def test_authorize_works(self):
        """A request from the created user is answered with a redirect."""
        request = self.factory.get('oauth2/oauth2authorize')
        request.session = self.session
        request.user = self.user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)

    def test_authorize_anonymous_user(self):
        """A request from ``AnonymousUser`` is answered with a redirect."""
        request = self.factory.get('oauth2/oauth2authorize')
        request.session = self.session
        request.user = AnonymousUser()
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)

    def test_authorize_works_explicit_return_url(self):
        """Passing ``return_url`` in the query still yields a redirect."""
        request = self.factory.get('oauth2/oauth2authorize',
                                   data={'return_url': '/return_endpoint'})
        request.session = self.session
        request.user = self.user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
69
70
class Oauth2AuthorizeStorageModelTest(TestWithDjangoEnvironment):
    """Exercise ``views.oauth2_authorize`` with a storage model configured.

    ``setUp`` points ``GOOGLE_OAUTH2_STORAGE_MODEL`` at the test
    ``CredentialsModel`` before reloading django_util. ``self.factory``
    and ``self.session`` are not created here; they are expected to come
    from the base class.
    """

    def setUp(self):
        """Snapshot settings, install the storage model, add a user."""
        super(Oauth2AuthorizeStorageModelTest, self).setUp()
        self.save_settings = copy.deepcopy(django.conf.settings)

        storage_model = {
            'model': 'tests.contrib.django_util.models.CredentialsModel',
            'user_property': 'user_id',
            'credentials_property': 'credentials'
        }
        django.conf.settings.GOOGLE_OAUTH2_STORAGE_MODEL = storage_model

        # OAuth2 Settings gets configured based on Django settings
        # at import time, so in order for us to reload the settings
        # we need to reload the module
        reload_module(oauth2client.contrib.django_util)
        self.user = User.objects.create_user(
            username='bill', email='bill@example.com', password='hunter2')

    def tearDown(self):
        """Put back the settings snapshot taken in ``setUp``."""
        django.conf.settings = copy.deepcopy(self.save_settings)
        # Chain to the base class so any cleanup it defines still runs.
        super(Oauth2AuthorizeStorageModelTest, self).tearDown()

    def test_authorize_works(self):
        """A request from the created user redirects to Google."""
        request = self.factory.get('oauth2/oauth2authorize')
        request.session = self.session
        request.user = self.user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        # redirects to Google oauth
        self.assertIn('accounts.google.com', response.url)

    def test_authorize_anonymous_user_redirects_login(self):
        """A request from ``AnonymousUser`` redirects to ``LOGIN_URL``."""
        request = self.factory.get('oauth2/oauth2authorize')
        request.session = self.session
        request.user = AnonymousUser()
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        # redirects to Django login
        self.assertIn(django.conf.settings.LOGIN_URL, response.url)

    def test_authorize_works_explicit_return_url(self):
        """Passing ``return_url`` in the query still yields a redirect."""
        request = self.factory.get('oauth2/oauth2authorize',
                                   data={'return_url': '/return_endpoint'})
        request.session = self.session
        request.user = self.user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)

    def test_authorized_user_not_logged_in_redirects(self):
        """A user that has a ``CredentialsModel`` row gets a redirect."""
        request = self.factory.get('oauth2/oauth2authorize',
                                   data={'return_url': '/return_endpoint'})
        request.session = self.session

        # A second user, distinct from the 'bill' created in setUp.
        authorized_user = User.objects.create_user(
            username='bill2', email='bill@example.com', password='hunter2')
        credentials = CredentialsField()

        CredentialsModel.objects.create(
            user_id=authorized_user,
            credentials=credentials)

        request.user = authorized_user
        response = views.oauth2_authorize(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
136
137
class Oauth2CallbackTest(TestWithDjangoEnvironment):
    """Exercise ``views.oauth2_callback`` on success and failure paths.

    ``self.factory`` and ``self.session`` are not created here; they are
    expected to come from the base class.
    """

    def setUp(self):
        """Snapshot settings, reload django_util, build state and a user."""
        super(Oauth2CallbackTest, self).setUp()
        self.save_settings = copy.deepcopy(django.conf.settings)
        reload_module(oauth2client.contrib.django_util)

        self.CSRF_TOKEN = 'token'
        self.RETURN_URL = 'http://return-url.com'
        self.fake_state = {
            'csrf_token': self.CSRF_TOKEN,
            'return_url': self.RETURN_URL,
            'scopes': django.conf.settings.GOOGLE_OAUTH2_SCOPES
        }
        self.user = User.objects.create_user(
            username='bill', email='bill@example.com', password='hunter2')

    def tearDown(self):
        """Put back the settings snapshot taken in ``setUp``."""
        django.conf.settings = copy.deepcopy(self.save_settings)
        # Chain to the base class so any cleanup it defines still runs.
        super(Oauth2CallbackTest, self).tearDown()

    def _make_flow(self, request):
        """Return a flow whose state is ``self.fake_state`` as JSON."""
        return OAuth2WebServerFlow(
            client_id='clientid',
            client_secret='clientsecret',
            scope=['email'],
            state=json.dumps(self.fake_state),
            redirect_uri=request.build_absolute_uri('oauth2/oauth2callback'))

    @mock.patch('oauth2client.contrib.django_util.views.pickle')
    def test_callback_works(self, pickle):
        """A matching token and stored flow redirect to the return URL."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'state': json.dumps(self.fake_state),
            'code': 123
        })

        self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN

        flow = self._make_flow(request)
        flow_key = 'google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)
        # ``pickle`` is the patched module, so ``dumps`` yields a mock and
        # ``loads`` is told below to hand back the flow object directly.
        self.session[flow_key] = pickle.dumps(flow)
        flow.step2_exchange = mock.Mock()
        pickle.loads.return_value = flow

        request.session = self.session
        request.user = self.user
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseRedirect)
        self.assertEqual(
            response.status_code, http.HttpResponseRedirect.status_code)
        self.assertEqual(response['Location'], self.RETURN_URL)

    @mock.patch('oauth2client.contrib.django_util.views.pickle')
    def test_callback_handles_bad_flow_exchange(self, pickle):
        """A ``FlowExchangeError`` from the exchange gives a bad request."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'state': json.dumps(self.fake_state),
            'code': 123
        })

        self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN

        flow = self._make_flow(request)
        flow_key = 'google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)
        self.session[flow_key] = pickle.dumps(flow)

        # Any call to the exchange step raises the error under test.
        flow.step2_exchange = mock.Mock(
            side_effect=FlowExchangeError('test'))
        pickle.loads.return_value = flow

        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)

    def test_error_returns_bad_request(self):
        """An ``error`` query parameter yields 'Authorization failed'."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'error': 'There was an error in your authorization.',
        })
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertIn(b'Authorization failed', response.content)

    def test_no_session(self):
        """Without storing a CSRF token first, the callback is rejected."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'code': 123,
            'state': json.dumps(self.fake_state)
        })

        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertEqual(
            response.content, b'No existing session for this flow.')

    def test_missing_state_returns_bad_request(self):
        """A request carrying no ``state`` parameter is a bad request."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'code': 123
        })
        self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)

    def test_bad_state(self):
        """State JSON lacking the expected keys is reported as invalid."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'code': 123,
            'state': json.dumps({'wrong': 'state'})
        })
        self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertEqual(response.content, b'Invalid state parameter.')

    def test_bad_csrf(self):
        """A session token that differs from the state token is rejected."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'state': json.dumps(self.fake_state),
            'code': 123
        })
        self.session['google_oauth2_csrf_token'] = 'WRONG TOKEN'
        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertEqual(response.content, b'Invalid CSRF token.')

    def test_no_saved_flow(self):
        """A ``None`` flow entry in the session is reported as missing."""
        request = self.factory.get('oauth2/oauth2callback', data={
            'state': json.dumps(self.fake_state),
            'code': 123
        })
        self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
        self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] = None
        request.session = self.session
        response = views.oauth2_callback(request)
        self.assertIsInstance(response, http.HttpResponseBadRequest)
        self.assertEqual(response.content, b'Missing Oauth2 flow.')
275