1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Helper functions for getting mTLS cert and key.""" 16 17import json 18import logging 19from os import path 20import re 21import subprocess 22 23import six 24 25from google.auth import exceptions 26 27CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json" 28_CERT_PROVIDER_COMMAND = "cert_provider_command" 29_CERT_REGEX = re.compile( 30 b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL 31) 32 33# support various format of key files, e.g. 34# "-----BEGIN PRIVATE KEY-----...", 35# "-----BEGIN EC PRIVATE KEY-----...", 36# "-----BEGIN RSA PRIVATE KEY-----..." 37# "-----BEGIN ENCRYPTED PRIVATE KEY-----" 38_KEY_REGEX = re.compile( 39 b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?", 40 re.DOTALL, 41) 42 43_LOGGER = logging.getLogger(__name__) 44 45 46_PASSPHRASE_REGEX = re.compile( 47 b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL 48) 49 50 51def _check_dca_metadata_path(metadata_path): 52 """Checks for context aware metadata. If it exists, returns the absolute path; 53 otherwise returns None. 54 55 Args: 56 metadata_path (str): context aware metadata path. 57 58 Returns: 59 str: absolute path if exists and None otherwise. 60 """ 61 metadata_path = path.expanduser(metadata_path) 62 if not path.exists(metadata_path): 63 _LOGGER.debug("%s is not found, skip client SSL authentication.", metadata_path) 64 return None 65 return metadata_path 66 67 68def _read_dca_metadata_file(metadata_path): 69 """Loads context aware metadata from the given path. 70 71 Args: 72 metadata_path (str): context aware metadata path. 73 74 Returns: 75 Dict[str, str]: The metadata. 76 77 Raises: 78 google.auth.exceptions.ClientCertError: If failed to parse metadata as JSON. 79 """ 80 try: 81 with open(metadata_path) as f: 82 metadata = json.load(f) 83 except ValueError as caught_exc: 84 new_exc = exceptions.ClientCertError(caught_exc) 85 six.raise_from(new_exc, caught_exc) 86 87 return metadata 88 89 90def _run_cert_provider_command(command, expect_encrypted_key=False): 91 """Run the provided command, and return client side mTLS cert, key and 92 passphrase. 93 94 Args: 95 command (List[str]): cert provider command. 96 expect_encrypted_key (bool): If encrypted private key is expected. 97 98 Returns: 99 Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key 100 bytes in PEM format and passphrase bytes. 101 102 Raises: 103 google.auth.exceptions.ClientCertError: if problems occurs when running 104 the cert provider command or generating cert, key and passphrase. 105 """ 106 try: 107 process = subprocess.Popen( 108 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE 109 ) 110 stdout, stderr = process.communicate() 111 except OSError as caught_exc: 112 new_exc = exceptions.ClientCertError(caught_exc) 113 six.raise_from(new_exc, caught_exc) 114 115 # Check cert provider command execution error. 116 if process.returncode != 0: 117 raise exceptions.ClientCertError( 118 "Cert provider command returns non-zero status code %s" % process.returncode 119 ) 120 121 # Extract certificate (chain), key and passphrase. 122 cert_match = re.findall(_CERT_REGEX, stdout) 123 if len(cert_match) != 1: 124 raise exceptions.ClientCertError("Client SSL certificate is missing or invalid") 125 key_match = re.findall(_KEY_REGEX, stdout) 126 if len(key_match) != 1: 127 raise exceptions.ClientCertError("Client SSL key is missing or invalid") 128 passphrase_match = re.findall(_PASSPHRASE_REGEX, stdout) 129 130 if expect_encrypted_key: 131 if len(passphrase_match) != 1: 132 raise exceptions.ClientCertError("Passphrase is missing or invalid") 133 if b"ENCRYPTED" not in key_match[0]: 134 raise exceptions.ClientCertError("Encrypted private key is expected") 135 return cert_match[0], key_match[0], passphrase_match[0].strip() 136 137 if b"ENCRYPTED" in key_match[0]: 138 raise exceptions.ClientCertError("Encrypted private key is not expected") 139 if len(passphrase_match) > 0: 140 raise exceptions.ClientCertError("Passphrase is not expected") 141 return cert_match[0], key_match[0], None 142 143 144def get_client_ssl_credentials( 145 generate_encrypted_key=False, 146 context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH, 147): 148 """Returns the client side certificate, private key and passphrase. 149 150 Args: 151 generate_encrypted_key (bool): If set to True, encrypted private key 152 and passphrase will be generated; otherwise, unencrypted private key 153 will be generated and passphrase will be None. 154 context_aware_metadata_path (str): The context_aware_metadata.json file path. 155 156 Returns: 157 Tuple[bool, bytes, bytes, bytes]: 158 A boolean indicating if cert, key and passphrase are obtained, the 159 cert bytes and key bytes both in PEM format, and passphrase bytes. 160 161 Raises: 162 google.auth.exceptions.ClientCertError: if problems occurs when getting 163 the cert, key and passphrase. 164 """ 165 metadata_path = _check_dca_metadata_path(context_aware_metadata_path) 166 167 if metadata_path: 168 metadata_json = _read_dca_metadata_file(metadata_path) 169 170 if _CERT_PROVIDER_COMMAND not in metadata_json: 171 raise exceptions.ClientCertError("Cert provider command is not found") 172 173 command = metadata_json[_CERT_PROVIDER_COMMAND] 174 175 if generate_encrypted_key and "--with_passphrase" not in command: 176 command.append("--with_passphrase") 177 178 # Execute the command. 179 cert, key, passphrase = _run_cert_provider_command( 180 command, expect_encrypted_key=generate_encrypted_key 181 ) 182 return True, cert, key, passphrase 183 184 return False, None, None, None 185 186 187def get_client_cert_and_key(client_cert_callback=None): 188 """Returns the client side certificate and private key. The function first 189 tries to get certificate and key from client_cert_callback; if the callback 190 is None or doesn't provide certificate and key, the function tries application 191 default SSL credentials. 192 193 Args: 194 client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An 195 optional callback which returns client certificate bytes and private 196 key bytes both in PEM format. 197 198 Returns: 199 Tuple[bool, bytes, bytes]: 200 A boolean indicating if cert and key are obtained, the cert bytes 201 and key bytes both in PEM format. 202 203 Raises: 204 google.auth.exceptions.ClientCertError: if problems occurs when getting 205 the cert and key. 206 """ 207 if client_cert_callback: 208 cert, key = client_cert_callback() 209 return True, cert, key 210 211 has_cert, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False) 212 return has_cert, cert, key 213 214 215def decrypt_private_key(key, passphrase): 216 """A helper function to decrypt the private key with the given passphrase. 217 google-auth library doesn't support passphrase protected private key for 218 mutual TLS channel. This helper function can be used to decrypt the 219 passphrase protected private key in order to estalish mutual TLS channel. 220 221 For example, if you have a function which produces client cert, passphrase 222 protected private key and passphrase, you can convert it to a client cert 223 callback function accepted by google-auth:: 224 225 from google.auth.transport import _mtls_helper 226 227 def your_client_cert_function(): 228 return cert, encrypted_key, passphrase 229 230 # callback accepted by google-auth for mutual TLS channel. 231 def client_cert_callback(): 232 cert, encrypted_key, passphrase = your_client_cert_function() 233 decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key, 234 passphrase) 235 return cert, decrypted_key 236 237 Args: 238 key (bytes): The private key bytes in PEM format. 239 passphrase (bytes): The passphrase bytes. 240 241 Returns: 242 bytes: The decrypted private key in PEM format. 243 244 Raises: 245 ImportError: If pyOpenSSL is not installed. 246 OpenSSL.crypto.Error: If there is any problem decrypting the private key. 247 """ 248 from OpenSSL import crypto 249 250 # First convert encrypted_key_bytes to PKey object 251 pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key, passphrase=passphrase) 252 253 # Then dump the decrypted key bytes 254 return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey) 255