# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import mock import pytest from six.moves import http_client from six.moves import urllib from google.auth import exceptions from google.auth import transport from google.oauth2 import sts from google.oauth2 import utils CLIENT_ID = "username" CLIENT_SECRET = "password" # Base64 encoding of "username:password" BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" class TestStsClient(object): GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange" RESOURCE = "https://api.example.com/" AUDIENCE = "urn:example:cooperation-context" SCOPES = ["scope1", "scope2"] REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" SUBJECT_TOKEN = "HEADER.SUBJECT_TOKEN_PAYLOAD.SIGNATURE" SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" ACTOR_TOKEN = "HEADER.ACTOR_TOKEN_PAYLOAD.SIGNATURE" ACTOR_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" TOKEN_EXCHANGE_ENDPOINT = "https://example.com/token.oauth2" ADDON_HEADERS = {"x-client-version": "0.1.2"} ADDON_OPTIONS = {"additional": {"non-standard": ["options"], "other": "some-value"}} SUCCESS_RESPONSE = { "access_token": "ACCESS_TOKEN", "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", "token_type": "Bearer", "expires_in": 3600, "scope": "scope1 scope2", } ERROR_RESPONSE = { "error": "invalid_request", "error_description": "Invalid subject token", "error_uri": "https://tools.ietf.org/html/rfc6749", } CLIENT_AUTH_BASIC = utils.ClientAuthentication( utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET ) CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication( utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET ) @classmethod def make_client(cls, client_auth=None): return sts.Client(cls.TOKEN_EXCHANGE_ENDPOINT, client_auth) @classmethod def make_mock_request(cls, data, status=http_client.OK): response = mock.create_autospec(transport.Response, instance=True) response.status = status response.data = json.dumps(data).encode("utf-8") request = mock.create_autospec(transport.Request) request.return_value = response return request @classmethod def assert_request_kwargs(cls, request_kwargs, headers, request_data): """Asserts the request was called with the expected parameters. """ assert request_kwargs["url"] == cls.TOKEN_EXCHANGE_ENDPOINT assert request_kwargs["method"] == "POST" assert request_kwargs["headers"] == headers assert request_kwargs["body"] is not None body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) for (k, v) in body_tuples: assert v.decode("utf-8") == request_data[k.decode("utf-8")] assert len(body_tuples) == len(request_data.keys()) def test_exchange_token_full_success_without_auth(self): """Test token exchange success without client authentication using full parameters. """ client = self.make_client() headers = self.ADDON_HEADERS.copy() headers["Content-Type"] = "application/x-www-form-urlencoded" request_data = { "grant_type": self.GRANT_TYPE, "resource": self.RESOURCE, "audience": self.AUDIENCE, "scope": " ".join(self.SCOPES), "requested_token_type": self.REQUESTED_TOKEN_TYPE, "subject_token": self.SUBJECT_TOKEN, "subject_token_type": self.SUBJECT_TOKEN_TYPE, "actor_token": self.ACTOR_TOKEN, "actor_token_type": self.ACTOR_TOKEN_TYPE, "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)), } request = self.make_mock_request( status=http_client.OK, data=self.SUCCESS_RESPONSE ) response = client.exchange_token( request, self.GRANT_TYPE, self.SUBJECT_TOKEN, self.SUBJECT_TOKEN_TYPE, self.RESOURCE, self.AUDIENCE, self.SCOPES, self.REQUESTED_TOKEN_TYPE, self.ACTOR_TOKEN, self.ACTOR_TOKEN_TYPE, self.ADDON_OPTIONS, self.ADDON_HEADERS, ) self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_partial_success_without_auth(self): """Test token exchange success without client authentication using partial (required only) parameters. """ client = self.make_client() headers = {"Content-Type": "application/x-www-form-urlencoded"} request_data = { "grant_type": self.GRANT_TYPE, "audience": self.AUDIENCE, "requested_token_type": self.REQUESTED_TOKEN_TYPE, "subject_token": self.SUBJECT_TOKEN, "subject_token_type": self.SUBJECT_TOKEN_TYPE, } request = self.make_mock_request( status=http_client.OK, data=self.SUCCESS_RESPONSE ) response = client.exchange_token( request, grant_type=self.GRANT_TYPE, subject_token=self.SUBJECT_TOKEN, subject_token_type=self.SUBJECT_TOKEN_TYPE, audience=self.AUDIENCE, requested_token_type=self.REQUESTED_TOKEN_TYPE, ) self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_non200_without_auth(self): """Test token exchange without client auth responding with non-200 status. """ client = self.make_client() request = self.make_mock_request( status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE ) with pytest.raises(exceptions.OAuthError) as excinfo: client.exchange_token( request, self.GRANT_TYPE, self.SUBJECT_TOKEN, self.SUBJECT_TOKEN_TYPE, self.RESOURCE, self.AUDIENCE, self.SCOPES, self.REQUESTED_TOKEN_TYPE, self.ACTOR_TOKEN, self.ACTOR_TOKEN_TYPE, self.ADDON_OPTIONS, self.ADDON_HEADERS, ) assert excinfo.match( r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749" ) def test_exchange_token_full_success_with_basic_auth(self): """Test token exchange success with basic client authentication using full parameters. """ client = self.make_client(self.CLIENT_AUTH_BASIC) headers = self.ADDON_HEADERS.copy() headers["Content-Type"] = "application/x-www-form-urlencoded" headers["Authorization"] = "Basic {}".format(BASIC_AUTH_ENCODING) request_data = { "grant_type": self.GRANT_TYPE, "resource": self.RESOURCE, "audience": self.AUDIENCE, "scope": " ".join(self.SCOPES), "requested_token_type": self.REQUESTED_TOKEN_TYPE, "subject_token": self.SUBJECT_TOKEN, "subject_token_type": self.SUBJECT_TOKEN_TYPE, "actor_token": self.ACTOR_TOKEN, "actor_token_type": self.ACTOR_TOKEN_TYPE, "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)), } request = self.make_mock_request( status=http_client.OK, data=self.SUCCESS_RESPONSE ) response = client.exchange_token( request, self.GRANT_TYPE, self.SUBJECT_TOKEN, self.SUBJECT_TOKEN_TYPE, self.RESOURCE, self.AUDIENCE, self.SCOPES, self.REQUESTED_TOKEN_TYPE, self.ACTOR_TOKEN, self.ACTOR_TOKEN_TYPE, self.ADDON_OPTIONS, self.ADDON_HEADERS, ) self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_partial_success_with_basic_auth(self): """Test token exchange success with basic client authentication using partial (required only) parameters. """ client = self.make_client(self.CLIENT_AUTH_BASIC) headers = { "Content-Type": "application/x-www-form-urlencoded", "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING), } request_data = { "grant_type": self.GRANT_TYPE, "audience": self.AUDIENCE, "requested_token_type": self.REQUESTED_TOKEN_TYPE, "subject_token": self.SUBJECT_TOKEN, "subject_token_type": self.SUBJECT_TOKEN_TYPE, } request = self.make_mock_request( status=http_client.OK, data=self.SUCCESS_RESPONSE ) response = client.exchange_token( request, grant_type=self.GRANT_TYPE, subject_token=self.SUBJECT_TOKEN, subject_token_type=self.SUBJECT_TOKEN_TYPE, audience=self.AUDIENCE, requested_token_type=self.REQUESTED_TOKEN_TYPE, ) self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_non200_with_basic_auth(self): """Test token exchange with basic client auth responding with non-200 status. """ client = self.make_client(self.CLIENT_AUTH_BASIC) request = self.make_mock_request( status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE ) with pytest.raises(exceptions.OAuthError) as excinfo: client.exchange_token( request, self.GRANT_TYPE, self.SUBJECT_TOKEN, self.SUBJECT_TOKEN_TYPE, self.RESOURCE, self.AUDIENCE, self.SCOPES, self.REQUESTED_TOKEN_TYPE, self.ACTOR_TOKEN, self.ACTOR_TOKEN_TYPE, self.ADDON_OPTIONS, self.ADDON_HEADERS, ) assert excinfo.match( r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749" ) def test_exchange_token_full_success_with_reqbody_auth(self): """Test token exchange success with request body client authenticaiton using full parameters. """ client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY) headers = self.ADDON_HEADERS.copy() headers["Content-Type"] = "application/x-www-form-urlencoded" request_data = { "grant_type": self.GRANT_TYPE, "resource": self.RESOURCE, "audience": self.AUDIENCE, "scope": " ".join(self.SCOPES), "requested_token_type": self.REQUESTED_TOKEN_TYPE, "subject_token": self.SUBJECT_TOKEN, "subject_token_type": self.SUBJECT_TOKEN_TYPE, "actor_token": self.ACTOR_TOKEN, "actor_token_type": self.ACTOR_TOKEN_TYPE, "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)), "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, } request = self.make_mock_request( status=http_client.OK, data=self.SUCCESS_RESPONSE ) response = client.exchange_token( request, self.GRANT_TYPE, self.SUBJECT_TOKEN, self.SUBJECT_TOKEN_TYPE, self.RESOURCE, self.AUDIENCE, self.SCOPES, self.REQUESTED_TOKEN_TYPE, self.ACTOR_TOKEN, self.ACTOR_TOKEN_TYPE, self.ADDON_OPTIONS, self.ADDON_HEADERS, ) self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_partial_success_with_reqbody_auth(self): """Test token exchange success with request body client authentication using partial (required only) parameters. """ client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY) headers = {"Content-Type": "application/x-www-form-urlencoded"} request_data = { "grant_type": self.GRANT_TYPE, "audience": self.AUDIENCE, "requested_token_type": self.REQUESTED_TOKEN_TYPE, "subject_token": self.SUBJECT_TOKEN, "subject_token_type": self.SUBJECT_TOKEN_TYPE, "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET, } request = self.make_mock_request( status=http_client.OK, data=self.SUCCESS_RESPONSE ) response = client.exchange_token( request, grant_type=self.GRANT_TYPE, subject_token=self.SUBJECT_TOKEN, subject_token_type=self.SUBJECT_TOKEN_TYPE, audience=self.AUDIENCE, requested_token_type=self.REQUESTED_TOKEN_TYPE, ) self.assert_request_kwargs(request.call_args[1], headers, request_data) assert response == self.SUCCESS_RESPONSE def test_exchange_token_non200_with_reqbody_auth(self): """Test token exchange with POST request body client auth responding with non-200 status. """ client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY) request = self.make_mock_request( status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE ) with pytest.raises(exceptions.OAuthError) as excinfo: client.exchange_token( request, self.GRANT_TYPE, self.SUBJECT_TOKEN, self.SUBJECT_TOKEN_TYPE, self.RESOURCE, self.AUDIENCE, self.SCOPES, self.REQUESTED_TOKEN_TYPE, self.ACTOR_TOKEN, self.ACTOR_TOKEN_TYPE, self.ADDON_OPTIONS, self.ADDON_HEADERS, ) assert excinfo.match( r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749" )