# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for oauth2client.clientsecrets."""

import errno
from io import StringIO
import os
import tempfile

import unittest2

import oauth2client
from oauth2client import _helpers
from oauth2client import clientsecrets


__author__ = 'jcgregorio@google.com (Joe Gregorio)'


# Fixture paths, all resolved relative to this test module.
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
VALID_FILE = os.path.join(DATA_DIR, 'client_secrets.json')
INVALID_FILE = os.path.join(DATA_DIR, 'unfilled_client_secrets.json')
NONEXISTENT_FILE = os.path.join(
    os.path.dirname(__file__), 'afilethatisntthere.json')


class Test__validate_clientsecrets(unittest2.TestCase):
    """Tests for ``clientsecrets._validate_clientsecrets``."""

    def test_with_none(self):
        """``None`` is rejected with InvalidClientSecretsError."""
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._validate_clientsecrets(None)

    def test_with_other_than_one_key(self):
        """A dictionary with zero keys or with two keys is rejected."""
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._validate_clientsecrets({})
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._validate_clientsecrets({'one': 'val', 'two': 'val'})

    def test_with_non_dictionary(self):
        """A one-element list (not a dictionary) is rejected."""
        non_dict = [None]
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._validate_clientsecrets(non_dict)

    def test_invalid_client_type(self):
        """A client type other than TYPE_WEB / TYPE_INSTALLED is rejected."""
        fake_type = 'fake_type'
        # Guard against the fake name colliding with a real client type.
        self.assertNotEqual(fake_type, clientsecrets.TYPE_WEB)
        self.assertNotEqual(fake_type, clientsecrets.TYPE_INSTALLED)
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._validate_clientsecrets({fake_type: None})

    def test_missing_required_type_web(self):
        """A web client lacking the required keys is rejected."""
        required = clientsecrets.VALID_CLIENT[
            clientsecrets.TYPE_WEB]['required']
        # We will certainly have less than all 5 keys.
        self.assertEqual(len(required), 5)

        clientsecrets_dict = {
            clientsecrets.TYPE_WEB: {'not_required': None},
        }
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._validate_clientsecrets(clientsecrets_dict)

    def test_string_not_configured_type_web(self):
        """A web client whose client_id is '[[template]]' is rejected."""
        string_props = clientsecrets.VALID_CLIENT[
            clientsecrets.TYPE_WEB]['string']

        self.assertIn('client_id', string_props)
        clientsecrets_dict = {
            clientsecrets.TYPE_WEB: {
                'client_id': '[[template]]',
                'client_secret': 'seekrit',
                'redirect_uris': None,
                'auth_uri': None,
                'token_uri': None,
            },
        }
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._validate_clientsecrets(clientsecrets_dict)

    def test_missing_required_type_installed(self):
        """An installed client lacking the required keys is rejected."""
        required = clientsecrets.VALID_CLIENT[
            clientsecrets.TYPE_INSTALLED]['required']
        # We will certainly have less than all 5 keys.
        self.assertEqual(len(required), 5)

        clientsecrets_dict = {
            clientsecrets.TYPE_INSTALLED: {'not_required': None},
        }
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._validate_clientsecrets(clientsecrets_dict)

    def test_string_not_configured_type_installed(self):
        """An installed client whose client_id is '[[template]]' is rejected."""
        string_props = clientsecrets.VALID_CLIENT[
            clientsecrets.TYPE_INSTALLED]['string']

        self.assertIn('client_id', string_props)
        clientsecrets_dict = {
            clientsecrets.TYPE_INSTALLED: {
                'client_id': '[[template]]',
                'client_secret': 'seekrit',
                'redirect_uris': None,
                'auth_uri': None,
                'token_uri': None,
            },
        }
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._validate_clientsecrets(clientsecrets_dict)

    def test_success_type_web(self):
        """A fully populated web client returns (TYPE_WEB, client_info)."""
        client_info = {
            'client_id': 'eye-dee',
            'client_secret': 'seekrit',
            'redirect_uris': None,
            'auth_uri': None,
            'token_uri': None,
        }
        clientsecrets_dict = {
            clientsecrets.TYPE_WEB: client_info,
        }
        result = clientsecrets._validate_clientsecrets(clientsecrets_dict)
        self.assertEqual(result, (clientsecrets.TYPE_WEB, client_info))

    def test_success_type_installed(self):
        """A populated installed client returns (TYPE_INSTALLED, info)."""
        client_info = {
            'client_id': 'eye-dee',
            'client_secret': 'seekrit',
            'redirect_uris': None,
            'auth_uri': None,
            'token_uri': None,
        }
        clientsecrets_dict = {
            clientsecrets.TYPE_INSTALLED: client_info,
        }
        result = clientsecrets._validate_clientsecrets(clientsecrets_dict)
        self.assertEqual(result, (clientsecrets.TYPE_INSTALLED, client_info))


class Test__loadfile(unittest2.TestCase):
    """Tests for ``clientsecrets._loadfile``."""

    def test_success(self):
        """Loading VALID_FILE yields the web type and the expected info."""
        client_type, client_info = clientsecrets._loadfile(VALID_FILE)
        expected_client_info = {
            'client_id': 'foo_client_id',
            'client_secret': 'foo_client_secret',
            'redirect_uris': [],
            'auth_uri': oauth2client.GOOGLE_AUTH_URI,
            'token_uri': oauth2client.GOOGLE_TOKEN_URI,
            'revoke_uri': oauth2client.GOOGLE_REVOKE_URI,
        }
        self.assertEqual(client_type, clientsecrets.TYPE_WEB)
        self.assertEqual(client_info, expected_client_info)

    def test_non_existent(self):
        """A path that does not exist raises InvalidClientSecretsError."""
        path = os.path.join(DATA_DIR, 'fake.json')
        self.assertFalse(os.path.exists(path))
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets._loadfile(path)

    def test_bad_json(self):
        """A file holding malformed JSON raises ValueError."""
        # mkstemp() creates the file atomically, unlike the deprecated and
        # race-prone mktemp(); the cleanup removes it even if the test fails.
        file_desc, filename = tempfile.mkstemp()
        self.addCleanup(os.remove, filename)
        with os.fdopen(file_desc, 'wb') as file_obj:
            file_obj.write(b'[')
        with self.assertRaises(ValueError):
            clientsecrets._loadfile(filename)


class OAuth2CredentialsTests(unittest2.TestCase):
    """Tests for error reporting from ``loads``, ``load`` and ``_loadfile``."""

    def test_validate_error(self):
        """Each bad payload raises an error whose message has the prefix."""
        payload = (
            b'{'
            b'  "web": {'
            b'    "client_id": "[[CLIENT ID REQUIRED]]",'
            b'    "client_secret": "[[CLIENT SECRET REQUIRED]]",'
            b'    "redirect_uris": ["http://localhost:8080/oauth2callback"],'
            b'    "auth_uri": "",'
            b'    "token_uri": ""'
            b'  }'
            b'}')
        # Pairs of (JSON source, expected prefix of the error message).
        ERRORS = [
            ('{}', 'Invalid'),
            ('{"foo": {}}', 'Unknown'),
            ('{"web": {}}', 'Missing'),
            ('{"web": {"client_id": "dkkd"}}', 'Missing'),
            (payload, 'Property'),
        ]
        for src, match in ERRORS:
            # Ensure that it is unicode
            src = _helpers._from_bytes(src)
            # Test load(s)
            with self.assertRaises(
                    clientsecrets.InvalidClientSecretsError) as exc_manager:
                clientsecrets.loads(src)

            self.assertTrue(str(exc_manager.exception).startswith(match))

            # Test loads(fp)
            with self.assertRaises(
                    clientsecrets.InvalidClientSecretsError) as exc_manager:
                fp = StringIO(src)
                clientsecrets.load(fp)

            self.assertTrue(str(exc_manager.exception).startswith(match))

    def test_load_by_filename_missing_file(self):
        """A missing file is reported with its path and errno.ENOENT."""
        with self.assertRaises(
                clientsecrets.InvalidClientSecretsError) as exc_manager:
            clientsecrets._loadfile(NONEXISTENT_FILE)

        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(exc_manager.exception.args[1], NONEXISTENT_FILE)
        self.assertEqual(exc_manager.exception.args[3], errno.ENOENT)


class CachedClientsecretsTests(unittest2.TestCase):
    """Tests for ``clientsecrets.loadfile`` with and without a cache."""

    class CacheMock(object):
        """In-memory cache stand-in that records the last namespaces used."""

        def __init__(self):
            self.cache = {}
            self.last_get_ns = None
            self.last_set_ns = None

        def get(self, key, namespace=''):
            """Return the stored value for ``key`` (or None); record namespace."""
            # ignoring namespace for easier testing
            self.last_get_ns = namespace
            return self.cache.get(key, None)

        def set(self, key, value, namespace=''):
            """Store ``value`` under ``key``; record the namespace."""
            # ignoring namespace for easier testing
            self.last_set_ns = namespace
            self.cache[key] = value

    def setUp(self):
        self.cache_mock = self.CacheMock()

    def test_cache_miss(self):
        """On a miss the file is loaded and the result stored in the cache."""
        client_type, client_info = clientsecrets.loadfile(
            VALID_FILE, cache=self.cache_mock)
        self.assertEqual('web', client_type)
        self.assertEqual('foo_client_secret', client_info['client_secret'])

        cached = self.cache_mock.cache[VALID_FILE]
        self.assertEqual({client_type: client_info}, cached)

        # make sure we're using non-empty namespace
        ns = self.cache_mock.last_set_ns
        self.assertTrue(ns)
        # make sure they're equal
        self.assertEqual(ns, self.cache_mock.last_get_ns)

    def test_cache_hit(self):
        """On a hit the cached entry is returned without calling set()."""
        self.cache_mock.cache[NONEXISTENT_FILE] = {'web': 'secret info'}

        client_type, client_info = clientsecrets.loadfile(
            NONEXISTENT_FILE, cache=self.cache_mock)
        self.assertEqual('web', client_type)
        self.assertEqual('secret info', client_info)
        # make sure we didn't do any set() RPCs
        self.assertIsNone(self.cache_mock.last_set_ns)

    def test_validation(self):
        """INVALID_FILE raises InvalidClientSecretsError with a cache too."""
        with self.assertRaises(clientsecrets.InvalidClientSecretsError):
            clientsecrets.loadfile(INVALID_FILE, cache=self.cache_mock)

    def test_without_cache(self):
        """loadfile() works when no cache argument is supplied."""
        # this also ensures loadfile() is backward compatible
        client_type, client_info = clientsecrets.loadfile(VALID_FILE)
        self.assertEqual('web', client_type)
        self.assertEqual('foo_client_secret', client_info['client_secret'])