• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16
17import mock
18import pytest
19
20from google.auth import app_engine
21
22
23class _AppIdentityModule(object):
24    """The interface of the App Idenity app engine module.
25    See https://cloud.google.com/appengine/docs/standard/python/refdocs
26    /google.appengine.api.app_identity.app_identity
27    """
28
29    def get_application_id(self):
30        raise NotImplementedError()
31
32    def sign_blob(self, bytes_to_sign, deadline=None):
33        raise NotImplementedError()
34
35    def get_service_account_name(self, deadline=None):
36        raise NotImplementedError()
37
38    def get_access_token(self, scopes, service_account_id=None):
39        raise NotImplementedError()
40
41
42@pytest.fixture
43def app_identity(monkeypatch):
44    """Mocks the app_identity module for google.auth.app_engine."""
45    app_identity_module = mock.create_autospec(_AppIdentityModule, instance=True)
46    monkeypatch.setattr(app_engine, "app_identity", app_identity_module)
47    yield app_identity_module
48
49
50def test_get_project_id(app_identity):
51    app_identity.get_application_id.return_value = mock.sentinel.project
52    assert app_engine.get_project_id() == mock.sentinel.project
53
54
55@mock.patch.object(app_engine, "app_identity", new=None)
56def test_get_project_id_missing_apis():
57    with pytest.raises(EnvironmentError) as excinfo:
58        assert app_engine.get_project_id()
59
60    assert excinfo.match(r"App Engine APIs are not available")
61
62
63class TestSigner(object):
64    def test_key_id(self, app_identity):
65        app_identity.sign_blob.return_value = (
66            mock.sentinel.key_id,
67            mock.sentinel.signature,
68        )
69
70        signer = app_engine.Signer()
71
72        assert signer.key_id is None
73
74    def test_sign(self, app_identity):
75        app_identity.sign_blob.return_value = (
76            mock.sentinel.key_id,
77            mock.sentinel.signature,
78        )
79
80        signer = app_engine.Signer()
81        to_sign = b"123"
82
83        signature = signer.sign(to_sign)
84
85        assert signature == mock.sentinel.signature
86        app_identity.sign_blob.assert_called_with(to_sign)
87
88
89class TestCredentials(object):
90    @mock.patch.object(app_engine, "app_identity", new=None)
91    def test_missing_apis(self):
92        with pytest.raises(EnvironmentError) as excinfo:
93            app_engine.Credentials()
94
95        assert excinfo.match(r"App Engine APIs are not available")
96
97    def test_default_state(self, app_identity):
98        credentials = app_engine.Credentials()
99
100        # Not token acquired yet
101        assert not credentials.valid
102        # Expiration hasn't been set yet
103        assert not credentials.expired
104        # Scopes are required
105        assert not credentials.scopes
106        assert not credentials.default_scopes
107        assert credentials.requires_scopes
108        assert not credentials.quota_project_id
109
110    def test_with_scopes(self, app_identity):
111        credentials = app_engine.Credentials()
112
113        assert not credentials.scopes
114        assert credentials.requires_scopes
115
116        scoped_credentials = credentials.with_scopes(["email"])
117
118        assert scoped_credentials.has_scopes(["email"])
119        assert not scoped_credentials.requires_scopes
120
121    def test_with_default_scopes(self, app_identity):
122        credentials = app_engine.Credentials()
123
124        assert not credentials.scopes
125        assert not credentials.default_scopes
126        assert credentials.requires_scopes
127
128        scoped_credentials = credentials.with_scopes(
129            scopes=None, default_scopes=["email"]
130        )
131
132        assert scoped_credentials.has_scopes(["email"])
133        assert not scoped_credentials.requires_scopes
134
135    def test_with_quota_project(self, app_identity):
136        credentials = app_engine.Credentials()
137
138        assert not credentials.scopes
139        assert not credentials.quota_project_id
140
141        quota_project_creds = credentials.with_quota_project("project-foo")
142
143        assert quota_project_creds.quota_project_id == "project-foo"
144
145    def test_service_account_email_implicit(self, app_identity):
146        app_identity.get_service_account_name.return_value = (
147            mock.sentinel.service_account_email
148        )
149        credentials = app_engine.Credentials()
150
151        assert credentials.service_account_email == mock.sentinel.service_account_email
152        assert app_identity.get_service_account_name.called
153
154    def test_service_account_email_explicit(self, app_identity):
155        credentials = app_engine.Credentials(
156            service_account_id=mock.sentinel.service_account_email
157        )
158
159        assert credentials.service_account_email == mock.sentinel.service_account_email
160        assert not app_identity.get_service_account_name.called
161
162    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
163    def test_refresh(self, utcnow, app_identity):
164        token = "token"
165        ttl = 643942923
166        app_identity.get_access_token.return_value = token, ttl
167        credentials = app_engine.Credentials(
168            scopes=["email"], default_scopes=["profile"]
169        )
170
171        credentials.refresh(None)
172
173        app_identity.get_access_token.assert_called_with(
174            credentials.scopes, credentials._service_account_id
175        )
176        assert credentials.token == token
177        assert credentials.expiry == datetime.datetime(1990, 5, 29, 1, 2, 3)
178        assert credentials.valid
179        assert not credentials.expired
180
181    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
182    def test_refresh_with_default_scopes(self, utcnow, app_identity):
183        token = "token"
184        ttl = 643942923
185        app_identity.get_access_token.return_value = token, ttl
186        credentials = app_engine.Credentials(default_scopes=["email"])
187
188        credentials.refresh(None)
189
190        app_identity.get_access_token.assert_called_with(
191            credentials.default_scopes, credentials._service_account_id
192        )
193        assert credentials.token == token
194        assert credentials.expiry == datetime.datetime(1990, 5, 29, 1, 2, 3)
195        assert credentials.valid
196        assert not credentials.expired
197
198    def test_sign_bytes(self, app_identity):
199        app_identity.sign_blob.return_value = (
200            mock.sentinel.key_id,
201            mock.sentinel.signature,
202        )
203        credentials = app_engine.Credentials()
204        to_sign = b"123"
205
206        signature = credentials.sign_bytes(to_sign)
207
208        assert signature == mock.sentinel.signature
209        app_identity.sign_blob.assert_called_with(to_sign)
210
211    def test_signer(self, app_identity):
212        credentials = app_engine.Credentials()
213        assert isinstance(credentials.signer, app_engine.Signer)
214
215    def test_signer_email(self, app_identity):
216        credentials = app_engine.Credentials()
217        assert credentials.signer_email == credentials.service_account_email
218