• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 Google Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Unit tests for the Flask utilities"""
16
17import datetime
18import json
19import logging
20
21import flask
22import httplib2
23import mock
24import six.moves.http_client as httplib
25import six.moves.urllib.parse as urlparse
26import unittest2
27
28import oauth2client
29from oauth2client import client
30from oauth2client import clientsecrets
31from oauth2client.contrib import flask_util
32
33
34__author__ = 'jonwayne@google.com (Jon Wayne Parrott)'
35
36
37class Http2Mock(object):
38    """Mock httplib2.Http for code exchange / refresh"""
39
40    def __init__(self, status=httplib.OK, **kwargs):
41        self.status = status
42        self.content = {
43            'access_token': 'foo_access_token',
44            'refresh_token': 'foo_refresh_token',
45            'expires_in': 3600,
46            'extra': 'value',
47        }
48        self.content.update(kwargs)
49
50    def request(self, token_uri, method, body, headers, *args, **kwargs):
51        self.body = body
52        self.headers = headers
53        return (self, json.dumps(self.content).encode('utf-8'))
54
55    def __enter__(self):
56        self.httplib2_orig = httplib2.Http
57        httplib2.Http = self
58        return self
59
60    def __exit__(self, exc_type, exc_value, traceback):
61        httplib2.Http = self.httplib2_orig
62
63    def __call__(self, *args, **kwargs):
64        return self
65
66
67class FlaskOAuth2Tests(unittest2.TestCase):
68
69    def setUp(self):
70        self.app = flask.Flask(__name__)
71        self.app.testing = True
72        self.app.config['SECRET_KEY'] = 'notasecert'
73        self.app.logger.setLevel(logging.CRITICAL)
74        self.oauth2 = flask_util.UserOAuth2(
75            self.app,
76            client_id='client_idz',
77            client_secret='client_secretz')
78
79    def _generate_credentials(self, scopes=None):
80        return client.OAuth2Credentials(
81            'access_tokenz',
82            'client_idz',
83            'client_secretz',
84            'refresh_tokenz',
85            datetime.datetime.utcnow() + datetime.timedelta(seconds=3600),
86            oauth2client.GOOGLE_TOKEN_URI,
87            'Test',
88            id_token={
89                'sub': '123',
90                'email': 'user@example.com'
91            },
92            scopes=scopes)
93
94    def test_explicit_configuration(self):
95        oauth2 = flask_util.UserOAuth2(
96            flask.Flask(__name__), client_id='id', client_secret='secret')
97
98        self.assertEqual(oauth2.client_id, 'id')
99        self.assertEqual(oauth2.client_secret, 'secret')
100
101        return_val = (
102            clientsecrets.TYPE_WEB,
103            {'client_id': 'id', 'client_secret': 'secret'})
104
105        with mock.patch('oauth2client.clientsecrets.loadfile',
106                        return_value=return_val):
107
108            oauth2 = flask_util.UserOAuth2(
109                flask.Flask(__name__), client_secrets_file='file.json')
110
111            self.assertEqual(oauth2.client_id, 'id')
112            self.assertEqual(oauth2.client_secret, 'secret')
113
114    def test_delayed_configuration(self):
115        app = flask.Flask(__name__)
116        oauth2 = flask_util.UserOAuth2()
117        oauth2.init_app(app, client_id='id', client_secret='secret')
118        self.assertEqual(oauth2.app, app)
119
120    def test_explicit_storage(self):
121        storage_mock = mock.Mock()
122        oauth2 = flask_util.UserOAuth2(
123            flask.Flask(__name__), storage=storage_mock, client_id='id',
124            client_secret='secret')
125        self.assertEqual(oauth2.storage, storage_mock)
126
127    def test_explicit_scopes(self):
128        oauth2 = flask_util.UserOAuth2(
129            flask.Flask(__name__), scopes=['1', '2'], client_id='id',
130            client_secret='secret')
131        self.assertEqual(oauth2.scopes, ['1', '2'])
132
133    def test_bad_client_secrets(self):
134        return_val = (
135            'other',
136            {'client_id': 'id', 'client_secret': 'secret'})
137
138        with mock.patch('oauth2client.clientsecrets.loadfile',
139                        return_value=return_val):
140            with self.assertRaises(ValueError):
141                flask_util.UserOAuth2(flask.Flask(__name__),
142                                      client_secrets_file='file.json')
143
144    def test_app_configuration(self):
145        app = flask.Flask(__name__)
146        app.config['GOOGLE_OAUTH2_CLIENT_ID'] = 'id'
147        app.config['GOOGLE_OAUTH2_CLIENT_SECRET'] = 'secret'
148
149        oauth2 = flask_util.UserOAuth2(app)
150
151        self.assertEqual(oauth2.client_id, 'id')
152        self.assertEqual(oauth2.client_secret, 'secret')
153
154        return_val = (
155            clientsecrets.TYPE_WEB,
156            {'client_id': 'id2', 'client_secret': 'secret2'})
157
158        with mock.patch('oauth2client.clientsecrets.loadfile',
159                        return_value=return_val):
160
161            app = flask.Flask(__name__)
162            app.config['GOOGLE_OAUTH2_CLIENT_SECRETS_FILE'] = 'file.json'
163            oauth2 = flask_util.UserOAuth2(app)
164
165            self.assertEqual(oauth2.client_id, 'id2')
166            self.assertEqual(oauth2.client_secret, 'secret2')
167
168    def test_no_configuration(self):
169        with self.assertRaises(ValueError):
170            flask_util.UserOAuth2(flask.Flask(__name__))
171
172    def test_create_flow(self):
173        with self.app.test_request_context():
174            flow = self.oauth2._make_flow()
175            state = json.loads(flow.params['state'])
176            self.assertIn('google_oauth2_csrf_token', flask.session)
177            self.assertEqual(
178                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
179            self.assertEqual(flow.client_id, self.oauth2.client_id)
180            self.assertEqual(flow.client_secret, self.oauth2.client_secret)
181            self.assertIn('http', flow.redirect_uri)
182            self.assertIn('oauth2callback', flow.redirect_uri)
183
184            flow = self.oauth2._make_flow(return_url='/return_url')
185            state = json.loads(flow.params['state'])
186            self.assertEqual(state['return_url'], '/return_url')
187
188            flow = self.oauth2._make_flow(extra_arg='test')
189            self.assertEqual(flow.params['extra_arg'], 'test')
190
191        # Test extra args specified in the constructor.
192        app = flask.Flask(__name__)
193        app.config['SECRET_KEY'] = 'notasecert'
194        oauth2 = flask_util.UserOAuth2(
195            app, client_id='client_id', client_secret='secret',
196            extra_arg='test')
197
198        with app.test_request_context():
199            flow = oauth2._make_flow()
200            self.assertEqual(flow.params['extra_arg'], 'test')
201
202    def test_authorize_view(self):
203        with self.app.test_client() as client:
204            response = client.get('/oauth2authorize')
205            location = response.headers['Location']
206            q = urlparse.parse_qs(location.split('?', 1)[1])
207            state = json.loads(q['state'][0])
208
209            self.assertIn(oauth2client.GOOGLE_AUTH_URI, location)
210            self.assertNotIn(self.oauth2.client_secret, location)
211            self.assertIn(self.oauth2.client_id, q['client_id'])
212            self.assertEqual(
213                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
214            self.assertEqual(state['return_url'], '/')
215
216        with self.app.test_client() as client:
217            response = client.get('/oauth2authorize?return_url=/test')
218            location = response.headers['Location']
219            q = urlparse.parse_qs(location.split('?', 1)[1])
220            state = json.loads(q['state'][0])
221            self.assertEqual(state['return_url'], '/test')
222
223        with self.app.test_client() as client:
224            response = client.get('/oauth2authorize?extra_param=test')
225            location = response.headers['Location']
226            self.assertIn('extra_param=test', location)
227
228    def _setup_callback_state(self, client, **kwargs):
229        with self.app.test_request_context():
230            # Flask doesn't create a request context with a session
231            # transaction for some reason, so, set up the flow here,
232            # then apply it to the session in the transaction.
233            if not kwargs:
234                self.oauth2._make_flow(return_url='/return_url')
235            else:
236                self.oauth2._make_flow(**kwargs)
237
238            with client.session_transaction() as session:
239                session.update(flask.session)
240                csrf_token = session['google_oauth2_csrf_token']
241                flow = flask_util._get_flow_for_token(csrf_token)
242                state = flow.params['state']
243
244        return state
245
246    def test_callback_view(self):
247        self.oauth2.storage = mock.Mock()
248        with self.app.test_client() as client:
249            with Http2Mock() as http:
250                state = self._setup_callback_state(client)
251
252                response = client.get(
253                    '/oauth2callback?state={0}&code=codez'.format(state))
254
255                self.assertEqual(response.status_code, httplib.FOUND)
256                self.assertIn('/return_url', response.headers['Location'])
257                self.assertIn(self.oauth2.client_secret, http.body)
258                self.assertIn('codez', http.body)
259                self.assertTrue(self.oauth2.storage.put.called)
260
261    def test_authorize_callback(self):
262        self.oauth2.authorize_callback = mock.Mock()
263        self.test_callback_view()
264        self.assertTrue(self.oauth2.authorize_callback.called)
265
266    def test_callback_view_errors(self):
267        # Error supplied to callback
268        with self.app.test_client() as client:
269            with client.session_transaction() as session:
270                session['google_oauth2_csrf_token'] = 'tokenz'
271
272            response = client.get('/oauth2callback?state={}&error=something')
273            self.assertEqual(response.status_code, httplib.BAD_REQUEST)
274            self.assertIn('something', response.data.decode('utf-8'))
275
276        # CSRF mismatch
277        with self.app.test_client() as client:
278            with client.session_transaction() as session:
279                session['google_oauth2_csrf_token'] = 'goodstate'
280
281            state = json.dumps({
282                'csrf_token': 'badstate',
283                'return_url': '/return_url'
284            })
285
286            response = client.get(
287                '/oauth2callback?state={0}&code=codez'.format(state))
288            self.assertEqual(response.status_code, httplib.BAD_REQUEST)
289
290        # KeyError, no CSRF state.
291        with self.app.test_client() as client:
292            response = client.get('/oauth2callback?state={}&code=codez')
293            self.assertEqual(response.status_code, httplib.BAD_REQUEST)
294
295        # Code exchange error
296        with self.app.test_client() as client:
297            state = self._setup_callback_state(client)
298
299            with Http2Mock(status=httplib.INTERNAL_SERVER_ERROR):
300                response = client.get(
301                    '/oauth2callback?state={0}&code=codez'.format(state))
302                self.assertEqual(response.status_code, httplib.BAD_REQUEST)
303
304        # Invalid state json
305        with self.app.test_client() as client:
306            with client.session_transaction() as session:
307                session['google_oauth2_csrf_token'] = 'tokenz'
308
309            state = '[{'
310            response = client.get(
311                '/oauth2callback?state={0}&code=codez'.format(state))
312            self.assertEqual(response.status_code, httplib.BAD_REQUEST)
313
314        # Missing flow.
315        with self.app.test_client() as client:
316            with client.session_transaction() as session:
317                session['google_oauth2_csrf_token'] = 'tokenz'
318
319            state = json.dumps({
320                'csrf_token': 'tokenz',
321                'return_url': '/return_url'
322            })
323
324            response = client.get(
325                '/oauth2callback?state={0}&code=codez'.format(state))
326            self.assertEqual(response.status_code, httplib.BAD_REQUEST)
327
328    def test_no_credentials(self):
329        with self.app.test_request_context():
330            self.assertFalse(self.oauth2.has_credentials())
331            self.assertTrue(self.oauth2.credentials is None)
332            self.assertTrue(self.oauth2.user_id is None)
333            self.assertTrue(self.oauth2.email is None)
334            with self.assertRaises(ValueError):
335                self.oauth2.http()
336            self.assertFalse(self.oauth2.storage.get())
337            self.oauth2.storage.delete()
338
339    def test_with_credentials(self):
340        credentials = self._generate_credentials()
341        with self.app.test_request_context():
342            self.oauth2.storage.put(credentials)
343            self.assertEqual(
344                self.oauth2.credentials.access_token, credentials.access_token)
345            self.assertEqual(
346                self.oauth2.credentials.refresh_token,
347                credentials.refresh_token)
348            self.assertEqual(self.oauth2.user_id, '123')
349            self.assertEqual(self.oauth2.email, 'user@example.com')
350            self.assertTrue(self.oauth2.http())
351
352    @mock.patch('oauth2client.client._UTCNOW')
353    def test_with_expired_credentials(self, utcnow):
354        utcnow.return_value = datetime.datetime(1990, 5, 29)
355
356        credentials = self._generate_credentials()
357        credentials.token_expiry = datetime.datetime(1990, 5, 28)
358
359        # Has a refresh token, so this should be fine.
360        with self.app.test_request_context():
361            self.oauth2.storage.put(credentials)
362            self.assertTrue(self.oauth2.has_credentials())
363
364        # Without a refresh token this should return false.
365        credentials.refresh_token = None
366        with self.app.test_request_context():
367            self.oauth2.storage.put(credentials)
368            self.assertFalse(self.oauth2.has_credentials())
369
370    def test_bad_id_token(self):
371        credentials = self._generate_credentials()
372        credentials.id_token = {}
373        with self.app.test_request_context():
374            self.oauth2.storage.put(credentials)
375            self.assertTrue(self.oauth2.user_id is None)
376            self.assertTrue(self.oauth2.email is None)
377
378    def test_required(self):
379        @self.app.route('/protected')
380        @self.oauth2.required
381        def index():
382            return 'Hello'
383
384        # No credentials, should redirect
385        with self.app.test_client() as client:
386            response = client.get('/protected')
387            self.assertEqual(response.status_code, httplib.FOUND)
388            self.assertIn('oauth2authorize', response.headers['Location'])
389            self.assertIn('protected', response.headers['Location'])
390
391        credentials = self._generate_credentials(scopes=self.oauth2.scopes)
392
393        # With credentials, should allow
394        with self.app.test_client() as client:
395            with client.session_transaction() as session:
396                session['google_oauth2_credentials'] = credentials.to_json()
397
398            response = client.get('/protected')
399            self.assertEqual(response.status_code, httplib.OK)
400            self.assertIn('Hello', response.data.decode('utf-8'))
401
402        # Expired credentials with refresh token, should allow.
403        credentials.token_expiry = datetime.datetime(1990, 5, 28)
404        with mock.patch('oauth2client.client._UTCNOW') as utcnow:
405            utcnow.return_value = datetime.datetime(1990, 5, 29)
406
407            with self.app.test_client() as client:
408                with client.session_transaction() as session:
409                    session['google_oauth2_credentials'] = (
410                        credentials.to_json())
411
412                response = client.get('/protected')
413                self.assertEqual(response.status_code, httplib.OK)
414                self.assertIn('Hello', response.data.decode('utf-8'))
415
416        # Expired credentials without a refresh token, should redirect.
417        credentials.refresh_token = None
418        with mock.patch('oauth2client.client._UTCNOW') as utcnow:
419            utcnow.return_value = datetime.datetime(1990, 5, 29)
420
421            with self.app.test_client() as client:
422                with client.session_transaction() as session:
423                    session['google_oauth2_credentials'] = (
424                        credentials.to_json())
425
426                response = client.get('/protected')
427            self.assertEqual(response.status_code, httplib.FOUND)
428            self.assertIn('oauth2authorize', response.headers['Location'])
429            self.assertIn('protected', response.headers['Location'])
430
431    def _create_incremental_auth_app(self):
432        self.app = flask.Flask(__name__)
433        self.app.testing = True
434        self.app.config['SECRET_KEY'] = 'notasecert'
435        self.oauth2 = flask_util.UserOAuth2(
436            self.app,
437            client_id='client_idz',
438            client_secret='client_secretz',
439            include_granted_scopes=True)
440
441        @self.app.route('/one')
442        @self.oauth2.required(scopes=['one'])
443        def one():
444            return 'Hello'
445
446        @self.app.route('/two')
447        @self.oauth2.required(scopes=['two', 'three'])
448        def two():
449            return 'Hello'
450
451    def test_incremental_auth(self):
452        self._create_incremental_auth_app()
453
454        # No credentials, should redirect
455        with self.app.test_client() as client:
456            response = client.get('/one')
457            self.assertIn('one', response.headers['Location'])
458            self.assertEqual(response.status_code, httplib.FOUND)
459
460        # Credentials for one. /one should allow, /two should redirect.
461        credentials = self._generate_credentials(scopes=['email', 'one'])
462
463        with self.app.test_client() as client:
464            with client.session_transaction() as session:
465                session['google_oauth2_credentials'] = credentials.to_json()
466
467            response = client.get('/one')
468            self.assertEqual(response.status_code, httplib.OK)
469
470            response = client.get('/two')
471            self.assertIn('two', response.headers['Location'])
472            self.assertEqual(response.status_code, httplib.FOUND)
473
474            # Starting the authorization flow should include the
475            # include_granted_scopes parameter as well as the scopes.
476            response = client.get(response.headers['Location'][17:])
477            q = urlparse.parse_qs(
478                response.headers['Location'].split('?', 1)[1])
479            self.assertIn('include_granted_scopes', q)
480            self.assertEqual(
481                set(q['scope'][0].split(' ')),
482                set(['one', 'email', 'two', 'three']))
483
484        # Actually call two() without a redirect.
485        credentials2 = self._generate_credentials(
486            scopes=['email', 'two', 'three'])
487
488        with self.app.test_client() as client:
489            with client.session_transaction() as session:
490                session['google_oauth2_credentials'] = credentials2.to_json()
491
492            response = client.get('/two')
493            self.assertEqual(response.status_code, httplib.OK)
494
495    def test_incremental_auth_exchange(self):
496        self._create_incremental_auth_app()
497
498        with Http2Mock():
499            with self.app.test_client() as client:
500                state = self._setup_callback_state(
501                    client,
502                    return_url='/return_url',
503                    # Incremental auth scopes.
504                    scopes=['one', 'two'])
505
506                response = client.get(
507                    '/oauth2callback?state={0}&code=codez'.format(state))
508                self.assertEqual(response.status_code, httplib.FOUND)
509
510                credentials = self.oauth2.credentials
511                self.assertTrue(
512                    credentials.has_scopes(['email', 'one', 'two']))
513
514    def test_refresh(self):
515        with self.app.test_request_context():
516            with mock.patch('flask.session'):
517                self.oauth2.storage.put(self._generate_credentials())
518
519                self.oauth2.credentials.refresh(
520                    Http2Mock(access_token='new_token'))
521
522                self.assertEqual(
523                    self.oauth2.storage.get().access_token, 'new_token')
524
525    def test_delete(self):
526        with self.app.test_request_context():
527
528            self.oauth2.storage.put(self._generate_credentials())
529            self.oauth2.storage.delete()
530
531            self.assertNotIn('google_oauth2_credentials', flask.session)
532