1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) 2007, 2008, 2009 Arnaud Ebalard 5# 2015, 2016, 2017 Maxence Tury 6 7""" 8Authenticated Encryption with Associated Data ciphers. 9 10RFC 5288 introduces new ciphersuites for TLS 1.2 which are based on AES in 11Galois/Counter Mode (GCM). RFC 6655 in turn introduces AES_CCM ciphersuites. 12The related AEAD algorithms are defined in RFC 5116. Later on, RFC 7905 13introduced cipher suites based on a ChaCha20-Poly1305 construction. 14""" 15 16import struct 17 18from scapy.config import conf 19from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp, pkcs_os2ip 20from scapy.layers.tls.crypto.common import CipherError 21from scapy.utils import strxor 22 23if conf.crypto_valid: 24 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes # noqa: E501 25 from cryptography.hazmat.backends import default_backend 26 from cryptography.exceptions import InvalidTag 27if conf.crypto_valid_advanced: 28 from cryptography.hazmat.primitives.ciphers.aead import (AESCCM, 29 ChaCha20Poly1305) 30else: 31 class AESCCM: 32 pass 33 34_tls_aead_cipher_algs = {} 35 36 37class _AEADCipherMetaclass(type): 38 """ 39 Cipher classes are automatically registered through this metaclass. 40 Furthermore, their name attribute is extracted from their class name. 41 """ 42 def __new__(cls, ciph_name, bases, dct): 43 if not ciph_name.startswith("_AEADCipher"): 44 dct["name"] = ciph_name[7:] # remove leading "Cipher_" 45 the_class = super(_AEADCipherMetaclass, cls).__new__(cls, ciph_name, 46 bases, dct) 47 if not ciph_name.startswith("_AEADCipher"): 48 _tls_aead_cipher_algs[ciph_name[7:]] = the_class 49 return the_class 50 51 52class AEADTagError(Exception): 53 """ 54 Raised when MAC verification fails. 55 """ 56 pass 57 58 59class _AEADCipher(metaclass=_AEADCipherMetaclass): 60 """ 61 The hasattr(self, "pc_cls") tests correspond to the legacy API of the 62 crypto library. With cryptography v2.0, both CCM and GCM should follow 63 the else case. 64 65 Note that the "fixed_iv" in TLS RFCs is called "salt" in the AEAD RFC 5116. 66 """ 67 type = "aead" 68 fixed_iv_len = 4 69 nonce_explicit_len = 8 70 71 def __init__(self, key=None, fixed_iv=None, nonce_explicit=None): 72 """ 73 'key' and 'fixed_iv' are to be provided as strings, whereas the internal # noqa: E501 74 'nonce_explicit' is an integer (it is simpler for incrementation). 75 !! The whole 'nonce' may be called IV in certain RFCs. 76 """ 77 self.ready = {"key": True, "fixed_iv": True, "nonce_explicit": True} 78 if key is None: 79 self.ready["key"] = False 80 key = b"\0" * self.key_len 81 if fixed_iv is None: 82 self.ready["fixed_iv"] = False 83 fixed_iv = b"\0" * self.fixed_iv_len 84 if nonce_explicit is None: 85 self.ready["nonce_explicit"] = False 86 nonce_explicit = 0 87 88 if isinstance(nonce_explicit, str): 89 nonce_explicit = pkcs_os2ip(nonce_explicit) 90 91 # we use super() in order to avoid any deadlock with __setattr__ 92 super(_AEADCipher, self).__setattr__("key", key) 93 super(_AEADCipher, self).__setattr__("fixed_iv", fixed_iv) 94 super(_AEADCipher, self).__setattr__("nonce_explicit", nonce_explicit) 95 96 if hasattr(self, "pc_cls"): 97 if isinstance(self.pc_cls, AESCCM): 98 self._cipher = Cipher(self.pc_cls(key), 99 self.pc_cls_mode(self._get_nonce()), 100 backend=default_backend(), 101 tag_length=self.tag_len) 102 else: 103 self._cipher = Cipher(self.pc_cls(key), 104 self.pc_cls_mode(self._get_nonce()), 105 backend=default_backend()) 106 else: 107 self._cipher = self.cipher_cls(key) 108 109 def __setattr__(self, name, val): 110 if name == "key": 111 if self._cipher is not None: 112 if hasattr(self, "pc_cls"): 113 self._cipher.algorithm.key = val 114 else: 115 self._cipher._key = val 116 self.ready["key"] = True 117 elif name == "fixed_iv": 118 self.ready["fixed_iv"] = True 119 elif name == "nonce_explicit": 120 if isinstance(val, str): 121 val = pkcs_os2ip(val) 122 self.ready["nonce_explicit"] = True 123 super(_AEADCipher, self).__setattr__(name, val) 124 125 def _get_nonce(self): 126 return (self.fixed_iv + 127 pkcs_i2osp(self.nonce_explicit, self.nonce_explicit_len)) 128 129 def _update_nonce_explicit(self): 130 """ 131 Increment the explicit nonce while avoiding any overflow. 132 """ 133 ne = self.nonce_explicit + 1 134 self.nonce_explicit = ne % 2**(self.nonce_explicit_len * 8) 135 136 def auth_encrypt(self, P, A, seq_num=None): 137 """ 138 Encrypt the data then prepend the explicit part of the nonce. The 139 authentication tag is directly appended with the most recent crypto 140 API. Additional data may be authenticated without encryption (as A). 141 142 The 'seq_num' should never be used here, it is only a safeguard needed 143 because one cipher (ChaCha20Poly1305) using TLS 1.2 logic in record.py 144 actually is a _AEADCipher_TLS13 (even though others are not). 145 """ 146 if False in self.ready.values(): 147 raise CipherError(P, A) 148 149 if hasattr(self, "pc_cls"): 150 self._cipher.mode._initialization_vector = self._get_nonce() 151 self._cipher.mode._tag = None 152 encryptor = self._cipher.encryptor() 153 encryptor.authenticate_additional_data(A) 154 res = encryptor.update(P) + encryptor.finalize() 155 res += encryptor.tag 156 else: 157 res = self._cipher.encrypt(self._get_nonce(), P, A) 158 159 nonce_explicit = pkcs_i2osp(self.nonce_explicit, 160 self.nonce_explicit_len) 161 self._update_nonce_explicit() 162 return nonce_explicit + res 163 164 def auth_decrypt(self, A, C, seq_num=None, add_length=True): 165 """ 166 Decrypt the data and authenticate the associated data (i.e. A). 167 If the verification fails, an AEADTagError is raised. It is the user's 168 responsibility to catch it if deemed useful. If we lack the key, we 169 raise a CipherError which contains the encrypted input. 170 171 Note that we add the TLSCiphertext length to A although we're supposed 172 to add the TLSCompressed length. Fortunately, they are the same, 173 but the specifications actually messed up here. :'( 174 175 The 'add_length' switch should always be True for TLS, but we provide 176 it anyway (mostly for test cases, hum). 177 178 The 'seq_num' should never be used here, it is only a safeguard needed 179 because one cipher (ChaCha20Poly1305) using TLS 1.2 logic in record.py 180 actually is a _AEADCipher_TLS13 (even though others are not). 181 """ 182 nonce_explicit_str, C, mac = (C[:self.nonce_explicit_len], 183 C[self.nonce_explicit_len:-self.tag_len], 184 C[-self.tag_len:]) 185 186 if False in self.ready.values(): 187 raise CipherError(nonce_explicit_str, C, mac) 188 189 self.nonce_explicit = pkcs_os2ip(nonce_explicit_str) 190 if add_length: 191 A += struct.pack("!H", len(C)) 192 193 if hasattr(self, "pc_cls"): 194 self._cipher.mode._initialization_vector = self._get_nonce() 195 self._cipher.mode._tag = mac 196 decryptor = self._cipher.decryptor() 197 decryptor.authenticate_additional_data(A) 198 P = decryptor.update(C) 199 try: 200 decryptor.finalize() 201 except InvalidTag: 202 raise AEADTagError(nonce_explicit_str, P, mac) 203 else: 204 try: 205 P = self._cipher.decrypt(self._get_nonce(), C + mac, A) 206 except InvalidTag: 207 raise AEADTagError(nonce_explicit_str, 208 "<unauthenticated data>", 209 mac) 210 return nonce_explicit_str, P, mac 211 212 def snapshot(self): 213 c = self.__class__(self.key, self.fixed_iv, self.nonce_explicit) 214 c.ready = self.ready.copy() 215 return c 216 217 218if conf.crypto_valid: 219 class Cipher_AES_128_GCM(_AEADCipher): 220 # XXX use the new AESGCM if available 221 # if conf.crypto_valid_advanced: 222 # cipher_cls = AESGCM 223 # else: 224 pc_cls = algorithms.AES 225 pc_cls_mode = modes.GCM 226 key_len = 16 227 tag_len = 16 228 229 class Cipher_AES_256_GCM(Cipher_AES_128_GCM): 230 key_len = 32 231 232 233if conf.crypto_valid_advanced: 234 class Cipher_AES_128_CCM(_AEADCipher): 235 cipher_cls = AESCCM 236 key_len = 16 237 tag_len = 16 238 239 class Cipher_AES_256_CCM(Cipher_AES_128_CCM): 240 key_len = 32 241 242 class Cipher_AES_128_CCM_8(Cipher_AES_128_CCM): 243 tag_len = 8 244 245 class Cipher_AES_256_CCM_8(Cipher_AES_128_CCM_8): 246 key_len = 32 247 248 249class _AEADCipher_TLS13(metaclass=_AEADCipherMetaclass): 250 """ 251 The hasattr(self, "pc_cls") enable support for the legacy implementation 252 of GCM in the cryptography library. They should not be used, and might 253 eventually be removed, with cryptography v2.0. XXX 254 """ 255 type = "aead" 256 257 def __init__(self, key=None, fixed_iv=None, nonce_explicit=None): 258 """ 259 'key' and 'fixed_iv' are to be provided as strings. This IV never 260 changes: it is either the client_write_IV or server_write_IV. 261 262 Note that 'nonce_explicit' is never used. It is only a safeguard for a 263 call in session.py to the TLS 1.2/ChaCha20Poly1305 case (see RFC 7905). 264 """ 265 self.ready = {"key": True, "fixed_iv": True} 266 if key is None: 267 self.ready["key"] = False 268 key = b"\0" * self.key_len 269 if fixed_iv is None: 270 self.ready["fixed_iv"] = False 271 fixed_iv = b"\0" * self.fixed_iv_len 272 273 # we use super() in order to avoid any deadlock with __setattr__ 274 super(_AEADCipher_TLS13, self).__setattr__("key", key) 275 super(_AEADCipher_TLS13, self).__setattr__("fixed_iv", fixed_iv) 276 277 if hasattr(self, "pc_cls"): 278 if isinstance(self.pc_cls, AESCCM): 279 self._cipher = Cipher(self.pc_cls(key), 280 self.pc_cls_mode(fixed_iv), 281 backend=default_backend(), 282 tag_length=self.tag_len) 283 else: 284 self._cipher = Cipher(self.pc_cls(key), 285 self.pc_cls_mode(fixed_iv), 286 backend=default_backend()) 287 else: 288 if self.cipher_cls == ChaCha20Poly1305: 289 # ChaCha20Poly1305 doesn't have a tag_length argument... 290 self._cipher = self.cipher_cls(key) 291 else: 292 self._cipher = self.cipher_cls(key, tag_length=self.tag_len) 293 294 def __setattr__(self, name, val): 295 if name == "key": 296 if self._cipher is not None: 297 if hasattr(self, "pc_cls"): 298 self._cipher.algorithm.key = val 299 else: 300 self._cipher._key = val 301 self.ready["key"] = True 302 elif name == "fixed_iv": 303 self.ready["fixed_iv"] = True 304 super(_AEADCipher_TLS13, self).__setattr__(name, val) 305 306 def _get_nonce(self, seq_num): 307 padlen = self.fixed_iv_len - len(seq_num) 308 padded_seq_num = b"\x00" * padlen + seq_num 309 return strxor(padded_seq_num, self.fixed_iv) 310 311 def auth_encrypt(self, P, A, seq_num): 312 """ 313 Encrypt the data, and append the computed authentication code. 314 The additional data for TLS 1.3 is the record header. 315 316 Note that the cipher's authentication tag must be None when encrypting. 317 """ 318 if False in self.ready.values(): 319 raise CipherError(P, A) 320 321 if hasattr(self, "pc_cls"): 322 self._cipher.mode._tag = None 323 self._cipher.mode._initialization_vector = self._get_nonce(seq_num) 324 encryptor = self._cipher.encryptor() 325 encryptor.authenticate_additional_data(A) 326 res = encryptor.update(P) + encryptor.finalize() 327 res += encryptor.tag 328 else: 329 if (conf.crypto_valid_advanced and 330 isinstance(self._cipher, AESCCM)): 331 res = self._cipher.encrypt(self._get_nonce(seq_num), P, A) 332 else: 333 res = self._cipher.encrypt(self._get_nonce(seq_num), P, A) 334 return res 335 336 def auth_decrypt(self, A, C, seq_num): 337 """ 338 Decrypt the data and verify the authentication code (in this order). 339 If the verification fails, an AEADTagError is raised. It is the user's 340 responsibility to catch it if deemed useful. If we lack the key, we 341 raise a CipherError which contains the encrypted input. 342 """ 343 C, mac = C[:-self.tag_len], C[-self.tag_len:] 344 if False in self.ready.values(): 345 raise CipherError(C, mac) 346 347 if hasattr(self, "pc_cls"): 348 self._cipher.mode._initialization_vector = self._get_nonce(seq_num) 349 self._cipher.mode._tag = mac 350 decryptor = self._cipher.decryptor() 351 decryptor.authenticate_additional_data(A) 352 P = decryptor.update(C) 353 try: 354 decryptor.finalize() 355 except InvalidTag: 356 raise AEADTagError(P, mac) 357 else: 358 try: 359 if (conf.crypto_valid_advanced and 360 isinstance(self._cipher, AESCCM)): 361 P = self._cipher.decrypt(self._get_nonce(seq_num), C + mac, A) # noqa: E501 362 else: 363 if (conf.crypto_valid_advanced and 364 isinstance(self, Cipher_CHACHA20_POLY1305)): 365 A += struct.pack("!H", len(C)) 366 P = self._cipher.decrypt(self._get_nonce(seq_num), C + mac, A) # noqa: E501 367 except InvalidTag: 368 raise AEADTagError("<unauthenticated data>", mac) 369 return P, mac 370 371 def snapshot(self): 372 c = self.__class__(self.key, self.fixed_iv) 373 c.ready = self.ready.copy() 374 return c 375 376 377if conf.crypto_valid_advanced: 378 class Cipher_CHACHA20_POLY1305_TLS13(_AEADCipher_TLS13): 379 cipher_cls = ChaCha20Poly1305 380 key_len = 32 381 tag_len = 16 382 fixed_iv_len = 12 383 nonce_explicit_len = 0 384 385 class Cipher_CHACHA20_POLY1305(Cipher_CHACHA20_POLY1305_TLS13): 386 """ 387 This TLS 1.2 cipher actually uses TLS 1.3 logic, as per RFC 7905. 388 Changes occur at the record layer (in record.py). 389 """ 390 pass 391 392 393if conf.crypto_valid: 394 class Cipher_AES_128_GCM_TLS13(_AEADCipher_TLS13): 395 # XXX use the new AESGCM if available 396 # if conf.crypto_valid_advanced: 397 # cipher_cls = AESGCM 398 # else: 399 pc_cls = algorithms.AES 400 pc_cls_mode = modes.GCM 401 key_len = 16 402 fixed_iv_len = 12 403 tag_len = 16 404 405 class Cipher_AES_256_GCM_TLS13(Cipher_AES_128_GCM_TLS13): 406 key_len = 32 407 408 409if conf.crypto_valid_advanced: 410 class Cipher_AES_128_CCM_TLS13(_AEADCipher_TLS13): 411 cipher_cls = AESCCM 412 key_len = 16 413 tag_len = 16 414 fixed_iv_len = 12 415 416 class Cipher_AES_128_CCM_8_TLS13(Cipher_AES_128_CCM_TLS13): 417 tag_len = 8 418