• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Python KMS AEAD key manager."""
16
17import abc
18
19from typing import List, Type
20
21from tink.proto import kms_aead_pb2
22from tink.proto import kms_envelope_pb2
23from tink.proto import tink_pb2
24from tink import core
25from tink.aead import _aead
26from tink.aead import _kms_envelope_aead
27
28
29class KmsClient(metaclass=abc.ABCMeta):
30
31  @abc.abstractmethod
32  def does_support(self, key_uri: str) -> bool:
33    raise NotImplementedError()
34
35  @abc.abstractmethod
36  def get_aead(self, key_uri: str) -> _aead.Aead:
37    raise NotImplementedError()
38
39
40_kms_clients: List[KmsClient] = []
41
42
43def register_kms_client(client: KmsClient) -> None:
44  """Tink-internal function to register kms clients."""
45  _kms_clients.append(client)
46
47
48def reset_kms_clients() -> None:
49  """Removes all previously registered KMS clients. Used in tests."""
50  _kms_clients.clear()
51
52
53def _kms_client_from_uri(key_uri: str) -> KmsClient:
54  """Tink-internal function to get a KmsClient."""
55  for client in _kms_clients:
56    if client.does_support(key_uri):
57      return client
58  raise core.TinkError('No KMS client does support: ' + key_uri)
59
60
61_KMS_AEAD_KEY_TYPE_URL = 'type.googleapis.com/google.crypto.tink.KmsAeadKey'
62_KMS_ENVELOPE_AEAD_KEY_TYPE_URL = (
63    'type.googleapis.com/google.crypto.tink.KmsEnvelopeAeadKey'
64)
65
66
67class KmsAeadKeyManager(core.KeyManager[_aead.Aead]):
68  """KmsAeadKeyManager."""
69
70  def primitive_class(self) -> Type[_aead.Aead]:
71    return _aead.Aead
72
73  def primitive(self, key_data: tink_pb2.KeyData) -> _aead.Aead:
74    if key_data.type_url != _KMS_AEAD_KEY_TYPE_URL:
75      raise core.TinkError('wrong key type: ' + key_data.type_url)
76    kms_key = kms_aead_pb2.KmsAeadKey.FromString(key_data.value)
77    client = _kms_client_from_uri(kms_key.params.key_uri)
78    return client.get_aead(key_uri=kms_key.params.key_uri)
79
80  def key_type(self) -> str:
81    return _KMS_AEAD_KEY_TYPE_URL
82
83  def new_key_data(
84      self, key_template: tink_pb2.KeyTemplate
85  ) -> tink_pb2.KeyData:
86    if key_template.type_url != _KMS_AEAD_KEY_TYPE_URL:
87      raise core.TinkError('wrong key type: ' + key_template.type_url)
88    key = kms_aead_pb2.KmsAeadKey(
89        version=0,
90        params=kms_aead_pb2.KmsAeadKeyFormat.FromString(key_template.value),
91    )
92    return tink_pb2.KeyData(
93        type_url=_KMS_AEAD_KEY_TYPE_URL,
94        value=key.SerializeToString(),
95        key_material_type=tink_pb2.KeyData.REMOTE,
96    )
97
98  def does_support(self, type_url: str) -> bool:
99    return self.key_type() == type_url
100
101
102class KmsEnvelopeAeadKeyManager(core.KeyManager[_aead.Aead]):
103  """KmsEnvelopeAeadKeyManager."""
104
105  def primitive_class(self) -> Type[_aead.Aead]:
106    return _aead.Aead
107
108  def primitive(self, key_data: tink_pb2.KeyData) -> _aead.Aead:
109    if key_data.type_url != _KMS_ENVELOPE_AEAD_KEY_TYPE_URL:
110      raise core.TinkError('wrong key type: ' + key_data.type_url)
111    env_key = kms_envelope_pb2.KmsEnvelopeAeadKey.FromString(key_data.value)
112    client = _kms_client_from_uri(env_key.params.kek_uri)
113
114    return _kms_envelope_aead.KmsEnvelopeAead(
115        env_key.params.dek_template,
116        client.get_aead(key_uri=env_key.params.kek_uri),
117    )
118
119  def key_type(self) -> str:
120    return _KMS_ENVELOPE_AEAD_KEY_TYPE_URL
121
122  def new_key_data(
123      self, key_template: tink_pb2.KeyTemplate
124  ) -> tink_pb2.KeyData:
125    if key_template.type_url != _KMS_ENVELOPE_AEAD_KEY_TYPE_URL:
126      raise core.TinkError('wrong key type: ' + key_template.type_url)
127    params = kms_envelope_pb2.KmsEnvelopeAeadKeyFormat.FromString(
128        key_template.value
129    )
130    if not _kms_envelope_aead.is_supported_dek_key_type(
131        params.dek_template.type_url
132    ):
133      raise core.TinkError(
134          'Unsupported DEK key type: %s' % key_template.type_url
135      )
136    env_key = kms_envelope_pb2.KmsEnvelopeAeadKey(
137        version=0,
138        params=params,
139    )
140    return tink_pb2.KeyData(
141        type_url=_KMS_ENVELOPE_AEAD_KEY_TYPE_URL,
142        value=env_key.SerializeToString(),
143        key_material_type=tink_pb2.KeyData.REMOTE,
144    )
145
146  def does_support(self, type_url: str) -> bool:
147    return self.key_type() == type_url
148