1## This file is part of Scapy 2## Copyright (C) 2007, 2008, 2009 Arnaud Ebalard 3## 2015, 2016, 2017 Maxence Tury 4## This program is published under a GPLv2 license 5 6""" 7Authenticated Encryption with Associated Data ciphers. 8 9RFC 5288 introduces new ciphersuites for TLS 1.2 which are based on AES in 10Galois/Counter Mode (GCM). RFC 6655 in turn introduces AES_CCM ciphersuites. 11The related AEAD algorithms are defined in RFC 5116. Later on, RFC 7905 12introduced cipher suites based on a ChaCha20-Poly1305 construction. 13""" 14 15from __future__ import absolute_import 16import struct 17 18from scapy.config import conf 19from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp, pkcs_os2ip 20from scapy.layers.tls.crypto.ciphers import CipherError 21from scapy.utils import strxor 22import scapy.modules.six as six 23 24if conf.crypto_valid: 25 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 26 from cryptography.hazmat.backends import default_backend 27 from cryptography.exceptions import InvalidTag 28if conf.crypto_valid_advanced: 29 from cryptography.hazmat.primitives.ciphers.aead import (AESCCM, 30 ChaCha20Poly1305) 31 32 33_tls_aead_cipher_algs = {} 34 35class _AEADCipherMetaclass(type): 36 """ 37 Cipher classes are automatically registered through this metaclass. 38 Furthermore, their name attribute is extracted from their class name. 39 """ 40 def __new__(cls, ciph_name, bases, dct): 41 if not ciph_name.startswith("_AEADCipher"): 42 dct["name"] = ciph_name[7:] # remove leading "Cipher_" 43 the_class = super(_AEADCipherMetaclass, cls).__new__(cls, ciph_name, 44 bases, dct) 45 if not ciph_name.startswith("_AEADCipher"): 46 _tls_aead_cipher_algs[ciph_name[7:]] = the_class 47 return the_class 48 49 50class AEADTagError(Exception): 51 """ 52 Raised when MAC verification fails. 53 """ 54 pass 55 56class _AEADCipher(six.with_metaclass(_AEADCipherMetaclass, object)): 57 """ 58 The hasattr(self, "pc_cls") tests correspond to the legacy API of the 59 crypto library. With cryptography v2.0, both CCM and GCM should follow 60 the else case. 61 62 Note that the "fixed_iv" in TLS RFCs is called "salt" in the AEAD RFC 5116. 63 """ 64 type = "aead" 65 fixed_iv_len = 4 66 nonce_explicit_len = 8 67 68 def __init__(self, key=None, fixed_iv=None, nonce_explicit=None): 69 """ 70 'key' and 'fixed_iv' are to be provided as strings, whereas the internal 71 'nonce_explicit' is an integer (it is simpler for incrementation). 72 /!\ The whole 'nonce' may be called IV in certain RFCs. 73 """ 74 self.ready = {"key": True, "fixed_iv": True, "nonce_explicit": True} 75 if key is None: 76 self.ready["key"] = False 77 key = b"\0" * self.key_len 78 if fixed_iv is None: 79 self.ready["fixed_iv"] = False 80 fixed_iv = b"\0" * self.fixed_iv_len 81 if nonce_explicit is None: 82 self.ready["nonce_explicit"] = False 83 nonce_explicit = 0 84 85 if isinstance(nonce_explicit, str): 86 nonce_explicit = pkcs_os2ip(nonce_explicit) 87 88 # we use super() in order to avoid any deadlock with __setattr__ 89 super(_AEADCipher, self).__setattr__("key", key) 90 super(_AEADCipher, self).__setattr__("fixed_iv", fixed_iv) 91 super(_AEADCipher, self).__setattr__("nonce_explicit", nonce_explicit) 92 93 if hasattr(self, "pc_cls"): 94 self._cipher = Cipher(self.pc_cls(key), 95 self.pc_cls_mode(self._get_nonce()), 96 backend=default_backend()) 97 else: 98 self._cipher = self.cipher_cls(key) 99 100 def __setattr__(self, name, val): 101 if name == "key": 102 if self._cipher is not None: 103 if hasattr(self, "pc_cls"): 104 self._cipher.algorithm.key = val 105 else: 106 self._cipher._key = val 107 self.ready["key"] = True 108 elif name == "fixed_iv": 109 self.ready["fixed_iv"] = True 110 elif name == "nonce_explicit": 111 if isinstance(val, str): 112 val = pkcs_os2ip(val) 113 self.ready["nonce_explicit"] = True 114 super(_AEADCipher, self).__setattr__(name, val) 115 116 117 def _get_nonce(self): 118 return (self.fixed_iv + 119 pkcs_i2osp(self.nonce_explicit, self.nonce_explicit_len)) 120 121 def _update_nonce_explicit(self): 122 """ 123 Increment the explicit nonce while avoiding any overflow. 124 """ 125 ne = self.nonce_explicit + 1 126 self.nonce_explicit = ne % 2**(self.nonce_explicit_len*8) 127 128 def auth_encrypt(self, P, A, seq_num=None): 129 """ 130 Encrypt the data then prepend the explicit part of the nonce. The 131 authentication tag is directly appended with the most recent crypto 132 API. Additional data may be authenticated without encryption (as A). 133 134 The 'seq_num' should never be used here, it is only a safeguard needed 135 because one cipher (ChaCha20Poly1305) using TLS 1.2 logic in record.py 136 actually is a _AEADCipher_TLS13 (even though others are not). 137 """ 138 if False in six.itervalues(self.ready): 139 raise CipherError(P, A) 140 141 if hasattr(self, "pc_cls"): 142 self._cipher.mode._initialization_vector = self._get_nonce() 143 self._cipher.mode._tag = None 144 encryptor = self._cipher.encryptor() 145 encryptor.authenticate_additional_data(A) 146 res = encryptor.update(P) + encryptor.finalize() 147 res += encryptor.tag 148 else: 149 if isinstance(self._cipher, AESCCM): 150 res = self._cipher.encrypt(self._get_nonce(), P, A, 151 tag_length=self.tag_len) 152 else: 153 res = self._cipher.encrypt(self._get_nonce(), P, A) 154 155 nonce_explicit = pkcs_i2osp(self.nonce_explicit, 156 self.nonce_explicit_len) 157 self._update_nonce_explicit() 158 return nonce_explicit + res 159 160 def auth_decrypt(self, A, C, seq_num=None, add_length=True): 161 """ 162 Decrypt the data and authenticate the associated data (i.e. A). 163 If the verification fails, an AEADTagError is raised. It is the user's 164 responsibility to catch it if deemed useful. If we lack the key, we 165 raise a CipherError which contains the encrypted input. 166 167 Note that we add the TLSCiphertext length to A although we're supposed 168 to add the TLSCompressed length. Fortunately, they are the same, 169 but the specifications actually messed up here. :'( 170 171 The 'add_length' switch should always be True for TLS, but we provide 172 it anyway (mostly for test cases, hum). 173 174 The 'seq_num' should never be used here, it is only a safeguard needed 175 because one cipher (ChaCha20Poly1305) using TLS 1.2 logic in record.py 176 actually is a _AEADCipher_TLS13 (even though others are not). 177 """ 178 nonce_explicit_str, C, mac = (C[:self.nonce_explicit_len], 179 C[self.nonce_explicit_len:-self.tag_len], 180 C[-self.tag_len:]) 181 182 if False in six.itervalues(self.ready): 183 raise CipherError(nonce_explicit_str, C, mac) 184 185 self.nonce_explicit = pkcs_os2ip(nonce_explicit_str) 186 if add_length: 187 A += struct.pack("!H", len(C)) 188 189 if hasattr(self, "pc_cls"): 190 self._cipher.mode._initialization_vector = self._get_nonce() 191 self._cipher.mode._tag = mac 192 decryptor = self._cipher.decryptor() 193 decryptor.authenticate_additional_data(A) 194 P = decryptor.update(C) 195 try: 196 decryptor.finalize() 197 except InvalidTag: 198 raise AEADTagError(nonce_explicit_str, P, mac) 199 else: 200 try: 201 if isinstance(self._cipher, AESCCM): 202 P = self._cipher.decrypt(self._get_nonce(), C + mac, A, 203 tag_length=self.tag_len) 204 else: 205 P = self._cipher.decrypt(self._get_nonce(), C + mac, A) 206 except InvalidTag: 207 raise AEADTagError(nonce_explicit_str, 208 "<unauthenticated data>", 209 mac) 210 return nonce_explicit_str, P, mac 211 212 def snapshot(self): 213 c = self.__class__(self.key, self.fixed_iv, self.nonce_explicit) 214 c.ready = self.ready.copy() 215 return c 216 217 218if conf.crypto_valid: 219 class Cipher_AES_128_GCM(_AEADCipher): 220 #XXX use the new AESGCM if available 221 #if conf.crypto_valid_advanced: 222 # cipher_cls = AESGCM 223 #else: 224 pc_cls = algorithms.AES 225 pc_cls_mode = modes.GCM 226 key_len = 16 227 tag_len = 16 228 229 class Cipher_AES_256_GCM(Cipher_AES_128_GCM): 230 key_len = 32 231 232 233if conf.crypto_valid_advanced: 234 class Cipher_AES_128_CCM(_AEADCipher): 235 cipher_cls = AESCCM 236 key_len = 16 237 tag_len = 16 238 239 class Cipher_AES_256_CCM(Cipher_AES_128_CCM): 240 key_len = 32 241 242 class Cipher_AES_128_CCM_8(Cipher_AES_128_CCM): 243 tag_len = 8 244 245 class Cipher_AES_256_CCM_8(Cipher_AES_128_CCM_8): 246 key_len = 32 247 248 249class _AEADCipher_TLS13(six.with_metaclass(_AEADCipherMetaclass, object)): 250 """ 251 The hasattr(self, "pc_cls") enable support for the legacy implementation 252 of GCM in the cryptography library. They should not be used, and might 253 eventually be removed, with cryptography v2.0. XXX 254 """ 255 type = "aead" 256 257 def __init__(self, key=None, fixed_iv=None, nonce_explicit=None): 258 """ 259 'key' and 'fixed_iv' are to be provided as strings. This IV never 260 changes: it is either the client_write_IV or server_write_IV. 261 262 Note that 'nonce_explicit' is never used. It is only a safeguard for a 263 call in session.py to the TLS 1.2/ChaCha20Poly1305 case (see RFC 7905). 264 """ 265 self.ready = {"key": True, "fixed_iv": True} 266 if key is None: 267 self.ready["key"] = False 268 key = b"\0" * self.key_len 269 if fixed_iv is None: 270 self.ready["fixed_iv"] = False 271 fixed_iv = b"\0" * self.fixed_iv_len 272 273 # we use super() in order to avoid any deadlock with __setattr__ 274 super(_AEADCipher_TLS13, self).__setattr__("key", key) 275 super(_AEADCipher_TLS13, self).__setattr__("fixed_iv", fixed_iv) 276 277 if hasattr(self, "pc_cls"): 278 self._cipher = Cipher(self.pc_cls(key), 279 self.pc_cls_mode(fixed_iv), 280 backend=default_backend()) 281 else: 282 self._cipher = self.cipher_cls(key) 283 284 def __setattr__(self, name, val): 285 if name == "key": 286 if self._cipher is not None: 287 if hasattr(self, "pc_cls"): 288 self._cipher.algorithm.key = val 289 else: 290 self._cipher._key = val 291 self.ready["key"] = True 292 elif name == "fixed_iv": 293 self.ready["fixed_iv"] = True 294 super(_AEADCipher_TLS13, self).__setattr__(name, val) 295 296 def _get_nonce(self, seq_num): 297 padlen = self.fixed_iv_len - len(seq_num) 298 padded_seq_num = b"\x00" * padlen + seq_num 299 return strxor(padded_seq_num, self.fixed_iv) 300 301 def auth_encrypt(self, P, A, seq_num): 302 """ 303 Encrypt the data, and append the computed authentication code. 304 TLS 1.3 does not use additional data, but we leave this option to the 305 user nonetheless. 306 307 Note that the cipher's authentication tag must be None when encrypting. 308 """ 309 if False in six.itervalues(self.ready): 310 raise CipherError(P, A) 311 312 if hasattr(self, "pc_cls"): 313 self._cipher.mode._tag = None 314 self._cipher.mode._initialization_vector = self._get_nonce(seq_num) 315 encryptor = self._cipher.encryptor() 316 encryptor.authenticate_additional_data(A) 317 res = encryptor.update(P) + encryptor.finalize() 318 res += encryptor.tag 319 else: 320 if (conf.crypto_valid_advanced and 321 isinstance(self._cipher, AESCCM)): 322 res = self._cipher.encrypt(self._get_nonce(seq_num), P, A, 323 tag_length=self.tag_len) 324 else: 325 res = self._cipher.encrypt(self._get_nonce(seq_num), P, A) 326 return res 327 328 def auth_decrypt(self, A, C, seq_num): 329 """ 330 Decrypt the data and verify the authentication code (in this order). 331 Note that TLS 1.3 is not supposed to use any additional data A. 332 If the verification fails, an AEADTagError is raised. It is the user's 333 responsibility to catch it if deemed useful. If we lack the key, we 334 raise a CipherError which contains the encrypted input. 335 """ 336 C, mac = C[:-self.tag_len], C[-self.tag_len:] 337 if False in six.itervalues(self.ready): 338 raise CipherError(C, mac) 339 340 if hasattr(self, "pc_cls"): 341 self._cipher.mode._initialization_vector = self._get_nonce(seq_num) 342 self._cipher.mode._tag = mac 343 decryptor = self._cipher.decryptor() 344 decryptor.authenticate_additional_data(A) 345 P = decryptor.update(C) 346 try: 347 decryptor.finalize() 348 except InvalidTag: 349 raise AEADTagError(P, mac) 350 else: 351 try: 352 if (conf.crypto_valid_advanced and 353 isinstance(self._cipher, AESCCM)): 354 P = self._cipher.decrypt(self._get_nonce(seq_num), C + mac, A, 355 tag_length=self.tag_len) 356 else: 357 if (conf.crypto_valid_advanced and 358 isinstance(self, Cipher_CHACHA20_POLY1305)): 359 A += struct.pack("!H", len(C)) 360 P = self._cipher.decrypt(self._get_nonce(seq_num), C + mac, A) 361 except InvalidTag: 362 raise AEADTagError("<unauthenticated data>", mac) 363 return P, mac 364 365 def snapshot(self): 366 c = self.__class__(self.key, self.fixed_iv) 367 c.ready = self.ready.copy() 368 return c 369 370 371if conf.crypto_valid_advanced: 372 class Cipher_CHACHA20_POLY1305_TLS13(_AEADCipher_TLS13): 373 cipher_cls = ChaCha20Poly1305 374 key_len = 32 375 tag_len = 16 376 fixed_iv_len = 12 377 nonce_explicit_len = 0 378 379 class Cipher_CHACHA20_POLY1305(Cipher_CHACHA20_POLY1305_TLS13): 380 """ 381 This TLS 1.2 cipher actually uses TLS 1.3 logic, as per RFC 7905. 382 Changes occur at the record layer (in record.py). 383 """ 384 pass 385 386 387if conf.crypto_valid: 388 class Cipher_AES_128_GCM_TLS13(_AEADCipher_TLS13): 389 #XXX use the new AESGCM if available 390 #if conf.crypto_valid_advanced: 391 # cipher_cls = AESGCM 392 #else: 393 pc_cls = algorithms.AES 394 pc_cls_mode = modes.GCM 395 key_len = 16 396 fixed_iv_len = 12 397 tag_len = 16 398 399 class Cipher_AES_256_GCM_TLS13(Cipher_AES_128_GCM_TLS13): 400 key_len = 32 401 402 403if conf.crypto_valid_advanced: 404 class Cipher_AES_128_CCM_TLS13(_AEADCipher_TLS13): 405 cipher_cls = AESCCM 406 key_len = 16 407 tag_len = 16 408 409 class Cipher_AES_128_CCM_8_TLS13(Cipher_AES_128_CCM_TLS13): 410 tag_len = 8 411 412