1# Copyright 2016 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import datetime 16 17import mock 18import pytest 19 20from google.auth import app_engine 21 22 23class _AppIdentityModule(object): 24 """The interface of the App Idenity app engine module. 25 See https://cloud.google.com/appengine/docs/standard/python/refdocs 26 /google.appengine.api.app_identity.app_identity 27 """ 28 29 def get_application_id(self): 30 raise NotImplementedError() 31 32 def sign_blob(self, bytes_to_sign, deadline=None): 33 raise NotImplementedError() 34 35 def get_service_account_name(self, deadline=None): 36 raise NotImplementedError() 37 38 def get_access_token(self, scopes, service_account_id=None): 39 raise NotImplementedError() 40 41 42@pytest.fixture 43def app_identity(monkeypatch): 44 """Mocks the app_identity module for google.auth.app_engine.""" 45 app_identity_module = mock.create_autospec(_AppIdentityModule, instance=True) 46 monkeypatch.setattr(app_engine, "app_identity", app_identity_module) 47 yield app_identity_module 48 49 50def test_get_project_id(app_identity): 51 app_identity.get_application_id.return_value = mock.sentinel.project 52 assert app_engine.get_project_id() == mock.sentinel.project 53 54 55@mock.patch.object(app_engine, "app_identity", new=None) 56def test_get_project_id_missing_apis(): 57 with pytest.raises(EnvironmentError) as excinfo: 58 assert app_engine.get_project_id() 59 60 assert excinfo.match(r"App Engine APIs are not available") 61 62 63class TestSigner(object): 64 def test_key_id(self, app_identity): 65 app_identity.sign_blob.return_value = ( 66 mock.sentinel.key_id, 67 mock.sentinel.signature, 68 ) 69 70 signer = app_engine.Signer() 71 72 assert signer.key_id is None 73 74 def test_sign(self, app_identity): 75 app_identity.sign_blob.return_value = ( 76 mock.sentinel.key_id, 77 mock.sentinel.signature, 78 ) 79 80 signer = app_engine.Signer() 81 to_sign = b"123" 82 83 signature = signer.sign(to_sign) 84 85 assert signature == mock.sentinel.signature 86 app_identity.sign_blob.assert_called_with(to_sign) 87 88 89class TestCredentials(object): 90 @mock.patch.object(app_engine, "app_identity", new=None) 91 def test_missing_apis(self): 92 with pytest.raises(EnvironmentError) as excinfo: 93 app_engine.Credentials() 94 95 assert excinfo.match(r"App Engine APIs are not available") 96 97 def test_default_state(self, app_identity): 98 credentials = app_engine.Credentials() 99 100 # Not token acquired yet 101 assert not credentials.valid 102 # Expiration hasn't been set yet 103 assert not credentials.expired 104 # Scopes are required 105 assert not credentials.scopes 106 assert not credentials.default_scopes 107 assert credentials.requires_scopes 108 assert not credentials.quota_project_id 109 110 def test_with_scopes(self, app_identity): 111 credentials = app_engine.Credentials() 112 113 assert not credentials.scopes 114 assert credentials.requires_scopes 115 116 scoped_credentials = credentials.with_scopes(["email"]) 117 118 assert scoped_credentials.has_scopes(["email"]) 119 assert not scoped_credentials.requires_scopes 120 121 def test_with_default_scopes(self, app_identity): 122 credentials = app_engine.Credentials() 123 124 assert not credentials.scopes 125 assert not credentials.default_scopes 126 assert credentials.requires_scopes 127 128 scoped_credentials = credentials.with_scopes( 129 scopes=None, default_scopes=["email"] 130 ) 131 132 assert scoped_credentials.has_scopes(["email"]) 133 assert not scoped_credentials.requires_scopes 134 135 def test_with_quota_project(self, app_identity): 136 credentials = app_engine.Credentials() 137 138 assert not credentials.scopes 139 assert not credentials.quota_project_id 140 141 quota_project_creds = credentials.with_quota_project("project-foo") 142 143 assert quota_project_creds.quota_project_id == "project-foo" 144 145 def test_service_account_email_implicit(self, app_identity): 146 app_identity.get_service_account_name.return_value = ( 147 mock.sentinel.service_account_email 148 ) 149 credentials = app_engine.Credentials() 150 151 assert credentials.service_account_email == mock.sentinel.service_account_email 152 assert app_identity.get_service_account_name.called 153 154 def test_service_account_email_explicit(self, app_identity): 155 credentials = app_engine.Credentials( 156 service_account_id=mock.sentinel.service_account_email 157 ) 158 159 assert credentials.service_account_email == mock.sentinel.service_account_email 160 assert not app_identity.get_service_account_name.called 161 162 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 163 def test_refresh(self, utcnow, app_identity): 164 token = "token" 165 ttl = 643942923 166 app_identity.get_access_token.return_value = token, ttl 167 credentials = app_engine.Credentials( 168 scopes=["email"], default_scopes=["profile"] 169 ) 170 171 credentials.refresh(None) 172 173 app_identity.get_access_token.assert_called_with( 174 credentials.scopes, credentials._service_account_id 175 ) 176 assert credentials.token == token 177 assert credentials.expiry == datetime.datetime(1990, 5, 29, 1, 2, 3) 178 assert credentials.valid 179 assert not credentials.expired 180 181 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 182 def test_refresh_with_default_scopes(self, utcnow, app_identity): 183 token = "token" 184 ttl = 643942923 185 app_identity.get_access_token.return_value = token, ttl 186 credentials = app_engine.Credentials(default_scopes=["email"]) 187 188 credentials.refresh(None) 189 190 app_identity.get_access_token.assert_called_with( 191 credentials.default_scopes, credentials._service_account_id 192 ) 193 assert credentials.token == token 194 assert credentials.expiry == datetime.datetime(1990, 5, 29, 1, 2, 3) 195 assert credentials.valid 196 assert not credentials.expired 197 198 def test_sign_bytes(self, app_identity): 199 app_identity.sign_blob.return_value = ( 200 mock.sentinel.key_id, 201 mock.sentinel.signature, 202 ) 203 credentials = app_engine.Credentials() 204 to_sign = b"123" 205 206 signature = credentials.sign_bytes(to_sign) 207 208 assert signature == mock.sentinel.signature 209 app_identity.sign_blob.assert_called_with(to_sign) 210 211 def test_signer(self, app_identity): 212 credentials = app_engine.Credentials() 213 assert isinstance(credentials.signer, app_engine.Signer) 214 215 def test_signer_email(self, app_identity): 216 credentials = app_engine.Credentials() 217 assert credentials.signer_email == credentials.service_account_email 218