# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for ``google.auth.transport._mtls_helper``.

The tests read ``pytest.data_dir``, ``pytest.public_cert_bytes`` and
``pytest.private_key_bytes``; these attributes are not defined in this file
and are presumably installed by the test suite's conftest (TODO confirm).
"""

import os
import re

import mock
from OpenSSL import crypto
import pytest

from google.auth import exceptions
from google.auth.transport import _mtls_helper

CONTEXT_AWARE_METADATA = {"cert_provider_command": ["some command"]}

CONTEXT_AWARE_METADATA_NO_CERT_PROVIDER_COMMAND = {}

ENCRYPTED_EC_PRIVATE_KEY = b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
MIHkME8GCSqGSIb3DQEFDTBCMCkGCSqGSIb3DQEFDDAcBAgl2/yVgs1h3QICCAAw
DAYIKoZIhvcNAgkFADAVBgkrBgEEAZdVAQIECJk2GRrvxOaJBIGQXIBnMU4wmciT
uA6yD8q0FxuIzjG7E2S6tc5VRgSbhRB00eBO3jWmO2pBybeQW+zVioDcn50zp2ts
wYErWC+LCm1Zg3r+EGnT1E1GgNoODbVQ3AEHlKh1CGCYhEovxtn3G+Fjh7xOBrNB
saVVeDb4tHD4tMkiVVUBrUcTZPndP73CtgyGHYEphasYPzEz3+AU
-----END ENCRYPTED PRIVATE KEY-----"""

EC_PUBLIC_KEY = b"""-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEvCNi1NoDY1oMqPHIgXI8RBbTYGi/
brEjbre1nSiQW11xRTJbVeETdsuP0EAu2tG3PcRhhwDfeJ8zXREgTBurNw==
-----END PUBLIC KEY-----"""

PASSPHRASE = b"""-----BEGIN PASSPHRASE-----
password
-----END PASSPHRASE-----"""
PASSPHRASE_VALUE = b"password"


def check_cert_and_key(content, expected_cert, expected_key):
    """Report whether the helper regexes extract the expected cert and key.

    Args:
        content (bytes): Data to search with ``_mtls_helper._CERT_REGEX`` and
            ``_mtls_helper._KEY_REGEX``.
        expected_cert (bytes): The single certificate match that is expected.
        expected_key (bytes): The single key match that is expected.

    Returns:
        bool: True only if each regex yields exactly one match and that match
            equals the corresponding expected value. Callers must assert on
            the result; this function never raises on a mismatch.
    """
    cert_match = re.findall(_mtls_helper._CERT_REGEX, content)
    key_match = re.findall(_mtls_helper._KEY_REGEX, content)

    # Comparing against a one-element list checks "exactly one match" and
    # "match equals expected" in a single step.
    return cert_match == [expected_cert] and key_match == [expected_key]


class TestCertAndKeyRegex(object):
    """Checks for ``_CERT_REGEX`` / ``_KEY_REGEX`` via ``check_cert_and_key``."""

    def test_cert_and_key(self):
        # The result of check_cert_and_key must be asserted; the helper only
        # returns a boolean, so a bare call could never fail the test.

        # Test single cert and single key
        assert check_cert_and_key(
            pytest.public_cert_bytes + pytest.private_key_bytes,
            pytest.public_cert_bytes,
            pytest.private_key_bytes,
        )
        assert check_cert_and_key(
            pytest.private_key_bytes + pytest.public_cert_bytes,
            pytest.public_cert_bytes,
            pytest.private_key_bytes,
        )

        # Test cert chain and single key
        assert check_cert_and_key(
            pytest.public_cert_bytes
            + pytest.public_cert_bytes
            + pytest.private_key_bytes,
            pytest.public_cert_bytes + pytest.public_cert_bytes,
            pytest.private_key_bytes,
        )
        assert check_cert_and_key(
            pytest.private_key_bytes
            + pytest.public_cert_bytes
            + pytest.public_cert_bytes,
            pytest.public_cert_bytes + pytest.public_cert_bytes,
            pytest.private_key_bytes,
        )

    def test_key(self):
        # Create some fake keys for regex check. Only the BEGIN/END markers
        # differ between them (plain, RSA and EC private key headers).
        KEY = b"""-----BEGIN PRIVATE KEY-----
        MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
        /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
        -----END PRIVATE KEY-----"""
        RSA_KEY = b"""-----BEGIN RSA PRIVATE KEY-----
        MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
        /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
        -----END RSA PRIVATE KEY-----"""
        EC_KEY = b"""-----BEGIN EC PRIVATE KEY-----
        MIIBCgKCAQEA4ej0p7bQ7L/r4rVGUz9RN4VQWoej1Bg1mYWIDYslvKrk1gpj7wZg
        /fy3ZpsL7WqgsZS7Q+0VRK8gKfqkxg5OYQIDAQAB
        -----END EC PRIVATE KEY-----"""

        assert check_cert_and_key(
            pytest.public_cert_bytes + KEY, pytest.public_cert_bytes, KEY
        )
        assert check_cert_and_key(
            pytest.public_cert_bytes + RSA_KEY, pytest.public_cert_bytes, RSA_KEY
        )
        assert check_cert_and_key(
            pytest.public_cert_bytes + EC_KEY, pytest.public_cert_bytes, EC_KEY
        )


class TestCheckaMetadataPath(object):
    """Tests for ``_mtls_helper._check_dca_metadata_path``."""

    def test_success(self):
        # A metadata file that exists in the test data directory.
        metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
        returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
        assert returned_path is not None

    def test_failure(self):
        # A path that is expected not to exist yields None.
        metadata_path = os.path.join(pytest.data_dir, "not_exists.json")
        returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
        assert returned_path is None


class TestReadMetadataFile(object):
    """Tests for ``_mtls_helper._read_dca_metadata_file``."""

    def test_success(self):
        metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
        metadata = _mtls_helper._read_dca_metadata_file(metadata_path)

        assert "cert_provider_command" in metadata

    def test_file_not_json(self):
        # read a file which is not json format.
        metadata_path = os.path.join(pytest.data_dir, "privatekey.pem")
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._read_dca_metadata_file(metadata_path)


class TestRunCertProviderCommand(object):
    """Tests for ``_mtls_helper._run_cert_provider_command``.

    ``subprocess.Popen`` is patched in every test, so no real command runs;
    the process output is supplied through ``create_mock_process``.
    """

    def create_mock_process(self, output, error):
        """Build a mock process object for a patched ``subprocess.Popen``.

        Args:
            output (bytes): Value returned as stdout by ``communicate()``.
            error (bytes): Value returned as stderr by ``communicate()``.

        Returns:
            mock.Mock: A mock whose ``communicate()`` returns
                ``(output, error)`` and whose ``returncode`` is 0.
        """
        # There are two steps to execute a script with subprocess.Popen.
        # (1) process = subprocess.Popen([commands])
        # (2) stdout, stderr = process.communicate()
        # This function creates a mock process which can be returned by a mock
        # subprocess.Popen. The mock process returns the given output and error
        # when mock_process.communicate() is called.
        mock_process = mock.Mock()
        attrs = {"communicate.return_value": (output, error), "returncode": 0}
        mock_process.configure_mock(**attrs)
        return mock_process

    @mock.patch("subprocess.Popen", autospec=True)
    def test_success(self, mock_popen):
        # Unencrypted key: cert and key are returned, passphrase is None.
        mock_popen.return_value = self.create_mock_process(
            pytest.public_cert_bytes + pytest.private_key_bytes, b""
        )
        cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
        assert cert == pytest.public_cert_bytes
        assert key == pytest.private_key_bytes
        assert passphrase is None

        # Encrypted key: the passphrase block is parsed to its inner value.
        mock_popen.return_value = self.create_mock_process(
            pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
        )
        cert, key, passphrase = _mtls_helper._run_cert_provider_command(
            ["command"], expect_encrypted_key=True
        )
        assert cert == pytest.public_cert_bytes
        assert key == ENCRYPTED_EC_PRIVATE_KEY
        assert passphrase == PASSPHRASE_VALUE

    @mock.patch("subprocess.Popen", autospec=True)
    def test_success_with_cert_chain(self, mock_popen):
        # Two concatenated certs are expected back as one cert value.
        PUBLIC_CERT_CHAIN_BYTES = pytest.public_cert_bytes + pytest.public_cert_bytes
        mock_popen.return_value = self.create_mock_process(
            PUBLIC_CERT_CHAIN_BYTES + pytest.private_key_bytes, b""
        )
        cert, key, passphrase = _mtls_helper._run_cert_provider_command(["command"])
        assert cert == PUBLIC_CERT_CHAIN_BYTES
        assert key == pytest.private_key_bytes
        assert passphrase is None

        mock_popen.return_value = self.create_mock_process(
            PUBLIC_CERT_CHAIN_BYTES + ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
        )
        cert, key, passphrase = _mtls_helper._run_cert_provider_command(
            ["command"], expect_encrypted_key=True
        )
        assert cert == PUBLIC_CERT_CHAIN_BYTES
        assert key == ENCRYPTED_EC_PRIVATE_KEY
        assert passphrase == PASSPHRASE_VALUE

    @mock.patch("subprocess.Popen", autospec=True)
    def test_missing_cert(self, mock_popen):
        # Output without a certificate must be rejected in both modes.
        mock_popen.return_value = self.create_mock_process(
            pytest.private_key_bytes, b""
        )
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(["command"])

        mock_popen.return_value = self.create_mock_process(
            ENCRYPTED_EC_PRIVATE_KEY + PASSPHRASE, b""
        )
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(
                ["command"], expect_encrypted_key=True
            )

    @mock.patch("subprocess.Popen", autospec=True)
    def test_missing_key(self, mock_popen):
        # Output without a private key must be rejected in both modes.
        mock_popen.return_value = self.create_mock_process(
            pytest.public_cert_bytes, b""
        )
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(["command"])

        mock_popen.return_value = self.create_mock_process(
            pytest.public_cert_bytes + PASSPHRASE, b""
        )
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(
                ["command"], expect_encrypted_key=True
            )

    @mock.patch("subprocess.Popen", autospec=True)
    def test_missing_passphrase(self, mock_popen):
        # An encrypted key was requested but no passphrase block is present.
        mock_popen.return_value = self.create_mock_process(
            pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
        )
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(
                ["command"], expect_encrypted_key=True
            )

    @mock.patch("subprocess.Popen", autospec=True)
    def test_passphrase_not_expected(self, mock_popen):
        # A passphrase block appears although no encrypted key was requested.
        mock_popen.return_value = self.create_mock_process(
            pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
        )
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(["command"])

    @mock.patch("subprocess.Popen", autospec=True)
    def test_encrypted_key_expected(self, mock_popen):
        # An encrypted key was requested but an unencrypted one is returned.
        mock_popen.return_value = self.create_mock_process(
            pytest.public_cert_bytes + pytest.private_key_bytes + PASSPHRASE, b""
        )
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(
                ["command"], expect_encrypted_key=True
            )

    @mock.patch("subprocess.Popen", autospec=True)
    def test_unencrypted_key_expected(self, mock_popen):
        # An unencrypted key was requested but an encrypted one is returned.
        mock_popen.return_value = self.create_mock_process(
            pytest.public_cert_bytes + ENCRYPTED_EC_PRIVATE_KEY, b""
        )
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(["command"])

    @mock.patch("subprocess.Popen", autospec=True)
    def test_cert_provider_returns_error(self, mock_popen):
        # Override the default returncode of 0 to simulate a failed command.
        mock_popen.return_value = self.create_mock_process(b"", b"some error")
        mock_popen.return_value.returncode = 1
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(["command"])

    @mock.patch("subprocess.Popen", autospec=True)
    def test_popen_raise_exception(self, mock_popen):
        # An OSError from Popen itself is expected to surface as
        # ClientCertError.
        mock_popen.side_effect = OSError()
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper._run_cert_provider_command(["command"])


class TestGetClientSslCredentials(object):
    """Tests for ``_mtls_helper.get_client_ssl_credentials``.

    The metadata path check, metadata file read and cert provider command are
    patched, so only the orchestration in ``get_client_ssl_credentials`` runs.
    Stacked ``mock.patch`` decorators inject mocks bottom-up, which is why the
    parameter order is the reverse of the decorator order.
    """

    @mock.patch(
        "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
    )
    @mock.patch(
        "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
    )
    @mock.patch(
        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
    )
    def test_success(
        self,
        mock_check_dca_metadata_path,
        mock_read_dca_metadata_file,
        mock_run_cert_provider_command,
    ):
        mock_check_dca_metadata_path.return_value = True
        mock_read_dca_metadata_file.return_value = {
            "cert_provider_command": ["command"]
        }
        mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
        has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
        assert has_cert
        assert cert == b"cert"
        assert key == b"key"
        assert passphrase is None

    @mock.patch(
        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
    )
    def test_success_without_metadata(self, mock_check_dca_metadata_path):
        # With no usable metadata path, the call reports "no cert" rather
        # than raising.
        mock_check_dca_metadata_path.return_value = False
        has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
        assert not has_cert
        assert cert is None
        assert key is None
        assert passphrase is None

    @mock.patch(
        "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
    )
    @mock.patch(
        "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
    )
    @mock.patch(
        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
    )
    def test_success_with_encrypted_key(
        self,
        mock_check_dca_metadata_path,
        mock_read_dca_metadata_file,
        mock_run_cert_provider_command,
    ):
        mock_check_dca_metadata_path.return_value = True
        mock_read_dca_metadata_file.return_value = {
            "cert_provider_command": ["command"]
        }
        mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase")
        has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
            generate_encrypted_key=True
        )
        assert has_cert
        assert cert == b"cert"
        assert key == b"key"
        assert passphrase == b"passphrase"
        # Requesting an encrypted key must append the passphrase flag to the
        # configured command.
        mock_run_cert_provider_command.assert_called_once_with(
            ["command", "--with_passphrase"], expect_encrypted_key=True
        )

    @mock.patch(
        "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
    )
    @mock.patch(
        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
    )
    def test_missing_cert_command(
        self, mock_check_dca_metadata_path, mock_read_dca_metadata_file
    ):
        # Metadata without a "cert_provider_command" entry is an error.
        mock_check_dca_metadata_path.return_value = True
        mock_read_dca_metadata_file.return_value = {}
        with pytest.raises(exceptions.ClientCertError):
            _mtls_helper.get_client_ssl_credentials()

    @mock.patch(
        "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
    )
    @mock.patch(
        "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True
    )
    @mock.patch(
        "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
    )
    def test_customize_context_aware_metadata_path(
        self,
        mock_check_dca_metadata_path,
        mock_read_dca_metadata_file,
        mock_run_cert_provider_command,
    ):
        context_aware_metadata_path = "/path/to/metata/data"
        mock_check_dca_metadata_path.return_value = context_aware_metadata_path
        mock_read_dca_metadata_file.return_value = {
            "cert_provider_command": ["command"]
        }
        mock_run_cert_provider_command.return_value = (b"cert", b"key", None)

        has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
            context_aware_metadata_path=context_aware_metadata_path
        )

        assert has_cert
        assert cert == b"cert"
        assert key == b"key"
        assert passphrase is None
        # The caller-supplied path must be the one checked and then read.
        mock_check_dca_metadata_path.assert_called_with(context_aware_metadata_path)
        mock_read_dca_metadata_file.assert_called_with(context_aware_metadata_path)


class TestGetClientCertAndKey(object):
    """Tests for ``_mtls_helper.get_client_cert_and_key``."""

    def test_callback_success(self):
        # When a callback is supplied, its (cert, key) pair is returned.
        callback = mock.Mock()
        callback.return_value = (pytest.public_cert_bytes, pytest.private_key_bytes)

        found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key(callback)
        assert found_cert_key
        assert cert == pytest.public_cert_bytes
        assert key == pytest.private_key_bytes

    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True
    )
    def test_use_metadata(self, mock_get_client_ssl_credentials):
        # Without a callback, the result comes from
        # get_client_ssl_credentials; its fourth element (passphrase) is not
        # part of the returned tuple.
        mock_get_client_ssl_credentials.return_value = (
            True,
            pytest.public_cert_bytes,
            pytest.private_key_bytes,
            None,
        )

        found_cert_key, cert, key = _mtls_helper.get_client_cert_and_key()
        assert found_cert_key
        assert cert == pytest.public_cert_bytes
        assert key == pytest.private_key_bytes


class TestDecryptPrivateKey(object):
    """Tests for ``_mtls_helper.decrypt_private_key``."""

    def test_success(self):
        decrypted_key = _mtls_helper.decrypt_private_key(
            ENCRYPTED_EC_PRIVATE_KEY, PASSPHRASE_VALUE
        )
        private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, decrypted_key)
        public_key = crypto.load_publickey(crypto.FILETYPE_PEM, EC_PUBLIC_KEY)
        x509 = crypto.X509()
        x509.set_pubkey(public_key)

        # Test the decrypted key works by signing and verification.
        # crypto.verify signals a bad signature by raising, so reaching the
        # end of this test without an exception is the success condition.
        signature = crypto.sign(private_key, b"data", "sha256")
        crypto.verify(x509, signature, b"data", "sha256")

    def test_crypto_error(self):
        # A wrong passphrase is expected to surface as crypto.Error.
        with pytest.raises(crypto.Error):
            _mtls_helper.decrypt_private_key(
                ENCRYPTED_EC_PRIVATE_KEY, b"wrong_password"
            )