• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helper functions for getting mTLS cert and key."""
16
17import json
18import logging
19from os import path
20import re
21import subprocess
22
23import six
24
25from google.auth import exceptions
26
_LOGGER = logging.getLogger(__name__)

# Default location of the context aware metadata file. The leading "~" is
# expanded by _check_dca_metadata_path before the file is accessed.
CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"

# Key in the metadata JSON whose value is the cert provider command.
_CERT_PROVIDER_COMMAND = "cert_provider_command"

# Matches the PEM certificate section of the cert provider output. The greedy
# ".+" combined with DOTALL spans from the first BEGIN marker to the last END
# marker, so a whole certificate chain is captured as one match.
_CERT_REGEX = re.compile(
    b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?",
    flags=re.DOTALL,
)

# Matches the PEM private key section. The "[A-Z ]*" prefix accepts the
# different key header flavors, e.g.
#   "-----BEGIN PRIVATE KEY-----..."
#   "-----BEGIN EC PRIVATE KEY-----..."
#   "-----BEGIN RSA PRIVATE KEY-----..."
#   "-----BEGIN ENCRYPTED PRIVATE KEY-----..."
_KEY_REGEX = re.compile(
    b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?",
    flags=re.DOTALL,
)

# Matches the passphrase section; group 1 is the text between the markers
# (including any surrounding whitespace/newlines).
_PASSPHRASE_REGEX = re.compile(
    b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----",
    flags=re.DOTALL,
)
49
50
def _check_dca_metadata_path(metadata_path):
    """Resolves the context aware metadata path and checks that it exists.

    Args:
        metadata_path (str): context aware metadata path. A leading "~" is
            expanded to the current user's home directory.

    Returns:
        str: the expanded path if something exists at that location, and
            None otherwise.
    """
    expanded_path = path.expanduser(metadata_path)
    if path.exists(expanded_path):
        return expanded_path

    # Absence is not treated as an error here: it is only logged at debug
    # level and reported to the caller as None.
    _LOGGER.debug("%s is not found, skip client SSL authentication.", expanded_path)
    return None
66
67
def _read_dca_metadata_file(metadata_path):
    """Reads the context aware metadata file and parses it as JSON.

    Args:
        metadata_path (str): context aware metadata path.

    Returns:
        Dict[str, str]: The parsed metadata.

    Raises:
        google.auth.exceptions.ClientCertError: If the file content cannot be
            parsed as JSON.
    """
    try:
        with open(metadata_path) as metadata_file:
            return json.load(metadata_file)
    except ValueError as caught_exc:
        # Re-raise as the library's own error type while keeping the original
        # exception attached as the cause.
        new_exc = exceptions.ClientCertError(caught_exc)
        six.raise_from(new_exc, caught_exc)
88
89
def _run_cert_provider_command(command, expect_encrypted_key=False):
    """Executes the cert provider command and extracts the client side mTLS
    cert, key and passphrase from its standard output.

    Args:
        command (List[str]): cert provider command.
        expect_encrypted_key (bool): If encrypted private key is expected.

    Returns:
        Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key
            bytes in PEM format and passphrase bytes. The passphrase is None
            when expect_encrypted_key is False.

    Raises:
        google.auth.exceptions.ClientCertError: if the command cannot be
            started, exits with a non-zero status code, or its output does
            not contain exactly the expected cert, key and passphrase.
    """
    try:
        provider = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # stderr is captured through a pipe but its content is not used.
        output, _ = provider.communicate()
    except OSError as caught_exc:
        new_exc = exceptions.ClientCertError(caught_exc)
        six.raise_from(new_exc, caught_exc)

    # A failing provider command is reported before any output is parsed.
    exit_code = provider.returncode
    if exit_code != 0:
        raise exceptions.ClientCertError(
            "Cert provider command returns non-zero status code %s" % exit_code
        )

    # Exactly one certificate (chain) block and one key block must be present.
    certs = _CERT_REGEX.findall(output)
    if len(certs) != 1:
        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid")
    keys = _KEY_REGEX.findall(output)
    if len(keys) != 1:
        raise exceptions.ClientCertError("Client SSL key is missing or invalid")
    passphrases = _PASSPHRASE_REGEX.findall(output)

    cert, key = certs[0], keys[0]
    # The matched key text includes its PEM header, so an encrypted key is
    # recognized by the word ENCRYPTED appearing in it.
    key_is_encrypted = b"ENCRYPTED" in key

    if not expect_encrypted_key:
        # Plain key requested: neither an encrypted key nor a passphrase may
        # show up in the output.
        if key_is_encrypted:
            raise exceptions.ClientCertError("Encrypted private key is not expected")
        if passphrases:
            raise exceptions.ClientCertError("Passphrase is not expected")
        return cert, key, None

    # Encrypted key requested: require exactly one passphrase and a key that
    # is actually encrypted.
    if len(passphrases) != 1:
        raise exceptions.ClientCertError("Passphrase is missing or invalid")
    if not key_is_encrypted:
        raise exceptions.ClientCertError("Encrypted private key is expected")
    return cert, key, passphrases[0].strip()
142
143
def get_client_ssl_credentials(
    generate_encrypted_key=False,
    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
):
    """Obtains the client side certificate, private key and passphrase by
    running the cert provider command listed in the context aware metadata.

    Args:
        generate_encrypted_key (bool): If set to True, encrypted private key
            and passphrase will be generated; otherwise, unencrypted private key
            will be generated and passphrase will be None.
        context_aware_metadata_path (str): The context_aware_metadata.json file path.

    Returns:
        Tuple[bool, bytes, bytes, bytes]:
            A boolean indicating if cert, key and passphrase are obtained, the
            cert bytes and key bytes both in PEM format, and passphrase bytes.
            When the metadata file does not exist the result is
            (False, None, None, None).

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert, key and passphrase.
    """
    metadata_path = _check_dca_metadata_path(context_aware_metadata_path)
    if not metadata_path:
        # No metadata file: nothing to run, report that no credentials exist.
        return False, None, None, None

    metadata_json = _read_dca_metadata_file(metadata_path)
    if _CERT_PROVIDER_COMMAND not in metadata_json:
        raise exceptions.ClientCertError("Cert provider command is not found")
    command = metadata_json[_CERT_PROVIDER_COMMAND]

    # Ask the provider for an encrypted key plus passphrase by adding the
    # flag, unless the configured command already carries it.
    needs_passphrase_flag = (
        generate_encrypted_key and "--with_passphrase" not in command
    )
    if needs_passphrase_flag:
        command.append("--with_passphrase")

    cert, key, passphrase = _run_cert_provider_command(
        command, expect_encrypted_key=generate_encrypted_key
    )
    return True, cert, key, passphrase
185
186
def get_client_cert_and_key(client_cert_callback=None):
    """Returns the client side certificate and private key.

    If client_cert_callback is provided, its result is returned as the
    certificate and key. Otherwise the function falls back to the application
    default SSL credentials, requesting an unencrypted private key.

    Args:
        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
            optional callback which returns client certificate bytes and private
            key bytes both in PEM format.

    Returns:
        Tuple[bool, bytes, bytes]:
            A boolean indicating if cert and key are obtained, the cert bytes
            and key bytes both in PEM format. The boolean is always True when
            a callback is used.

    Raises:
        google.auth.exceptions.ClientCertError: if problems occurs when getting
            the cert and key.
    """
    if not client_cert_callback:
        # No callback supplied: use the default credentials and drop the
        # passphrase slot, which is unused for an unencrypted key.
        has_cert, cert, key, _ = get_client_ssl_credentials(
            generate_encrypted_key=False
        )
        return has_cert, cert, key

    cert, key = client_cert_callback()
    return True, cert, key
213
214
def decrypt_private_key(key, passphrase):
    """Decrypts a passphrase protected private key.

    google-auth library doesn't support passphrase protected private key for
    mutual TLS channel. This helper function can be used to decrypt the
    passphrase protected private key in order to establish mutual TLS channel.

    For example, if you have a function which produces client cert, passphrase
    protected private key and passphrase, you can convert it to a client cert
    callback function accepted by google-auth::

        from google.auth.transport import _mtls_helper

        def your_client_cert_function():
            return cert, encrypted_key, passphrase

        # callback accepted by google-auth for mutual TLS channel.
        def client_cert_callback():
            cert, encrypted_key, passphrase = your_client_cert_function()
            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key,
                passphrase)
            return cert, decrypted_key

    Args:
        key (bytes): The private key bytes in PEM format.
        passphrase (bytes): The passphrase bytes.

    Returns:
        bytes: The decrypted private key in PEM format.

    Raises:
        ImportError: If pyOpenSSL is not installed.
        OpenSSL.crypto.Error: If there is any problem decrypting the private key.
    """
    # Imported lazily so that pyOpenSSL is only required by callers of this
    # helper.
    from OpenSSL import crypto

    pem_format = crypto.FILETYPE_PEM

    # Load the encrypted PEM into a PKey object using the passphrase, then
    # serialize that object back to PEM to obtain the decrypted key bytes.
    loaded_key = crypto.load_privatekey(pem_format, key, passphrase=passphrase)
    return crypto.dump_privatekey(pem_format, loaded_key)
255