• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16
17import pytest
18
19from google.auth import _credentials_async as credentials
20from google.auth import _helpers
21
22
23class CredentialsImpl(credentials.Credentials):
24    def refresh(self, request):
25        self.token = request
26
27    def with_quota_project(self, quota_project_id):
28        raise NotImplementedError()
29
30
31def test_credentials_constructor():
32    credentials = CredentialsImpl()
33    assert not credentials.token
34    assert not credentials.expiry
35    assert not credentials.expired
36    assert not credentials.valid
37
38
39def test_expired_and_valid():
40    credentials = CredentialsImpl()
41    credentials.token = "token"
42
43    assert credentials.valid
44    assert not credentials.expired
45
46    # Set the expiration to one second more than now plus the clock skew
47    # accomodation. These credentials should be valid.
48    credentials.expiry = (
49        datetime.datetime.utcnow()
50        + _helpers.REFRESH_THRESHOLD
51        + datetime.timedelta(seconds=1)
52    )
53
54    assert credentials.valid
55    assert not credentials.expired
56
57    # Set the credentials expiration to now. Because of the clock skew
58    # accomodation, these credentials should report as expired.
59    credentials.expiry = datetime.datetime.utcnow()
60
61    assert not credentials.valid
62    assert credentials.expired
63
64
65@pytest.mark.asyncio
66async def test_before_request():
67    credentials = CredentialsImpl()
68    request = "token"
69    headers = {}
70
71    # First call should call refresh, setting the token.
72    await credentials.before_request(request, "http://example.com", "GET", headers)
73    assert credentials.valid
74    assert credentials.token == "token"
75    assert headers["authorization"] == "Bearer token"
76
77    request = "token2"
78    headers = {}
79
80    # Second call shouldn't call refresh.
81    credentials.before_request(request, "http://example.com", "GET", headers)
82
83    assert credentials.valid
84    assert credentials.token == "token"
85
86
87def test_anonymous_credentials_ctor():
88    anon = credentials.AnonymousCredentials()
89
90    assert anon.token is None
91    assert anon.expiry is None
92    assert not anon.expired
93    assert anon.valid
94
95
96def test_anonymous_credentials_refresh():
97    anon = credentials.AnonymousCredentials()
98
99    request = object()
100    with pytest.raises(ValueError):
101        anon.refresh(request)
102
103
104def test_anonymous_credentials_apply_default():
105    anon = credentials.AnonymousCredentials()
106    headers = {}
107    anon.apply(headers)
108    assert headers == {}
109    with pytest.raises(ValueError):
110        anon.apply(headers, token="TOKEN")
111
112
113def test_anonymous_credentials_before_request():
114    anon = credentials.AnonymousCredentials()
115    request = object()
116    method = "GET"
117    url = "https://example.com/api/endpoint"
118    headers = {}
119    anon.before_request(request, method, url, headers)
120    assert headers == {}
121
122
123class ReadOnlyScopedCredentialsImpl(credentials.ReadOnlyScoped, CredentialsImpl):
124    @property
125    def requires_scopes(self):
126        return super(ReadOnlyScopedCredentialsImpl, self).requires_scopes
127
128
129def test_readonly_scoped_credentials_constructor():
130    credentials = ReadOnlyScopedCredentialsImpl()
131    assert credentials._scopes is None
132
133
134def test_readonly_scoped_credentials_scopes():
135    credentials = ReadOnlyScopedCredentialsImpl()
136    credentials._scopes = ["one", "two"]
137    assert credentials.scopes == ["one", "two"]
138    assert credentials.has_scopes(["one"])
139    assert credentials.has_scopes(["two"])
140    assert credentials.has_scopes(["one", "two"])
141    assert not credentials.has_scopes(["three"])
142
143
144def test_readonly_scoped_credentials_requires_scopes():
145    credentials = ReadOnlyScopedCredentialsImpl()
146    assert not credentials.requires_scopes
147
148
149class RequiresScopedCredentialsImpl(credentials.Scoped, CredentialsImpl):
150    def __init__(self, scopes=None):
151        super(RequiresScopedCredentialsImpl, self).__init__()
152        self._scopes = scopes
153
154    @property
155    def requires_scopes(self):
156        return not self.scopes
157
158    def with_scopes(self, scopes):
159        return RequiresScopedCredentialsImpl(scopes=scopes)
160
161
162def test_create_scoped_if_required_scoped():
163    unscoped_credentials = RequiresScopedCredentialsImpl()
164    scoped_credentials = credentials.with_scopes_if_required(
165        unscoped_credentials, ["one", "two"]
166    )
167
168    assert scoped_credentials is not unscoped_credentials
169    assert not scoped_credentials.requires_scopes
170    assert scoped_credentials.has_scopes(["one", "two"])
171
172
173def test_create_scoped_if_required_not_scopes():
174    unscoped_credentials = CredentialsImpl()
175    scoped_credentials = credentials.with_scopes_if_required(
176        unscoped_credentials, ["one", "two"]
177    )
178
179    assert scoped_credentials is unscoped_credentials
180