# Copyright 2014 Google Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Unit tests for oauth2client.contrib.gce.""" import datetime import json import httplib2 import mock from six.moves import http_client from tests.contrib.test_metadata import request_mock import unittest2 from oauth2client import client from oauth2client.contrib import gce __author__ = 'jcgregorio@google.com (Joe Gregorio)' SERVICE_ACCOUNT_INFO = { 'scopes': ['a', 'b'], 'email': 'a@example.com', 'aliases': ['default'] } class AppAssertionCredentialsTests(unittest2.TestCase): def test_constructor(self): credentials = gce.AppAssertionCredentials() self.assertIsNone(credentials.assertion_type, None) self.assertIsNone(credentials.service_account_email) self.assertIsNone(credentials.scopes) self.assertTrue(credentials.invalid) @mock.patch('warnings.warn') def test_constructor_with_scopes(self, warn_mock): scope = 'http://example.com/a http://example.com/b' scopes = scope.split() credentials = gce.AppAssertionCredentials(scopes=scopes) self.assertEqual(credentials.scopes, None) self.assertEqual(credentials.assertion_type, None) warn_mock.assert_called_once_with(gce._SCOPES_WARNING) def test_to_json(self): credentials = gce.AppAssertionCredentials() with self.assertRaises(NotImplementedError): credentials.to_json() def test_from_json(self): with self.assertRaises(NotImplementedError): gce.AppAssertionCredentials.from_json({}) @mock.patch('oauth2client.contrib._metadata.get_token', side_effect=[('A', datetime.datetime.min), ('B', datetime.datetime.max)]) @mock.patch('oauth2client.contrib._metadata.get_service_account_info', return_value=SERVICE_ACCOUNT_INFO) def test_refresh_token(self, get_info, get_token): http_request = mock.MagicMock() http_mock = mock.MagicMock(request=http_request) credentials = gce.AppAssertionCredentials() credentials.invalid = False credentials.service_account_email = 'a@example.com' self.assertIsNone(credentials.access_token) credentials.get_access_token(http=http_mock) self.assertEqual(credentials.access_token, 'A') self.assertTrue(credentials.access_token_expired) get_token.assert_called_with(http_request, service_account='a@example.com') credentials.get_access_token(http=http_mock) self.assertEqual(credentials.access_token, 'B') self.assertFalse(credentials.access_token_expired) get_token.assert_called_with(http_request, service_account='a@example.com') get_info.assert_not_called() def test_refresh_token_failed_fetch(self): http_request = request_mock( http_client.NOT_FOUND, 'application/json', json.dumps({'access_token': 'a', 'expires_in': 100}) ) credentials = gce.AppAssertionCredentials() credentials.invalid = False credentials.service_account_email = 'a@example.com' with self.assertRaises(client.HttpAccessTokenRefreshError): credentials._refresh(http_request) def test_serialization_data(self): credentials = gce.AppAssertionCredentials() with self.assertRaises(NotImplementedError): getattr(credentials, 'serialization_data') def test_create_scoped_required(self): credentials = gce.AppAssertionCredentials() self.assertFalse(credentials.create_scoped_required()) def test_sign_blob_not_implemented(self): credentials = gce.AppAssertionCredentials([]) with self.assertRaises(NotImplementedError): credentials.sign_blob(b'blob') @mock.patch('oauth2client.contrib._metadata.get_service_account_info', return_value=SERVICE_ACCOUNT_INFO) def test_retrieve_scopes(self, metadata): http_request = mock.MagicMock() http_mock = mock.MagicMock(request=http_request) credentials = gce.AppAssertionCredentials() self.assertTrue(credentials.invalid) self.assertIsNone(credentials.scopes) scopes = credentials.retrieve_scopes(http_mock) self.assertEqual(scopes, SERVICE_ACCOUNT_INFO['scopes']) self.assertFalse(credentials.invalid) credentials.retrieve_scopes(http_mock) # Assert scopes weren't refetched metadata.assert_called_once_with(http_request, service_account='default') @mock.patch('oauth2client.contrib._metadata.get_service_account_info', side_effect=httplib2.HttpLib2Error('No Such Email')) def test_retrieve_scopes_bad_email(self, metadata): http_request = mock.MagicMock() http_mock = mock.MagicMock(request=http_request) credentials = gce.AppAssertionCredentials(email='b@example.com') with self.assertRaises(httplib2.HttpLib2Error): credentials.retrieve_scopes(http_mock) metadata.assert_called_once_with(http_request, service_account='b@example.com') def test_save_to_well_known_file(self): import os ORIGINAL_ISDIR = os.path.isdir try: os.path.isdir = lambda path: True credentials = gce.AppAssertionCredentials() with self.assertRaises(NotImplementedError): client.save_to_well_known_file(credentials) finally: os.path.isdir = ORIGINAL_ISDIR