• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import json
17
18import mock
19import pytest
20from six.moves import http_client
21from six.moves import urllib
22
23from google.auth import _helpers
24from google.auth import credentials
25from google.auth import downscoped
26from google.auth import exceptions
27from google.auth import transport
28
29
# --- Fixtures for the primary access boundary rule ("customer-a") -----------
AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket"
AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"]
EXPRESSION = (
    "resource.name.startsWith("
    "'projects/_/buckets/example-bucket/objects/customer-a')"
)
TITLE = "customer-a-objects"
DESCRIPTION = (
    "Condition to make permissions available for objects "
    "starting with customer-a"
)

# --- Fixtures for a second, different rule ("customer-b") -------------------
# Used by the setter tests to swap every field for a distinct value.
OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]
OTHER_EXPRESSION = (
    "resource.name.startsWith("
    "'projects/_/buckets/example-bucket/objects/customer-b')"
)
OTHER_TITLE = "customer-b-objects"
OTHER_DESCRIPTION = (
    "Condition to make permissions available for objects "
    "starting with customer-b"
)

# --- Token exchange request parameters --------------------------------------
QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token"
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"

# --- Canned token exchange responses ----------------------------------------
SUCCESS_RESPONSE = {
    "access_token": "ACCESS_TOKEN",
    "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
    "token_type": "Bearer",
    "expires_in": 3600,
}
ERROR_RESPONSE = {
    "error": "invalid_grant",
    "error_description": "Subject token is invalid.",
    "error_uri": "https://tools.ietf.org/html/rfc6749",
}

# --- JSON form of a boundary holding only the primary rule ------------------
# The tests serialize this dict with json.dumps and compare the result with the
# "options" field of the outgoing request, so key order is kept stable here.
CREDENTIAL_ACCESS_BOUNDARY_JSON = {
    "accessBoundary": {
        "accessBoundaryRules": [
            {
                "availablePermissions": AVAILABLE_PERMISSIONS,
                "availableResource": AVAILABLE_RESOURCE,
                "availabilityCondition": {
                    "expression": EXPRESSION,
                    "title": TITLE,
                    "description": DESCRIPTION,
                },
            }
        ]
    }
}
80
81
class SourceCredentials(credentials.Credentials):
    """Fake source credentials that issue sequentially numbered tokens.

    Each successful ``refresh`` sets ``token`` to ``ACCESS_TOKEN_<n>``, where
    ``<n>`` is the number of successful refreshes so far, and sets ``expiry``
    to ``expires_in`` seconds after ``_helpers.utcnow()``.

    Args:
        raise_error (bool): When true, ``refresh`` raises
            ``exceptions.RefreshError`` instead of issuing a token.
        expires_in (int): Lifetime, in seconds, applied to every issued token.
    """

    def __init__(self, raise_error=False, expires_in=3600):
        super(SourceCredentials, self).__init__()
        self._expires_in = expires_in
        self._raise_error = raise_error
        # Number of successful refreshes; used as the token suffix.
        self._counter = 0

    def refresh(self, request):
        """Issue the next fake token, or fail when configured to do so.

        Args:
            request: Unused; accepted to satisfy the credentials interface.

        Raises:
            google.auth.exceptions.RefreshError: If the instance was created
                with ``raise_error=True``.
        """
        if self._raise_error:
            raise exceptions.RefreshError(
                "Failed to refresh access token in source credentials."
            )

        issued_at = _helpers.utcnow()
        self._counter += 1
        self.token = "ACCESS_TOKEN_{}".format(self._counter)
        lifetime = datetime.timedelta(seconds=self._expires_in)
        self.expiry = issued_at + lifetime
98
99
def make_availability_condition(expression, title=None, description=None):
    """Return a ``downscoped.AvailabilityCondition`` built from the arguments.

    The three values are forwarded positionally, in the order
    ``expression``, ``title``, ``description``.
    """
    condition_args = (expression, title, description)
    return downscoped.AvailabilityCondition(*condition_args)
102
103
def make_access_boundary_rule(
    available_resource, available_permissions, availability_condition=None
):
    """Return a ``downscoped.AccessBoundaryRule`` built from the arguments.

    The values are forwarded positionally, in the order ``available_resource``,
    ``available_permissions``, ``availability_condition``.
    """
    rule_args = (available_resource, available_permissions, availability_condition)
    return downscoped.AccessBoundaryRule(*rule_args)
110
111
def make_credential_access_boundary(rules):
    """Return a ``downscoped.CredentialAccessBoundary`` wrapping ``rules``."""
    boundary = downscoped.CredentialAccessBoundary(rules)
    return boundary
114
115
class TestAvailabilityCondition(object):
    """Tests for ``downscoped.AvailabilityCondition``."""

    def test_constructor(self):
        """Constructor arguments are readable back through the attributes."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        assert condition.expression == EXPRESSION
        assert condition.title == TITLE
        assert condition.description == DESCRIPTION

    def test_constructor_required_params_only(self):
        """Title and description are ``None`` when only the expression is given."""
        condition = make_availability_condition(EXPRESSION)

        assert condition.expression == EXPRESSION
        assert condition.title is None
        assert condition.description is None

    def test_setters(self):
        """Assigning to each attribute replaces the constructor value."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        condition.expression = OTHER_EXPRESSION
        condition.title = OTHER_TITLE
        condition.description = OTHER_DESCRIPTION

        assert condition.expression == OTHER_EXPRESSION
        assert condition.title == OTHER_TITLE
        assert condition.description == OTHER_DESCRIPTION

    def test_invalid_expression_type(self):
        """A non-string expression (here a list) raises ``TypeError``."""
        with pytest.raises(TypeError) as raised:
            make_availability_condition([EXPRESSION], TITLE, DESCRIPTION)

        assert raised.match("The provided expression is not a string.")

    def test_invalid_title_type(self):
        """A title that is neither a string nor ``None`` raises ``TypeError``."""
        with pytest.raises(TypeError) as raised:
            make_availability_condition(EXPRESSION, False, DESCRIPTION)

        assert raised.match("The provided title is not a string or None.")

    def test_invalid_description_type(self):
        """A description that is neither a string nor ``None`` raises ``TypeError``."""
        with pytest.raises(TypeError) as raised:
            make_availability_condition(EXPRESSION, TITLE, False)

        assert raised.match("The provided description is not a string or None.")

    def test_to_json_required_params_only(self):
        """Only the expression is serialized when the optional fields are unset."""
        condition = make_availability_condition(EXPRESSION)
        expected_json = {"expression": EXPRESSION}

        assert condition.to_json() == expected_json

    def test_to_json_(self):
        """All three fields are serialized when they are all set."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        expected_json = {
            "expression": EXPRESSION,
            "title": TITLE,
            "description": DESCRIPTION,
        }

        assert condition.to_json() == expected_json
178
179
class TestAccessBoundaryRule(object):
    """Tests for ``downscoped.AccessBoundaryRule``."""

    def test_constructor(self):
        """Constructor arguments are exposed; permissions come back as a tuple."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        expected_permissions = tuple(AVAILABLE_PERMISSIONS)

        assert rule.available_resource == AVAILABLE_RESOURCE
        assert rule.available_permissions == expected_permissions
        assert rule.availability_condition == condition

    def test_constructor_required_params_only(self):
        """The availability condition is ``None`` when it is not supplied."""
        rule = make_access_boundary_rule(AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS)
        expected_permissions = tuple(AVAILABLE_PERMISSIONS)

        assert rule.available_resource == AVAILABLE_RESOURCE
        assert rule.available_permissions == expected_permissions
        assert rule.availability_condition is None

    def test_setters(self):
        """Assigning to each attribute replaces the constructor value."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        replacement_condition = make_availability_condition(
            OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
        )
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )

        rule.available_resource = OTHER_AVAILABLE_RESOURCE
        rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS
        rule.availability_condition = replacement_condition

        assert rule.available_resource == OTHER_AVAILABLE_RESOURCE
        assert rule.available_permissions == tuple(OTHER_AVAILABLE_PERMISSIONS)
        assert rule.availability_condition == replacement_condition

    def test_invalid_available_resource_type(self):
        """A non-string resource (here ``None``) raises ``TypeError``."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        with pytest.raises(TypeError) as raised:
            make_access_boundary_rule(None, AVAILABLE_PERMISSIONS, condition)

        assert raised.match("The provided available_resource is not a string.")

    def test_invalid_available_permissions_type(self):
        """Permissions that are not strings raise ``TypeError``."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        with pytest.raises(TypeError) as raised:
            make_access_boundary_rule(AVAILABLE_RESOURCE, [0, 1, 2], condition)

        assert raised.match(
            "Provided available_permissions are not a list of strings."
        )

    def test_invalid_available_permissions_value(self):
        """A permission lacking the ``inRole:`` prefix raises ``ValueError``."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        unprefixed_permissions = ["roles/storage.objectViewer"]

        with pytest.raises(ValueError) as raised:
            make_access_boundary_rule(
                AVAILABLE_RESOURCE, unprefixed_permissions, condition
            )

        assert raised.match("available_permissions must be prefixed with 'inRole:'.")

    def test_invalid_availability_condition_type(self):
        """A plain dict passed as the condition raises ``TypeError``."""
        not_a_condition = {"foo": "bar"}

        with pytest.raises(TypeError) as raised:
            make_access_boundary_rule(
                AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, not_a_condition
            )

        assert raised.match(
            "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None."
        )

    def test_to_json(self):
        """A rule with a condition serializes all three top-level keys."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        expected_json = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
            "availabilityCondition": {
                "expression": EXPRESSION,
                "title": TITLE,
                "description": DESCRIPTION,
            },
        }

        assert rule.to_json() == expected_json

    def test_to_json_required_params_only(self):
        """A rule without a condition omits ``availabilityCondition``."""
        rule = make_access_boundary_rule(AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS)
        expected_json = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
        }

        assert rule.to_json() == expected_json
302
303
class TestCredentialAccessBoundary(object):
    """Tests for ``downscoped.CredentialAccessBoundary``."""

    def test_constructor(self):
        """The rules passed to the constructor are exposed as a tuple."""
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            make_availability_condition(EXPRESSION, TITLE, DESCRIPTION),
        )
        rules = [rule]

        boundary = make_credential_access_boundary(rules)

        assert boundary.rules == tuple(rules)

    def test_setters(self):
        """Assigning ``rules`` replaces the rules given to the constructor."""
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            make_availability_condition(EXPRESSION, TITLE, DESCRIPTION),
        )
        rules = [rule]
        replacement_rule = make_access_boundary_rule(
            OTHER_AVAILABLE_RESOURCE,
            OTHER_AVAILABLE_PERMISSIONS,
            make_availability_condition(
                OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
            ),
        )
        replacement_rules = [replacement_rule]
        boundary = make_credential_access_boundary(rules)

        boundary.rules = replacement_rules

        assert boundary.rules == tuple(replacement_rules)

    def test_add_rule(self):
        """A tenth rule can be appended to a boundary holding nine rules."""
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            make_availability_condition(EXPRESSION, TITLE, DESCRIPTION),
        )
        boundary = make_credential_access_boundary([rule] * 9)

        # Appending one more rule reaches, but does not exceed, the limit of
        # ten rules, so no error is expected.
        extra_rule = make_access_boundary_rule(
            OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS
        )
        boundary.add_rule(extra_rule)

        assert len(boundary.rules) == 10
        assert boundary.rules[9] == extra_rule

    def test_add_rule_invalid_value(self):
        """Adding an eleventh rule raises ``ValueError`` and changes nothing."""
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            make_availability_condition(EXPRESSION, TITLE, DESCRIPTION),
        )
        boundary = make_credential_access_boundary([rule] * 10)

        # The boundary is already full; one more rule exceeds the maximum.
        with pytest.raises(ValueError) as raised:
            boundary.add_rule(rule)

        assert raised.match(
            "Credential access boundary rules can have a maximum of 10 rules."
        )
        assert len(boundary.rules) == 10

    def test_add_rule_invalid_type(self):
        """Adding a non-rule object raises ``TypeError`` and changes nothing."""
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            make_availability_condition(EXPRESSION, TITLE, DESCRIPTION),
        )
        boundary = make_credential_access_boundary([rule])

        # Try to add something that is not an AccessBoundaryRule.
        with pytest.raises(TypeError) as raised:
            boundary.add_rule("invalid")

        assert raised.match(
            "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
        )
        assert len(boundary.rules) == 1
        assert boundary.rules[0] == rule

    def test_invalid_rules_type(self):
        """Constructing with a list of non-rule objects raises ``TypeError``."""
        with pytest.raises(TypeError) as raised:
            make_credential_access_boundary(["invalid"])

        assert raised.match(
            "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
        )

    def test_invalid_rules_value(self):
        """Constructing with eleven rules raises ``ValueError``."""
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            make_availability_condition(EXPRESSION, TITLE, DESCRIPTION),
        )
        too_many_rules = [rule] * 11

        with pytest.raises(ValueError) as raised:
            make_credential_access_boundary(too_many_rules)

        assert raised.match(
            "Credential access boundary rules can have a maximum of 10 rules."
        )

    def test_to_json(self):
        """The boundary serializes to the nested ``accessBoundary`` structure."""
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE,
            AVAILABLE_PERMISSIONS,
            make_availability_condition(EXPRESSION, TITLE, DESCRIPTION),
        )
        boundary = make_credential_access_boundary([rule])
        expected_rule_json = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
            "availabilityCondition": {
                "expression": EXPRESSION,
                "title": TITLE,
                "description": DESCRIPTION,
            },
        }

        assert boundary.to_json() == {
            "accessBoundary": {"accessBoundaryRules": [expected_rule_json]}
        }
445
446
class TestCredentials(object):
    """Tests for ``downscoped.Credentials``: refresh, headers and expiry."""

    @staticmethod
    def make_credentials(source_credentials=None, quota_project_id=None):
        """Build downscoped credentials limited by the default test boundary.

        Args:
            source_credentials: Credentials to downscope. When ``None``, a new
                ``SourceCredentials`` is created for this call.
            quota_project_id: Optional quota project ID forwarded to
                ``downscoped.Credentials``.

        Returns:
            The ``downscoped.Credentials`` instance under test.
        """
        # Do not put ``SourceCredentials()`` in the signature: default values
        # are evaluated once, so every test would share one stateful instance
        # (token counter, token, expiry) and results would depend on test order.
        if source_credentials is None:
            source_credentials = SourceCredentials()

        availability_condition = make_availability_condition(
            EXPRESSION, TITLE, DESCRIPTION
        )
        access_boundary_rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
        )
        rules = [access_boundary_rule]
        credential_access_boundary = make_credential_access_boundary(rules)

        return downscoped.Credentials(
            source_credentials, credential_access_boundary, quota_project_id
        )

    @staticmethod
    def make_mock_request(data, status=http_client.OK):
        """Return a mock request callable answering with ``data`` as JSON.

        Args:
            data: JSON-serializable object used as the response body.
            status: HTTP status set on the mock response.

        Returns:
            An autospec'd ``transport.Request`` whose return value is an
            autospec'd ``transport.Response`` with the given status and the
            UTF-8 encoded JSON body.
        """
        response = mock.create_autospec(transport.Response, instance=True)
        response.status = status
        response.data = json.dumps(data).encode("utf-8")

        request = mock.create_autospec(transport.Request)
        request.return_value = response

        return request

    @staticmethod
    def assert_request_kwargs(request_kwargs, headers, request_data):
        """Assert the token exchange request was made with the expected values.

        Args:
            request_kwargs: Keyword arguments captured from the request call.
            headers: Expected request headers.
            request_data: Expected form fields, as a ``str`` to ``str`` mapping.
        """
        assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT
        assert request_kwargs["method"] == "POST"
        assert request_kwargs["headers"] == headers
        assert request_kwargs["body"] is not None
        # The parsed pairs are decoded below, so the body is expected to be
        # URL-encoded bytes.
        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
        for (k, v) in body_tuples:
            assert v.decode("utf-8") == request_data[k.decode("utf-8")]
        # Equal counts ensure no expected field is missing from the body.
        assert len(body_tuples) == len(request_data.keys())

    def test_default_state(self):
        """New credentials have no token, expiry or quota project."""
        credentials = self.make_credentials()

        # No token acquired yet.
        assert not credentials.token
        assert not credentials.valid
        # Expiration hasn't been set yet.
        assert not credentials.expiry
        assert not credentials.expired
        # No quota project ID set.
        assert not credentials.quota_project_id

    def test_with_quota_project(self):
        """``with_quota_project`` returns credentials carrying the new ID."""
        credentials = self.make_credentials()

        assert not credentials.quota_project_id

        quota_project_creds = credentials.with_quota_project("project-foo")

        assert quota_project_creds.quota_project_id == "project-foo"

    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
    def test_refresh(self, unused_utcnow):
        """Refresh exchanges the source token and honours ``expires_in``."""
        response = SUCCESS_RESPONSE.copy()
        # Test custom expiration to confirm expiry is set correctly.
        response["expires_in"] = 2800
        expected_expiry = datetime.datetime.min + datetime.timedelta(
            seconds=response["expires_in"]
        )
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        request_data = {
            "grant_type": GRANT_TYPE,
            "subject_token": "ACCESS_TOKEN_1",
            "subject_token_type": SUBJECT_TOKEN_TYPE,
            "requested_token_type": REQUESTED_TOKEN_TYPE,
            "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
        }
        request = self.make_mock_request(status=http_client.OK, data=response)
        source_credentials = SourceCredentials()
        credentials = self.make_credentials(source_credentials=source_credentials)

        # Spy on calls to source credentials refresh to confirm the expected request
        # instance is used.
        with mock.patch.object(
            source_credentials, "refresh", wraps=source_credentials.refresh
        ) as wrapped_source_cred_refresh:
            credentials.refresh(request)

            self.assert_request_kwargs(request.call_args[1], headers, request_data)
            assert credentials.valid
            assert credentials.expiry == expected_expiry
            assert not credentials.expired
            assert credentials.token == response["access_token"]
            # Confirm source credentials called with the same request instance.
            wrapped_source_cred_refresh.assert_called_with(request)

    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
    def test_refresh_without_response_expires_in(self, unused_utcnow):
        """Without ``expires_in``, expiry falls back to the source credentials'."""
        response = SUCCESS_RESPONSE.copy()
        # Simulate the response is missing the expires_in field.
        # The downscoped token expiration should match the source credentials
        # expiration.
        del response["expires_in"]
        expected_expires_in = 1800
        # Simulate the source credentials generates a token with 1800 second
        # expiration time. The generated downscoped token should have the same
        # expiration time.
        source_credentials = SourceCredentials(expires_in=expected_expires_in)
        expected_expiry = datetime.datetime.min + datetime.timedelta(
            seconds=expected_expires_in
        )
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        request_data = {
            "grant_type": GRANT_TYPE,
            "subject_token": "ACCESS_TOKEN_1",
            "subject_token_type": SUBJECT_TOKEN_TYPE,
            "requested_token_type": REQUESTED_TOKEN_TYPE,
            "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
        }
        request = self.make_mock_request(status=http_client.OK, data=response)
        credentials = self.make_credentials(source_credentials=source_credentials)

        # Spy on calls to source credentials refresh to confirm the expected request
        # instance is used.
        with mock.patch.object(
            source_credentials, "refresh", wraps=source_credentials.refresh
        ) as wrapped_source_cred_refresh:
            credentials.refresh(request)

            self.assert_request_kwargs(request.call_args[1], headers, request_data)
            assert credentials.valid
            assert credentials.expiry == expected_expiry
            assert not credentials.expired
            assert credentials.token == response["access_token"]
            # Confirm source credentials called with the same request instance.
            wrapped_source_cred_refresh.assert_called_with(request)

    def test_refresh_token_exchange_error(self):
        """An error response from the exchange raises ``OAuthError``."""
        request = self.make_mock_request(
            status=http_client.BAD_REQUEST, data=ERROR_RESPONSE
        )
        credentials = self.make_credentials()

        with pytest.raises(exceptions.OAuthError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(
            r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749"
        )
        assert not credentials.expired
        assert credentials.token is None

    def test_refresh_source_credentials_refresh_error(self):
        """A failing source credentials refresh propagates ``RefreshError``."""
        # Initialize downscoped credentials with source credentials that raise
        # an error on refresh.
        credentials = self.make_credentials(
            source_credentials=SourceCredentials(raise_error=True)
        )

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(mock.sentinel.request)

        assert excinfo.match(r"Failed to refresh access token in source credentials.")
        assert not credentials.expired
        assert credentials.token is None

    def test_apply_without_quota_project_id(self):
        """``apply`` adds only the authorization header without a quota project."""
        headers = {}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        credentials = self.make_credentials()

        credentials.refresh(request)
        credentials.apply(headers)

        assert headers == {
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
        }

    def test_apply_with_quota_project_id(self):
        """``apply`` adds the quota project header and keeps existing headers."""
        headers = {"other": "header-value"}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID)

        credentials.refresh(request)
        credentials.apply(headers)

        assert headers == {
            "other": "header-value",
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
            "x-goog-user-project": QUOTA_PROJECT_ID,
        }

    def test_before_request(self):
        """``before_request`` refreshes once and then reuses the cached token."""
        headers = {"other": "header-value"}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        credentials = self.make_credentials()

        # First call should call refresh, setting the token.
        credentials.before_request(request, "POST", "https://example.com/api", headers)

        assert headers == {
            "other": "header-value",
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
        }

        # Second call shouldn't call refresh (request should be untouched).
        # A sentinel is passed as the request, so it cannot serve a refresh.
        credentials.before_request(
            mock.sentinel.request, "POST", "https://example.com/api", headers
        )

        assert headers == {
            "other": "header-value",
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
        }

    @mock.patch("google.auth._helpers.utcnow")
    def test_before_request_expired(self, utcnow):
        """``before_request`` fetches a new token once the cached one expires."""
        headers = {}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        credentials = self.make_credentials()
        credentials.token = "token"
        utcnow.return_value = datetime.datetime.min
        # Set the expiration to one second more than now plus the clock skew
        # accommodation. These credentials should be valid.
        credentials.expiry = (
            datetime.datetime.min
            + _helpers.REFRESH_THRESHOLD
            + datetime.timedelta(seconds=1)
        )

        assert credentials.valid
        assert not credentials.expired

        credentials.before_request(request, "POST", "https://example.com/api", headers)

        # Cached token should be used.
        assert headers == {"authorization": "Bearer token"}

        # Next call should simulate 1 second passed.
        utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)

        assert not credentials.valid
        assert credentials.expired

        credentials.before_request(request, "POST", "https://example.com/api", headers)

        # New token should be retrieved.
        assert headers == {
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
        }
697