• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""A client for AWS KMS."""
15
16import binascii
17import configparser
18import re
19
20from typing import Optional, Tuple, Any, Dict
21
22import boto3
23from botocore import exceptions
24
25import tink
26from tink import aead
27from tink.aead import _kms_aead_key_manager
28
29
# URI scheme prefix that marks a key URI as an AWS KMS key. Everything after
# this prefix is treated as the key ARN (see _key_uri_to_key_arn).
AWS_KEYURI_PREFIX = 'aws-kms://'
31
32
33def _encryption_context(associated_data: bytes) -> Dict[str, str]:
34  if associated_data:
35    hex_associated_data = binascii.hexlify(associated_data).decode('utf-8')
36    return {'associatedData': hex_associated_data}
37  else:
38    return dict()
39
40
class _AwsKmsAead(aead.Aead):
  """Implements the Aead interface for AWS KMS.

  Every call is forwarded to the wrapped client using the key ARN supplied at
  construction time. The associated data is converted to a KMS encryption
  context by _encryption_context.
  """

  def __init__(self, client: Any, key_arn: str) -> None:
    """Binds the primitive to a client and a key.

    Args:
      client: Object exposing `encrypt` and `decrypt` methods that accept the
        keyword arguments used by this class (e.g. a boto3 KMS client).
      key_arn: ARN of the KMS key; sent as `KeyId` on every request.
    """
    self.client = client
    self.key_arn = key_arn

  def encrypt(self, plaintext: bytes, associated_data: bytes) -> bytes:
    """Encrypts `plaintext` with the bound key.

    Args:
      plaintext: Data to encrypt.
      associated_data: Data to authenticate alongside the plaintext.

    Returns:
      The 'CiphertextBlob' field of the client's response.

    Raises:
      tink.TinkError: If the client raises a botocore ClientError. The
        original error is attached as the cause.
    """
    # Only the remote call is guarded; nothing else here raises ClientError.
    try:
      response = self.client.encrypt(
          KeyId=self.key_arn,
          Plaintext=plaintext,
          EncryptionContext=_encryption_context(associated_data),
      )
    except exceptions.ClientError as e:
      # Chain explicitly so the botocore error is preserved as __cause__.
      raise tink.TinkError(e) from e
    return response['CiphertextBlob']

  def decrypt(self, ciphertext: bytes, associated_data: bytes) -> bytes:
    """Decrypts `ciphertext` with the bound key.

    Args:
      ciphertext: Data previously produced by `encrypt`.
      associated_data: The associated data that was supplied on encryption.

    Returns:
      The 'Plaintext' field of the client's response.

    Raises:
      tink.TinkError: If the client raises a botocore ClientError. The
        original error is attached as the cause.
    """
    # Only the remote call is guarded; nothing else here raises ClientError.
    try:
      response = self.client.decrypt(
          KeyId=self.key_arn,
          CiphertextBlob=ciphertext,
          EncryptionContext=_encryption_context(associated_data),
      )
    except exceptions.ClientError as e:
      # Chain explicitly so the botocore error is preserved as __cause__.
      raise tink.TinkError(e) from e
    return response['Plaintext']
69
70
def _key_uri_to_key_arn(key_uri: str) -> str:
  """Strips the AWS KMS URI prefix from `key_uri` and returns the remainder.

  Args:
    key_uri: A key URI that is expected to begin with AWS_KEYURI_PREFIX.

  Returns:
    The part of `key_uri` that follows the prefix.

  Raises:
    tink.TinkError: If `key_uri` does not begin with AWS_KEYURI_PREFIX.
  """
  if key_uri.startswith(AWS_KEYURI_PREFIX):
    prefix_length = len(AWS_KEYURI_PREFIX)
    return key_uri[prefix_length:]
  raise tink.TinkError('invalid key URI')
75
76
77def _parse_config(config_path: str) -> Tuple[str, str]:
78  """Returns ('aws_access_key_id', 'aws_secret_access_key') from a config."""
79  config = configparser.ConfigParser()
80  config.read(config_path)
81  if 'default' not in config:
82    raise ValueError('invalid config: default not found')
83  default = config['default']
84  if 'aws_access_key_id' not in default:
85    raise ValueError('invalid config: aws_access_key_id not found')
86  aws_access_key_id = default['aws_access_key_id']
87  if 'aws_secret_access_key' not in default:
88    raise ValueError('invalid config: aws_secret_access_key not found')
89  aws_secret_access_key = default['aws_secret_access_key']
90  return (aws_access_key_id, aws_secret_access_key)
91
92
93def _get_region_from_key_arn(key_arn: str) -> str:
94  # An AWS key ARN is of the form
95  # arn:aws:kms:us-west-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab.
96  key_arn_parts = key_arn.split(':')
97  if len(key_arn_parts) < 6:
98    raise tink.TinkError('invalid key id')
99  return key_arn_parts[3]
100
101
class AwsKmsClient(_kms_aead_key_manager.KmsClient):
  """KMS client that hands out AEAD primitives backed by AWS KMS keys."""

  def __init__(self, key_uri: Optional[str], credentials_path: Optional[str]):
    """Creates a new AwsKmsClient, optionally bound to a single key.

    For more information on credentials and in which order they are loaded see
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html.

    Args:
      key_uri: URI of the key this client is restricted to. When None or
          empty, the client is not restricted to a particular key.
      credentials_path: Path to a file holding the access credentials. When
          None or empty, no explicit credentials are stored and None is later
          passed to the boto3 session for both credential fields.

    Raises:
      ValueError: If the credentials file is rejected by _parse_config.
      TinkError: If the key uri is not valid.
    """
    if key_uri:
      # A bound URI must at least look like 'aws-kms://arn:aws:kms:<region>:'.
      if re.match('aws-kms://arn:aws:kms:([a-z0-9-]+):', key_uri) is None:
        raise tink.TinkError('invalid key URI')
      self._key_arn = _key_uri_to_key_arn(key_uri)
    else:
      self._key_arn = None

    if credentials_path:
      access_key_id, secret_access_key = _parse_config(credentials_path)
      self._aws_access_key_id = access_key_id
      self._aws_secret_access_key = secret_access_key
    else:
      self._aws_access_key_id = None
      self._aws_secret_access_key = None

  def does_support(self, key_uri: str) -> bool:
    """Tells whether this client can serve the key named by 'key_uri'.

    Args:
      key_uri: Text, URI of the key to be checked.

    Returns: False if the URI lacks the AWS KMS prefix. Otherwise True when
      the client is unbound, or when the URI names the bound key.
    """
    if not key_uri.startswith(AWS_KEYURI_PREFIX):
      return False
    # An unbound client accepts every AWS KMS URI; a bound one only its own.
    return not self._key_arn or _key_uri_to_key_arn(key_uri) == self._key_arn

  def get_aead(self, key_uri: str) -> aead.Aead:
    """Creates an Aead primitive backed by the KMS key named by 'key_uri'.

    A new boto3 session is created on each call, using the stored credentials
    and the region taken from the key ARN.

    Args:
      key_uri: Text, URI of the key which should be used.

    Returns:
      An AEAD primitive which uses the specified key.

    Raises:
      TinkError: If the key_uri is not supported.
    """
    if self.does_support(key_uri):
      key_arn = _key_uri_to_key_arn(key_uri)
      region = _get_region_from_key_arn(key_arn)
      session = boto3.session.Session(
          aws_access_key_id=self._aws_access_key_id,
          aws_secret_access_key=self._aws_secret_access_key,
          region_name=region,
      )
      return _AwsKmsAead(session.client('kms'), key_arn)

    # Unsupported URI: report the bound key when there is one.
    if self._key_arn:
      raise tink.TinkError(
          'This client is bound to %s and cannot use key %s' %
          (self._key_arn, key_uri))
    raise tink.TinkError(
        'This client does not support key %s' % key_uri)

  @classmethod
  def register_client(
      cls, key_uri: Optional[str], credentials_path: Optional[str]
  ) -> None:
    """Builds an AwsKmsClient from the arguments and registers it.

    Args:
      key_uri: Forwarded to the AwsKmsClient constructor.
      credentials_path: Forwarded to the AwsKmsClient constructor.
    """
    client = AwsKmsClient(key_uri, credentials_path)
    _kms_aead_key_manager.register_kms_client(client)  # pylint: disable=protected-access
186