# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Python KMS AEAD key manager."""

import abc

from typing import List, Type

from tink.proto import kms_aead_pb2
from tink.proto import kms_envelope_pb2
from tink.proto import tink_pb2
from tink import core
from tink.aead import _aead
from tink.aead import _kms_envelope_aead


class KmsClient(metaclass=abc.ABCMeta):
  """Abstract interface of the KMS clients kept in this module's registry.

  Implementations report which key URIs they handle and hand out an Aead
  primitive for such a URI.
  """

  @abc.abstractmethod
  def does_support(self, key_uri: str) -> bool:
    """Returns True if this client can handle the given key URI."""
    raise NotImplementedError()

  @abc.abstractmethod
  def get_aead(self, key_uri: str) -> _aead.Aead:
    """Returns an Aead primitive for the key identified by key_uri."""
    raise NotImplementedError()


# Module-wide registry of clients; lookups walk it in registration order.
_kms_clients: List[KmsClient] = []


def register_kms_client(client: KmsClient) -> None:
  """Tink-internal function to register kms clients.

  Args:
    client: The client to append to the module-wide registry.
  """
  _kms_clients.append(client)


def reset_kms_clients() -> None:
  """Removes all previously registered KMS clients.

  Used in tests.
  """
  _kms_clients.clear()


def _kms_client_from_uri(key_uri: str) -> KmsClient:
  """Tink-internal function to get a KmsClient.

  Args:
    key_uri: The key URI a client is needed for.

  Returns:
    The first registered client whose does_support(key_uri) is true.

  Raises:
    core.TinkError: If no registered client supports key_uri.
  """
  for client in _kms_clients:
    if client.does_support(key_uri):
      return client
  raise core.TinkError('No KMS client does support: ' + key_uri)


_KMS_AEAD_KEY_TYPE_URL = 'type.googleapis.com/google.crypto.tink.KmsAeadKey'
_KMS_ENVELOPE_AEAD_KEY_TYPE_URL = (
    'type.googleapis.com/google.crypto.tink.KmsEnvelopeAeadKey'
)


class KmsAeadKeyManager(core.KeyManager[_aead.Aead]):
  """Key manager for KmsAeadKey keys.

  Primitives are obtained from the registered KmsClient that supports the
  key's URI.
  """

  def primitive_class(self) -> Type[_aead.Aead]:
    """Returns the primitive class produced by this manager (Aead)."""
    return _aead.Aead

  def primitive(self, key_data: tink_pb2.KeyData) -> _aead.Aead:
    """Returns the Aead that a registered KMS client provides for the key.

    Args:
      key_data: KeyData whose value is a serialized KmsAeadKey.

    Returns:
      The result of get_aead() on the first registered client supporting the
      key URI stored in the key's params.

    Raises:
      core.TinkError: If key_data has a different type URL, or if no
        registered client supports the key URI.
    """
    if key_data.type_url != _KMS_AEAD_KEY_TYPE_URL:
      raise core.TinkError('wrong key type: ' + key_data.type_url)
    kms_key = kms_aead_pb2.KmsAeadKey.FromString(key_data.value)
    client = _kms_client_from_uri(kms_key.params.key_uri)
    return client.get_aead(key_uri=kms_key.params.key_uri)

  def key_type(self) -> str:
    """Returns the type URL of the keys handled by this manager."""
    return _KMS_AEAD_KEY_TYPE_URL

  def new_key_data(
      self, key_template: tink_pb2.KeyTemplate
  ) -> tink_pb2.KeyData:
    """Builds KeyData for a new KmsAeadKey from a key template.

    Args:
      key_template: Template whose value is a serialized KmsAeadKeyFormat.

    Returns:
      KeyData of material type REMOTE holding a serialized KmsAeadKey with
      version 0 and the template's format as params.

    Raises:
      core.TinkError: If key_template has a different type URL.
    """
    if key_template.type_url != _KMS_AEAD_KEY_TYPE_URL:
      raise core.TinkError('wrong key type: ' + key_template.type_url)
    key = kms_aead_pb2.KmsAeadKey(
        version=0,
        params=kms_aead_pb2.KmsAeadKeyFormat.FromString(key_template.value),
    )
    return tink_pb2.KeyData(
        type_url=_KMS_AEAD_KEY_TYPE_URL,
        value=key.SerializeToString(),
        key_material_type=tink_pb2.KeyData.REMOTE,
    )

  def does_support(self, type_url: str) -> bool:
    """Returns True if type_url equals this manager's key type."""
    return self.key_type() == type_url


class KmsEnvelopeAeadKeyManager(core.KeyManager[_aead.Aead]):
  """Key manager for KmsEnvelopeAeadKey keys.

  Primitives are KmsEnvelopeAead objects built from the key's DEK template
  and the Aead that a registered KmsClient provides for the key's KEK URI.
  """

  def primitive_class(self) -> Type[_aead.Aead]:
    """Returns the primitive class produced by this manager (Aead)."""
    return _aead.Aead

  def primitive(self, key_data: tink_pb2.KeyData) -> _aead.Aead:
    """Returns a KmsEnvelopeAead for the given key.

    Args:
      key_data: KeyData whose value is a serialized KmsEnvelopeAeadKey.

    Returns:
      A KmsEnvelopeAead constructed from the key's DEK template and the Aead
      obtained from the first registered client supporting the KEK URI.

    Raises:
      core.TinkError: If key_data has a different type URL, or if no
        registered client supports the KEK URI.
    """
    if key_data.type_url != _KMS_ENVELOPE_AEAD_KEY_TYPE_URL:
      raise core.TinkError('wrong key type: ' + key_data.type_url)
    env_key = kms_envelope_pb2.KmsEnvelopeAeadKey.FromString(key_data.value)
    client = _kms_client_from_uri(env_key.params.kek_uri)

    return _kms_envelope_aead.KmsEnvelopeAead(
        env_key.params.dek_template,
        client.get_aead(key_uri=env_key.params.kek_uri),
    )

  def key_type(self) -> str:
    """Returns the type URL of the keys handled by this manager."""
    return _KMS_ENVELOPE_AEAD_KEY_TYPE_URL

  def new_key_data(
      self, key_template: tink_pb2.KeyTemplate
  ) -> tink_pb2.KeyData:
    """Builds KeyData for a new KmsEnvelopeAeadKey from a key template.

    Args:
      key_template: Template whose value is a serialized
        KmsEnvelopeAeadKeyFormat.

    Returns:
      KeyData of material type REMOTE holding a serialized KmsEnvelopeAeadKey
      with version 0 and the template's format as params.

    Raises:
      core.TinkError: If key_template has a different type URL, or if the
        DEK template's key type is rejected by
        _kms_envelope_aead.is_supported_dek_key_type.
    """
    if key_template.type_url != _KMS_ENVELOPE_AEAD_KEY_TYPE_URL:
      raise core.TinkError('wrong key type: ' + key_template.type_url)
    params = kms_envelope_pb2.KmsEnvelopeAeadKeyFormat.FromString(
        key_template.value
    )
    dek_type_url = params.dek_template.type_url
    if not _kms_envelope_aead.is_supported_dek_key_type(dek_type_url):
      # Name the DEK type that was rejected; key_template.type_url is always
      # the envelope key type at this point and would be misleading.
      raise core.TinkError('Unsupported DEK key type: %s' % dek_type_url)
    env_key = kms_envelope_pb2.KmsEnvelopeAeadKey(
        version=0,
        params=params,
    )
    return tink_pb2.KeyData(
        type_url=_KMS_ENVELOPE_AEAD_KEY_TYPE_URL,
        value=env_key.SerializeToString(),
        key_material_type=tink_pb2.KeyData.REMOTE,
    )

  def does_support(self, type_url: str) -> bool:
    """Returns True if type_url equals this manager's key type."""
    return self.key_type() == type_url