1# Copyright 2021 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import datetime 16import json 17 18import mock 19import pytest 20from six.moves import http_client 21from six.moves import urllib 22 23from google.auth import _helpers 24from google.auth import credentials 25from google.auth import downscoped 26from google.auth import exceptions 27from google.auth import transport 28 29 30EXPRESSION = ( 31 "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')" 32) 33TITLE = "customer-a-objects" 34DESCRIPTION = ( 35 "Condition to make permissions available for objects starting with customer-a" 36) 37AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket" 38AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"] 39 40OTHER_EXPRESSION = ( 41 "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-b')" 42) 43OTHER_TITLE = "customer-b-objects" 44OTHER_DESCRIPTION = ( 45 "Condition to make permissions available for objects starting with customer-b" 46) 47OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket" 48OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"] 49QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" 50GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange" 51REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" 52TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token" 53SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" 54SUCCESS_RESPONSE = { 55 "access_token": "ACCESS_TOKEN", 56 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", 57 "token_type": "Bearer", 58 "expires_in": 3600, 59} 60ERROR_RESPONSE = { 61 "error": "invalid_grant", 62 "error_description": "Subject token is invalid.", 63 "error_uri": "https://tools.ietf.org/html/rfc6749", 64} 65CREDENTIAL_ACCESS_BOUNDARY_JSON = { 66 "accessBoundary": { 67 "accessBoundaryRules": [ 68 { 69 "availablePermissions": AVAILABLE_PERMISSIONS, 70 "availableResource": AVAILABLE_RESOURCE, 71 "availabilityCondition": { 72 "expression": EXPRESSION, 73 "title": TITLE, 74 "description": DESCRIPTION, 75 }, 76 } 77 ] 78 } 79} 80 81 82class SourceCredentials(credentials.Credentials): 83 def __init__(self, raise_error=False, expires_in=3600): 84 super(SourceCredentials, self).__init__() 85 self._counter = 0 86 self._raise_error = raise_error 87 self._expires_in = expires_in 88 89 def refresh(self, request): 90 if self._raise_error: 91 raise exceptions.RefreshError( 92 "Failed to refresh access token in source credentials." 93 ) 94 now = _helpers.utcnow() 95 self._counter += 1 96 self.token = "ACCESS_TOKEN_{}".format(self._counter) 97 self.expiry = now + datetime.timedelta(seconds=self._expires_in) 98 99 100def make_availability_condition(expression, title=None, description=None): 101 return downscoped.AvailabilityCondition(expression, title, description) 102 103 104def make_access_boundary_rule( 105 available_resource, available_permissions, availability_condition=None 106): 107 return downscoped.AccessBoundaryRule( 108 available_resource, available_permissions, availability_condition 109 ) 110 111 112def make_credential_access_boundary(rules): 113 return downscoped.CredentialAccessBoundary(rules) 114 115 116class TestAvailabilityCondition(object): 117 def test_constructor(self): 118 availability_condition = make_availability_condition( 119 EXPRESSION, TITLE, DESCRIPTION 120 ) 121 122 assert availability_condition.expression == EXPRESSION 123 assert availability_condition.title == TITLE 124 assert availability_condition.description == DESCRIPTION 125 126 def test_constructor_required_params_only(self): 127 availability_condition = make_availability_condition(EXPRESSION) 128 129 assert availability_condition.expression == EXPRESSION 130 assert availability_condition.title is None 131 assert availability_condition.description is None 132 133 def test_setters(self): 134 availability_condition = make_availability_condition( 135 EXPRESSION, TITLE, DESCRIPTION 136 ) 137 availability_condition.expression = OTHER_EXPRESSION 138 availability_condition.title = OTHER_TITLE 139 availability_condition.description = OTHER_DESCRIPTION 140 141 assert availability_condition.expression == OTHER_EXPRESSION 142 assert availability_condition.title == OTHER_TITLE 143 assert availability_condition.description == OTHER_DESCRIPTION 144 145 def test_invalid_expression_type(self): 146 with pytest.raises(TypeError) as excinfo: 147 make_availability_condition([EXPRESSION], TITLE, DESCRIPTION) 148 149 assert excinfo.match("The provided expression is not a string.") 150 151 def test_invalid_title_type(self): 152 with pytest.raises(TypeError) as excinfo: 153 make_availability_condition(EXPRESSION, False, DESCRIPTION) 154 155 assert excinfo.match("The provided title is not a string or None.") 156 157 def test_invalid_description_type(self): 158 with pytest.raises(TypeError) as excinfo: 159 make_availability_condition(EXPRESSION, TITLE, False) 160 161 assert excinfo.match("The provided description is not a string or None.") 162 163 def test_to_json_required_params_only(self): 164 availability_condition = make_availability_condition(EXPRESSION) 165 166 assert availability_condition.to_json() == {"expression": EXPRESSION} 167 168 def test_to_json_(self): 169 availability_condition = make_availability_condition( 170 EXPRESSION, TITLE, DESCRIPTION 171 ) 172 173 assert availability_condition.to_json() == { 174 "expression": EXPRESSION, 175 "title": TITLE, 176 "description": DESCRIPTION, 177 } 178 179 180class TestAccessBoundaryRule(object): 181 def test_constructor(self): 182 availability_condition = make_availability_condition( 183 EXPRESSION, TITLE, DESCRIPTION 184 ) 185 access_boundary_rule = make_access_boundary_rule( 186 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 187 ) 188 189 assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE 190 assert access_boundary_rule.available_permissions == tuple( 191 AVAILABLE_PERMISSIONS 192 ) 193 assert access_boundary_rule.availability_condition == availability_condition 194 195 def test_constructor_required_params_only(self): 196 access_boundary_rule = make_access_boundary_rule( 197 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS 198 ) 199 200 assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE 201 assert access_boundary_rule.available_permissions == tuple( 202 AVAILABLE_PERMISSIONS 203 ) 204 assert access_boundary_rule.availability_condition is None 205 206 def test_setters(self): 207 availability_condition = make_availability_condition( 208 EXPRESSION, TITLE, DESCRIPTION 209 ) 210 other_availability_condition = make_availability_condition( 211 OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION 212 ) 213 access_boundary_rule = make_access_boundary_rule( 214 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 215 ) 216 access_boundary_rule.available_resource = OTHER_AVAILABLE_RESOURCE 217 access_boundary_rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS 218 access_boundary_rule.availability_condition = other_availability_condition 219 220 assert access_boundary_rule.available_resource == OTHER_AVAILABLE_RESOURCE 221 assert access_boundary_rule.available_permissions == tuple( 222 OTHER_AVAILABLE_PERMISSIONS 223 ) 224 assert ( 225 access_boundary_rule.availability_condition == other_availability_condition 226 ) 227 228 def test_invalid_available_resource_type(self): 229 availability_condition = make_availability_condition( 230 EXPRESSION, TITLE, DESCRIPTION 231 ) 232 with pytest.raises(TypeError) as excinfo: 233 make_access_boundary_rule( 234 None, AVAILABLE_PERMISSIONS, availability_condition 235 ) 236 237 assert excinfo.match("The provided available_resource is not a string.") 238 239 def test_invalid_available_permissions_type(self): 240 availability_condition = make_availability_condition( 241 EXPRESSION, TITLE, DESCRIPTION 242 ) 243 with pytest.raises(TypeError) as excinfo: 244 make_access_boundary_rule( 245 AVAILABLE_RESOURCE, [0, 1, 2], availability_condition 246 ) 247 248 assert excinfo.match( 249 "Provided available_permissions are not a list of strings." 250 ) 251 252 def test_invalid_available_permissions_value(self): 253 availability_condition = make_availability_condition( 254 EXPRESSION, TITLE, DESCRIPTION 255 ) 256 with pytest.raises(ValueError) as excinfo: 257 make_access_boundary_rule( 258 AVAILABLE_RESOURCE, 259 ["roles/storage.objectViewer"], 260 availability_condition, 261 ) 262 263 assert excinfo.match("available_permissions must be prefixed with 'inRole:'.") 264 265 def test_invalid_availability_condition_type(self): 266 with pytest.raises(TypeError) as excinfo: 267 make_access_boundary_rule( 268 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"} 269 ) 270 271 assert excinfo.match( 272 "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None." 273 ) 274 275 def test_to_json(self): 276 availability_condition = make_availability_condition( 277 EXPRESSION, TITLE, DESCRIPTION 278 ) 279 access_boundary_rule = make_access_boundary_rule( 280 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 281 ) 282 283 assert access_boundary_rule.to_json() == { 284 "availablePermissions": AVAILABLE_PERMISSIONS, 285 "availableResource": AVAILABLE_RESOURCE, 286 "availabilityCondition": { 287 "expression": EXPRESSION, 288 "title": TITLE, 289 "description": DESCRIPTION, 290 }, 291 } 292 293 def test_to_json_required_params_only(self): 294 access_boundary_rule = make_access_boundary_rule( 295 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS 296 ) 297 298 assert access_boundary_rule.to_json() == { 299 "availablePermissions": AVAILABLE_PERMISSIONS, 300 "availableResource": AVAILABLE_RESOURCE, 301 } 302 303 304class TestCredentialAccessBoundary(object): 305 def test_constructor(self): 306 availability_condition = make_availability_condition( 307 EXPRESSION, TITLE, DESCRIPTION 308 ) 309 access_boundary_rule = make_access_boundary_rule( 310 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 311 ) 312 rules = [access_boundary_rule] 313 credential_access_boundary = make_credential_access_boundary(rules) 314 315 assert credential_access_boundary.rules == tuple(rules) 316 317 def test_setters(self): 318 availability_condition = make_availability_condition( 319 EXPRESSION, TITLE, DESCRIPTION 320 ) 321 access_boundary_rule = make_access_boundary_rule( 322 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 323 ) 324 rules = [access_boundary_rule] 325 other_availability_condition = make_availability_condition( 326 OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION 327 ) 328 other_access_boundary_rule = make_access_boundary_rule( 329 OTHER_AVAILABLE_RESOURCE, 330 OTHER_AVAILABLE_PERMISSIONS, 331 other_availability_condition, 332 ) 333 other_rules = [other_access_boundary_rule] 334 credential_access_boundary = make_credential_access_boundary(rules) 335 credential_access_boundary.rules = other_rules 336 337 assert credential_access_boundary.rules == tuple(other_rules) 338 339 def test_add_rule(self): 340 availability_condition = make_availability_condition( 341 EXPRESSION, TITLE, DESCRIPTION 342 ) 343 access_boundary_rule = make_access_boundary_rule( 344 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 345 ) 346 rules = [access_boundary_rule] * 9 347 credential_access_boundary = make_credential_access_boundary(rules) 348 349 # Add one more rule. This should not raise an error. 350 additional_access_boundary_rule = make_access_boundary_rule( 351 OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS 352 ) 353 credential_access_boundary.add_rule(additional_access_boundary_rule) 354 355 assert len(credential_access_boundary.rules) == 10 356 assert credential_access_boundary.rules[9] == additional_access_boundary_rule 357 358 def test_add_rule_invalid_value(self): 359 availability_condition = make_availability_condition( 360 EXPRESSION, TITLE, DESCRIPTION 361 ) 362 access_boundary_rule = make_access_boundary_rule( 363 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 364 ) 365 rules = [access_boundary_rule] * 10 366 credential_access_boundary = make_credential_access_boundary(rules) 367 368 # Add one more rule to exceed maximum allowed rules. 369 with pytest.raises(ValueError) as excinfo: 370 credential_access_boundary.add_rule(access_boundary_rule) 371 372 assert excinfo.match( 373 "Credential access boundary rules can have a maximum of 10 rules." 374 ) 375 assert len(credential_access_boundary.rules) == 10 376 377 def test_add_rule_invalid_type(self): 378 availability_condition = make_availability_condition( 379 EXPRESSION, TITLE, DESCRIPTION 380 ) 381 access_boundary_rule = make_access_boundary_rule( 382 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 383 ) 384 rules = [access_boundary_rule] 385 credential_access_boundary = make_credential_access_boundary(rules) 386 387 # Add an invalid rule to exceed maximum allowed rules. 388 with pytest.raises(TypeError) as excinfo: 389 credential_access_boundary.add_rule("invalid") 390 391 assert excinfo.match( 392 "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." 393 ) 394 assert len(credential_access_boundary.rules) == 1 395 assert credential_access_boundary.rules[0] == access_boundary_rule 396 397 def test_invalid_rules_type(self): 398 with pytest.raises(TypeError) as excinfo: 399 make_credential_access_boundary(["invalid"]) 400 401 assert excinfo.match( 402 "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'." 403 ) 404 405 def test_invalid_rules_value(self): 406 availability_condition = make_availability_condition( 407 EXPRESSION, TITLE, DESCRIPTION 408 ) 409 access_boundary_rule = make_access_boundary_rule( 410 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 411 ) 412 too_many_rules = [access_boundary_rule] * 11 413 with pytest.raises(ValueError) as excinfo: 414 make_credential_access_boundary(too_many_rules) 415 416 assert excinfo.match( 417 "Credential access boundary rules can have a maximum of 10 rules." 418 ) 419 420 def test_to_json(self): 421 availability_condition = make_availability_condition( 422 EXPRESSION, TITLE, DESCRIPTION 423 ) 424 access_boundary_rule = make_access_boundary_rule( 425 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 426 ) 427 rules = [access_boundary_rule] 428 credential_access_boundary = make_credential_access_boundary(rules) 429 430 assert credential_access_boundary.to_json() == { 431 "accessBoundary": { 432 "accessBoundaryRules": [ 433 { 434 "availablePermissions": AVAILABLE_PERMISSIONS, 435 "availableResource": AVAILABLE_RESOURCE, 436 "availabilityCondition": { 437 "expression": EXPRESSION, 438 "title": TITLE, 439 "description": DESCRIPTION, 440 }, 441 } 442 ] 443 } 444 } 445 446 447class TestCredentials(object): 448 @staticmethod 449 def make_credentials(source_credentials=SourceCredentials(), quota_project_id=None): 450 availability_condition = make_availability_condition( 451 EXPRESSION, TITLE, DESCRIPTION 452 ) 453 access_boundary_rule = make_access_boundary_rule( 454 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition 455 ) 456 rules = [access_boundary_rule] 457 credential_access_boundary = make_credential_access_boundary(rules) 458 459 return downscoped.Credentials( 460 source_credentials, credential_access_boundary, quota_project_id 461 ) 462 463 @staticmethod 464 def make_mock_request(data, status=http_client.OK): 465 response = mock.create_autospec(transport.Response, instance=True) 466 response.status = status 467 response.data = json.dumps(data).encode("utf-8") 468 469 request = mock.create_autospec(transport.Request) 470 request.return_value = response 471 472 return request 473 474 @staticmethod 475 def assert_request_kwargs(request_kwargs, headers, request_data): 476 """Asserts the request was called with the expected parameters. 477 """ 478 assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT 479 assert request_kwargs["method"] == "POST" 480 assert request_kwargs["headers"] == headers 481 assert request_kwargs["body"] is not None 482 body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) 483 for (k, v) in body_tuples: 484 assert v.decode("utf-8") == request_data[k.decode("utf-8")] 485 assert len(body_tuples) == len(request_data.keys()) 486 487 def test_default_state(self): 488 credentials = self.make_credentials() 489 490 # No token acquired yet. 491 assert not credentials.token 492 assert not credentials.valid 493 # Expiration hasn't been set yet. 494 assert not credentials.expiry 495 assert not credentials.expired 496 # No quota project ID set. 497 assert not credentials.quota_project_id 498 499 def test_with_quota_project(self): 500 credentials = self.make_credentials() 501 502 assert not credentials.quota_project_id 503 504 quota_project_creds = credentials.with_quota_project("project-foo") 505 506 assert quota_project_creds.quota_project_id == "project-foo" 507 508 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 509 def test_refresh(self, unused_utcnow): 510 response = SUCCESS_RESPONSE.copy() 511 # Test custom expiration to confirm expiry is set correctly. 512 response["expires_in"] = 2800 513 expected_expiry = datetime.datetime.min + datetime.timedelta( 514 seconds=response["expires_in"] 515 ) 516 headers = {"Content-Type": "application/x-www-form-urlencoded"} 517 request_data = { 518 "grant_type": GRANT_TYPE, 519 "subject_token": "ACCESS_TOKEN_1", 520 "subject_token_type": SUBJECT_TOKEN_TYPE, 521 "requested_token_type": REQUESTED_TOKEN_TYPE, 522 "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)), 523 } 524 request = self.make_mock_request(status=http_client.OK, data=response) 525 source_credentials = SourceCredentials() 526 credentials = self.make_credentials(source_credentials=source_credentials) 527 528 # Spy on calls to source credentials refresh to confirm the expected request 529 # instance is used. 530 with mock.patch.object( 531 source_credentials, "refresh", wraps=source_credentials.refresh 532 ) as wrapped_souce_cred_refresh: 533 credentials.refresh(request) 534 535 self.assert_request_kwargs(request.call_args[1], headers, request_data) 536 assert credentials.valid 537 assert credentials.expiry == expected_expiry 538 assert not credentials.expired 539 assert credentials.token == response["access_token"] 540 # Confirm source credentials called with the same request instance. 541 wrapped_souce_cred_refresh.assert_called_with(request) 542 543 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 544 def test_refresh_without_response_expires_in(self, unused_utcnow): 545 response = SUCCESS_RESPONSE.copy() 546 # Simulate the response is missing the expires_in field. 547 # The downscoped token expiration should match the source credentials 548 # expiration. 549 del response["expires_in"] 550 expected_expires_in = 1800 551 # Simulate the source credentials generates a token with 1800 second 552 # expiration time. The generated downscoped token should have the same 553 # expiration time. 554 source_credentials = SourceCredentials(expires_in=expected_expires_in) 555 expected_expiry = datetime.datetime.min + datetime.timedelta( 556 seconds=expected_expires_in 557 ) 558 headers = {"Content-Type": "application/x-www-form-urlencoded"} 559 request_data = { 560 "grant_type": GRANT_TYPE, 561 "subject_token": "ACCESS_TOKEN_1", 562 "subject_token_type": SUBJECT_TOKEN_TYPE, 563 "requested_token_type": REQUESTED_TOKEN_TYPE, 564 "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)), 565 } 566 request = self.make_mock_request(status=http_client.OK, data=response) 567 credentials = self.make_credentials(source_credentials=source_credentials) 568 569 # Spy on calls to source credentials refresh to confirm the expected request 570 # instance is used. 571 with mock.patch.object( 572 source_credentials, "refresh", wraps=source_credentials.refresh 573 ) as wrapped_souce_cred_refresh: 574 credentials.refresh(request) 575 576 self.assert_request_kwargs(request.call_args[1], headers, request_data) 577 assert credentials.valid 578 assert credentials.expiry == expected_expiry 579 assert not credentials.expired 580 assert credentials.token == response["access_token"] 581 # Confirm source credentials called with the same request instance. 582 wrapped_souce_cred_refresh.assert_called_with(request) 583 584 def test_refresh_token_exchange_error(self): 585 request = self.make_mock_request( 586 status=http_client.BAD_REQUEST, data=ERROR_RESPONSE 587 ) 588 credentials = self.make_credentials() 589 590 with pytest.raises(exceptions.OAuthError) as excinfo: 591 credentials.refresh(request) 592 593 assert excinfo.match( 594 r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749" 595 ) 596 assert not credentials.expired 597 assert credentials.token is None 598 599 def test_refresh_source_credentials_refresh_error(self): 600 # Initialize downscoped credentials with source credentials that raise 601 # an error on refresh. 602 credentials = self.make_credentials( 603 source_credentials=SourceCredentials(raise_error=True) 604 ) 605 606 with pytest.raises(exceptions.RefreshError) as excinfo: 607 credentials.refresh(mock.sentinel.request) 608 609 assert excinfo.match(r"Failed to refresh access token in source credentials.") 610 assert not credentials.expired 611 assert credentials.token is None 612 613 def test_apply_without_quota_project_id(self): 614 headers = {} 615 request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) 616 credentials = self.make_credentials() 617 618 credentials.refresh(request) 619 credentials.apply(headers) 620 621 assert headers == { 622 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]) 623 } 624 625 def test_apply_with_quota_project_id(self): 626 headers = {"other": "header-value"} 627 request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) 628 credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID) 629 630 credentials.refresh(request) 631 credentials.apply(headers) 632 633 assert headers == { 634 "other": "header-value", 635 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]), 636 "x-goog-user-project": QUOTA_PROJECT_ID, 637 } 638 639 def test_before_request(self): 640 headers = {"other": "header-value"} 641 request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) 642 credentials = self.make_credentials() 643 644 # First call should call refresh, setting the token. 645 credentials.before_request(request, "POST", "https://example.com/api", headers) 646 647 assert headers == { 648 "other": "header-value", 649 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]), 650 } 651 652 # Second call shouldn't call refresh (request should be untouched). 653 credentials.before_request( 654 mock.sentinel.request, "POST", "https://example.com/api", headers 655 ) 656 657 assert headers == { 658 "other": "header-value", 659 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]), 660 } 661 662 @mock.patch("google.auth._helpers.utcnow") 663 def test_before_request_expired(self, utcnow): 664 headers = {} 665 request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE) 666 credentials = self.make_credentials() 667 credentials.token = "token" 668 utcnow.return_value = datetime.datetime.min 669 # Set the expiration to one second more than now plus the clock skew 670 # accommodation. These credentials should be valid. 671 credentials.expiry = ( 672 datetime.datetime.min 673 + _helpers.REFRESH_THRESHOLD 674 + datetime.timedelta(seconds=1) 675 ) 676 677 assert credentials.valid 678 assert not credentials.expired 679 680 credentials.before_request(request, "POST", "https://example.com/api", headers) 681 682 # Cached token should be used. 683 assert headers == {"authorization": "Bearer token"} 684 685 # Next call should simulate 1 second passed. 686 utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1) 687 688 assert not credentials.valid 689 assert credentials.expired 690 691 credentials.before_request(request, "POST", "https://example.com/api", headers) 692 693 # New token should be retrieved. 694 assert headers == { 695 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]) 696 } 697