1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) 2014 6WIND 5 6r""" 7IPsec layer 8=========== 9 10Example of use: 11 12>>> sa = SecurityAssociation(ESP, spi=0xdeadbeef, crypt_algo='AES-CBC', 13... crypt_key=b'sixteenbytes key') 14>>> p = IP(src='1.1.1.1', dst='2.2.2.2') 15>>> p /= TCP(sport=45012, dport=80) 16>>> p /= Raw('testdata') 17>>> p = IP(raw(p)) 18>>> p 19<IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 options=[] |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>> # noqa: E501 20>>> 21>>> e = sa.encrypt(p) 22>>> e 23<IP version=4L ihl=5L tos=0x0 len=76 id=1 flags= frag=0L ttl=64 proto=esp chksum=0x747a src=1.1.1.1 dst=2.2.2.2 |<ESP spi=0xdeadbeef seq=1 data=b'\xf8\xdb\x1e\x83[T\xab\\\xd2\x1b\xed\xd1\xe5\xc8Y\xc2\xa5d\x92\xc1\x05\x17\xa6\x92\x831\xe6\xc1]\x9a\xd6K}W\x8bFfd\xa5B*+\xde\xc8\x89\xbf{\xa9' |>> # noqa: E501 24>>> 25>>> d = sa.decrypt(e) 26>>> d 27<IP version=4L ihl=5L tos=0x0 len=48 id=1 flags= frag=0L ttl=64 proto=tcp chksum=0x74c2 src=1.1.1.1 dst=2.2.2.2 |<TCP sport=45012 dport=http seq=0 ack=0 dataofs=5L reserved=0L flags=S window=8192 chksum=0x1914 urgptr=0 options=[] |<Raw load='testdata' |>>> # noqa: E501 28>>> 29>>> d == p 30True 31""" 32 33try: 34 from math import gcd 35except ImportError: 36 from fractions import gcd 37import os 38import socket 39import struct 40import warnings 41 42from scapy.config import conf, crypto_validator 43from scapy.compat import orb, raw 44from scapy.data import IP_PROTOS 45from scapy.error import log_loading 46from scapy.fields import ( 47 ByteEnumField, 48 ByteField, 49 IntField, 50 PacketField, 51 ShortField, 52 StrField, 53 XByteField, 54 XIntField, 55 XStrField, 56 XStrLenField, 57) 58from scapy.packet import ( 59 Packet, 60 Raw, 61 bind_bottom_up, 62 bind_layers, 63 bind_top_down, 64) 65from scapy.layers.inet import IP, UDP 66from scapy.layers.inet6 import IPv6, IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \ 67 IPv6ExtHdrRouting 68 69 70############################################################################### 71class AH(Packet): 72 """ 73 Authentication Header 74 75 See https://tools.ietf.org/rfc/rfc4302.txt 76 """ 77 78 name = 'AH' 79 80 def __get_icv_len(self): 81 """ 82 Compute the size of the ICV based on the payloadlen field. 83 Padding size is included as it can only be known from the authentication # noqa: E501 84 algorithm provided by the Security Association. 85 """ 86 # payloadlen = length of AH in 32-bit words (4-byte units), minus "2" 87 # payloadlen = 3 32-bit word fixed fields + ICV + padding - 2 88 # ICV = (payloadlen + 2 - 3 - padding) in 32-bit words 89 return (self.payloadlen - 1) * 4 90 91 fields_desc = [ 92 ByteEnumField('nh', None, IP_PROTOS), 93 ByteField('payloadlen', None), 94 ShortField('reserved', None), 95 XIntField('spi', 0x00000001), 96 IntField('seq', 0), 97 XStrLenField('icv', None, length_from=__get_icv_len), 98 # Padding len can only be known with the SecurityAssociation.auth_algo 99 XStrLenField('padding', None, length_from=lambda x: 0), 100 ] 101 102 overload_fields = { 103 IP: {'proto': socket.IPPROTO_AH}, 104 IPv6: {'nh': socket.IPPROTO_AH}, 105 IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_AH}, 106 IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_AH}, 107 IPv6ExtHdrRouting: {'nh': socket.IPPROTO_AH}, 108 } 109 110 111bind_layers(IP, AH, proto=socket.IPPROTO_AH) 112bind_layers(IPv6, AH, nh=socket.IPPROTO_AH) 113bind_layers(AH, IP, nh=socket.IPPROTO_IP) 114bind_layers(AH, IPv6, nh=socket.IPPROTO_IPV6) 115 116############################################################################### 117 118 119class ESP(Packet): 120 """ 121 Encapsulated Security Payload 122 123 See https://tools.ietf.org/rfc/rfc4303.txt 124 """ 125 name = 'ESP' 126 127 fields_desc = [ 128 XIntField('spi', 0x00000001), 129 IntField('seq', 0), 130 XStrField('data', None), 131 ] 132 133 @classmethod 134 def dispatch_hook(cls, _pkt=None, *args, **kargs): 135 if _pkt: 136 if len(_pkt) >= 4 and struct.unpack("!I", _pkt[0:4])[0] == 0x00: 137 return NON_ESP 138 elif len(_pkt) == 1 and struct.unpack("!B", _pkt)[0] == 0xff: 139 return NAT_KEEPALIVE 140 else: 141 return ESP 142 return cls 143 144 overload_fields = { 145 IP: {'proto': socket.IPPROTO_ESP}, 146 IPv6: {'nh': socket.IPPROTO_ESP}, 147 IPv6ExtHdrHopByHop: {'nh': socket.IPPROTO_ESP}, 148 IPv6ExtHdrDestOpt: {'nh': socket.IPPROTO_ESP}, 149 IPv6ExtHdrRouting: {'nh': socket.IPPROTO_ESP}, 150 } 151 152 153class NON_ESP(Packet): # RFC 3948, section 2.2 154 fields_desc = [ 155 XIntField("non_esp", 0x0) 156 ] 157 158 159class NAT_KEEPALIVE(Packet): # RFC 3948, section 2.2 160 fields_desc = [ 161 XByteField("nat_keepalive", 0xFF) 162 ] 163 164 165bind_layers(IP, ESP, proto=socket.IPPROTO_ESP) 166bind_layers(IPv6, ESP, nh=socket.IPPROTO_ESP) 167 168# NAT-Traversal encapsulation 169bind_bottom_up(UDP, ESP, dport=4500) 170bind_bottom_up(UDP, ESP, sport=4500) 171bind_top_down(UDP, ESP, dport=4500, sport=4500) 172bind_top_down(UDP, NON_ESP, dport=4500, sport=4500) 173bind_top_down(UDP, NAT_KEEPALIVE, dport=4500, sport=4500) 174 175############################################################################### 176 177 178class _ESPPlain(Packet): 179 """ 180 Internal class to represent unencrypted ESP packets. 181 """ 182 name = 'ESP' 183 184 fields_desc = [ 185 XIntField('spi', 0x0), 186 IntField('seq', 0), 187 188 StrField('iv', ''), 189 PacketField('data', '', Raw), 190 StrField('padding', ''), 191 192 ByteField('padlen', 0), 193 ByteEnumField('nh', 0, IP_PROTOS), 194 StrField('icv', ''), 195 ] 196 197 def data_for_encryption(self): 198 return raw(self.data) + self.padding + struct.pack("BB", self.padlen, self.nh) # noqa: E501 199 200 201############################################################################### 202if conf.crypto_valid: 203 from cryptography.exceptions import InvalidTag 204 from cryptography.hazmat.backends import default_backend 205 from cryptography.hazmat.primitives.ciphers import ( 206 aead, 207 Cipher, 208 algorithms, 209 modes, 210 ) 211 try: 212 # cryptography > 43.0 213 from cryptography.hazmat.decrepit.ciphers import ( 214 algorithms as decrepit_algorithms 215 ) 216 except ImportError: 217 decrepit_algorithms = algorithms 218else: 219 log_loading.info("Can't import python-cryptography v1.7+. " 220 "Disabled IPsec encryption/authentication.") 221 default_backend = None 222 InvalidTag = Exception 223 Cipher = algorithms = modes = None 224 225############################################################################### 226 227 228def _lcm(a, b): 229 """ 230 Least Common Multiple between 2 integers. 231 """ 232 if a == 0 or b == 0: 233 return 0 234 else: 235 return abs(a * b) // gcd(a, b) 236 237 238class CryptAlgo(object): 239 """ 240 IPsec encryption algorithm 241 """ 242 243 def __init__(self, name, cipher, mode, block_size=None, iv_size=None, 244 key_size=None, icv_size=None, salt_size=None, format_mode_iv=None): # noqa: E501 245 """ 246 :param name: the name of this encryption algorithm 247 :param cipher: a Cipher module 248 :param mode: the mode used with the cipher module 249 :param block_size: the length a block for this algo. Defaults to the 250 `block_size` of the cipher. 251 :param iv_size: the length of the initialization vector of this algo. 252 Defaults to the `block_size` of the cipher. 253 :param key_size: an integer or list/tuple of integers. If specified, 254 force the secret keys length to one of the values. 255 Defaults to the `key_size` of the cipher. 256 :param icv_size: the length of the Integrity Check Value of this algo. 257 Used by Combined Mode Algorithms e.g. GCM 258 :param salt_size: the length of the salt to use as the IV prefix. 259 Usually used by Counter modes e.g. CTR 260 :param format_mode_iv: function to format the Initialization Vector 261 e.g. handle the salt value 262 Default is the random buffer from `generate_iv` 263 """ 264 self.name = name 265 self.cipher = cipher 266 self.mode = mode 267 self.icv_size = icv_size 268 269 self.is_aead = False 270 # If using cryptography.hazmat.primitives.cipher.aead 271 self.ciphers_aead_api = False 272 273 if modes: 274 if self.mode is not None: 275 self.is_aead = issubclass(self.mode, 276 modes.ModeWithAuthenticationTag) 277 elif self.cipher in (aead.AESGCM, aead.AESCCM, 278 aead.ChaCha20Poly1305): 279 self.is_aead = True 280 self.ciphers_aead_api = True 281 282 if block_size is not None: 283 self.block_size = block_size 284 elif cipher is not None: 285 self.block_size = cipher.block_size // 8 286 else: 287 self.block_size = 1 288 289 if iv_size is None: 290 self.iv_size = self.block_size 291 else: 292 self.iv_size = iv_size 293 294 if key_size is not None: 295 self.key_size = key_size 296 elif cipher is not None: 297 self.key_size = tuple(i // 8 for i in cipher.key_sizes) 298 else: 299 self.key_size = None 300 301 if salt_size is None: 302 self.salt_size = 0 303 else: 304 self.salt_size = salt_size 305 306 if format_mode_iv is None: 307 self._format_mode_iv = lambda iv, **kw: iv 308 else: 309 self._format_mode_iv = format_mode_iv 310 311 def check_key(self, key): 312 """ 313 Check that the key length is valid. 314 315 :param key: a byte string 316 """ 317 if self.key_size and not (len(key) == self.key_size or len(key) in self.key_size): # noqa: E501 318 raise TypeError('invalid key size %s, must be %s' % 319 (len(key), self.key_size)) 320 321 def generate_iv(self): 322 """ 323 Generate a random initialization vector. 324 """ 325 # XXX: Handle counter modes with real counters? RFCs allow the use of 326 # XXX: random bytes for counters, so it is not wrong to do it that way 327 return os.urandom(self.iv_size) 328 329 @crypto_validator 330 def new_cipher(self, key, mode_iv, digest=None): 331 """ 332 :param key: the secret key, a byte string 333 :param mode_iv: the initialization vector or nonce, a byte string. 334 Formatted by `format_mode_iv`. 335 :param digest: also known as tag or icv. A byte string containing the 336 digest of the encrypted data. Only use this during 337 decryption! 338 339 :returns: an initialized cipher object for this algo 340 """ 341 if self.is_aead and digest is not None: 342 # With AEAD, the mode needs the digest during decryption. 343 return Cipher( 344 self.cipher(key), 345 self.mode(mode_iv, digest, len(digest)), 346 default_backend(), 347 ) 348 else: 349 return Cipher( 350 self.cipher(key), 351 self.mode(mode_iv), 352 default_backend(), 353 ) 354 355 def pad(self, esp): 356 """ 357 Add the correct amount of padding so that the data to encrypt is 358 exactly a multiple of the algorithm's block size. 359 360 Also, make sure that the total ESP packet length is a multiple of 4 361 bytes. 362 363 :param esp: an unencrypted _ESPPlain packet 364 365 :returns: an unencrypted _ESPPlain packet with valid padding 366 """ 367 # 2 extra bytes for padlen and nh 368 data_len = len(esp.data) + 2 369 370 # according to the RFC4303, section 2.4. Padding (for Encryption) 371 # the size of the ESP payload must be a multiple of 32 bits 372 align = _lcm(self.block_size, 4) 373 374 # pad for block size 375 esp.padlen = -data_len % align 376 377 # Still according to the RFC, the default value for padding *MUST* be an # noqa: E501 378 # array of bytes starting from 1 to padlen 379 # TODO: Handle padding function according to the encryption algo 380 esp.padding = struct.pack("B" * esp.padlen, *range(1, esp.padlen + 1)) 381 382 # If the following test fails, it means that this algo does not comply 383 # with the RFC 384 payload_len = len(esp.iv) + len(esp.data) + len(esp.padding) + 2 385 if payload_len % 4 != 0: 386 raise ValueError('The size of the ESP data is not aligned to 32 bits after padding.') # noqa: E501 387 388 return esp 389 390 def encrypt(self, sa, esp, key, icv_size=None, esn_en=False, esn=0): 391 """ 392 Encrypt an ESP packet 393 394 :param sa: the SecurityAssociation associated with the ESP packet. 395 :param esp: an unencrypted _ESPPlain packet with valid padding 396 :param key: the secret key used for encryption 397 :param icv_size: the length of the icv used for integrity check 398 :esn_en: extended sequence number enable which allows to use 64-bit 399 sequence number instead of 32-bit when using an AEAD 400 algorithm 401 :esn: extended sequence number (32 MSB) 402 :return: a valid ESP packet encrypted with this algorithm 403 """ 404 if icv_size is None: 405 icv_size = self.icv_size if self.is_aead else 0 406 data = esp.data_for_encryption() 407 408 if self.cipher: 409 mode_iv = self._format_mode_iv(algo=self, sa=sa, iv=esp.iv) 410 aad = None 411 if self.is_aead: 412 if esn_en: 413 aad = struct.pack('!LLL', esp.spi, esn, esp.seq) 414 else: 415 aad = struct.pack('!LL', esp.spi, esp.seq) 416 if self.ciphers_aead_api: 417 # New API 418 if self.cipher == aead.AESCCM: 419 cipher = self.cipher(key, tag_length=icv_size) 420 else: 421 cipher = self.cipher(key) 422 if self.name == 'AES-NULL-GMAC': 423 # Special case for GMAC (rfc 4543 sect 3) 424 data = data + cipher.encrypt(mode_iv, b"", aad + esp.iv + data) 425 else: 426 data = cipher.encrypt(mode_iv, data, aad) 427 else: 428 cipher = self.new_cipher(key, mode_iv) 429 encryptor = cipher.encryptor() 430 431 if self.is_aead: 432 encryptor.authenticate_additional_data(aad) 433 data = encryptor.update(data) + encryptor.finalize() 434 data += encryptor.tag[:icv_size] 435 else: 436 data = encryptor.update(data) + encryptor.finalize() 437 438 return ESP(spi=esp.spi, seq=esp.seq, data=esp.iv + data) 439 440 def decrypt(self, sa, esp, key, icv_size=None, esn_en=False, esn=0): 441 """ 442 Decrypt an ESP packet 443 444 :param sa: the SecurityAssociation associated with the ESP packet. 445 :param esp: an encrypted ESP packet 446 :param key: the secret key used for encryption 447 :param icv_size: the length of the icv used for integrity check 448 :param esn_en: extended sequence number enable which allows to use 449 64-bit sequence number instead of 32-bit when using an 450 AEAD algorithm 451 :param esn: extended sequence number (32 MSB) 452 :returns: a valid ESP packet encrypted with this algorithm 453 :raise scapy.layers.ipsec.IPSecIntegrityError: if the integrity check 454 fails with an AEAD algorithm 455 """ 456 if icv_size is None: 457 icv_size = self.icv_size if self.is_aead else 0 458 459 iv = esp.data[:self.iv_size] 460 data = esp.data[self.iv_size:len(esp.data) - icv_size] 461 icv = esp.data[len(esp.data) - icv_size:] 462 463 if self.cipher: 464 mode_iv = self._format_mode_iv(sa=sa, iv=iv) 465 aad = None 466 if self.is_aead: 467 if esn_en: 468 aad = struct.pack('!LLL', esp.spi, esn, esp.seq) 469 else: 470 aad = struct.pack('!LL', esp.spi, esp.seq) 471 if self.ciphers_aead_api: 472 # New API 473 if self.cipher == aead.AESCCM: 474 cipher = self.cipher(key, tag_length=icv_size) 475 else: 476 cipher = self.cipher(key) 477 try: 478 if self.name == 'AES-NULL-GMAC': 479 # Special case for GMAC (rfc 4543 sect 3) 480 data = data + cipher.decrypt(mode_iv, icv, aad + iv + data) 481 else: 482 data = cipher.decrypt(mode_iv, data + icv, aad) 483 except InvalidTag as err: 484 raise IPSecIntegrityError(err) 485 else: 486 cipher = self.new_cipher(key, mode_iv, icv) 487 decryptor = cipher.decryptor() 488 489 if self.is_aead: 490 # Tag value check is done during the finalize method 491 decryptor.authenticate_additional_data(aad) 492 try: 493 data = decryptor.update(data) + decryptor.finalize() 494 except InvalidTag as err: 495 raise IPSecIntegrityError(err) 496 497 # extract padlen and nh 498 padlen = orb(data[-2]) 499 nh = orb(data[-1]) 500 501 # then use padlen to determine data and padding 502 padding = data[len(data) - padlen - 2: len(data) - 2] 503 data = data[:len(data) - padlen - 2] 504 505 return _ESPPlain(spi=esp.spi, 506 seq=esp.seq, 507 iv=iv, 508 data=data, 509 padding=padding, 510 padlen=padlen, 511 nh=nh, 512 icv=icv) 513 514############################################################################### 515# The names of the encryption algorithms are the same than in scapy.contrib.ikev2 # noqa: E501 516# see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml 517 518 519CRYPT_ALGOS = { 520 'NULL': CryptAlgo('NULL', cipher=None, mode=None, iv_size=0), 521} 522 523if algorithms: 524 CRYPT_ALGOS['AES-CBC'] = CryptAlgo('AES-CBC', 525 cipher=algorithms.AES, 526 mode=modes.CBC) 527 _aes_ctr_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv + b'\x00\x00\x00\x01' # noqa: E501 528 CRYPT_ALGOS['AES-CTR'] = CryptAlgo('AES-CTR', 529 cipher=algorithms.AES, 530 mode=modes.CTR, 531 block_size=1, 532 iv_size=8, 533 salt_size=4, 534 format_mode_iv=_aes_ctr_format_mode_iv) 535 _salt_format_mode_iv = lambda sa, iv, **kw: sa.crypt_salt + iv 536 CRYPT_ALGOS['AES-GCM'] = CryptAlgo('AES-GCM', 537 cipher=aead.AESGCM, 538 key_size=(16, 24, 32), 539 mode=None, 540 salt_size=4, 541 block_size=1, 542 iv_size=8, 543 icv_size=16, 544 format_mode_iv=_salt_format_mode_iv) 545 # GMAC: rfc 4543, "companion to the AES Galois/Counter Mode ESP" 546 # This is defined as a crypt_algo by rfc, but has the role of an auth_algo 547 CRYPT_ALGOS['AES-NULL-GMAC'] = CryptAlgo('AES-NULL-GMAC', 548 cipher=aead.AESGCM, 549 key_size=(16, 24, 32), 550 mode=None, 551 salt_size=4, 552 block_size=1, 553 iv_size=8, 554 icv_size=16, 555 format_mode_iv=_salt_format_mode_iv) 556 CRYPT_ALGOS['AES-CCM'] = CryptAlgo('AES-CCM', 557 cipher=aead.AESCCM, 558 mode=None, 559 key_size=(16, 24, 32), 560 block_size=1, 561 iv_size=8, 562 salt_size=3, 563 icv_size=16, 564 format_mode_iv=_salt_format_mode_iv) 565 CRYPT_ALGOS['CHACHA20-POLY1305'] = CryptAlgo('CHACHA20-POLY1305', 566 cipher=aead.ChaCha20Poly1305, 567 mode=None, 568 key_size=32, 569 block_size=1, 570 iv_size=8, 571 salt_size=4, 572 icv_size=16, 573 format_mode_iv=_salt_format_mode_iv) # noqa: E501 574 575 # Using a TripleDES cipher algorithm for DES is done by using the same 64 576 # bits key 3 times (done by cryptography when given a 64 bits key) 577 CRYPT_ALGOS['DES'] = CryptAlgo('DES', 578 cipher=decrepit_algorithms.TripleDES, 579 mode=modes.CBC, 580 key_size=(8,)) 581 CRYPT_ALGOS['3DES'] = CryptAlgo('3DES', 582 cipher=decrepit_algorithms.TripleDES, 583 mode=modes.CBC) 584 if decrepit_algorithms is algorithms: 585 # cryptography < 43 raises a DeprecationWarning 586 from cryptography.utils import CryptographyDeprecationWarning 587 with warnings.catch_warnings(): 588 # Hide deprecation warnings 589 warnings.filterwarnings("ignore", 590 category=CryptographyDeprecationWarning) 591 CRYPT_ALGOS['CAST'] = CryptAlgo('CAST', 592 cipher=decrepit_algorithms.CAST5, 593 mode=modes.CBC) 594 CRYPT_ALGOS['Blowfish'] = CryptAlgo('Blowfish', 595 cipher=decrepit_algorithms.Blowfish, 596 mode=modes.CBC) 597 else: 598 CRYPT_ALGOS['CAST'] = CryptAlgo('CAST', 599 cipher=decrepit_algorithms.CAST5, 600 mode=modes.CBC) 601 CRYPT_ALGOS['Blowfish'] = CryptAlgo('Blowfish', 602 cipher=decrepit_algorithms.Blowfish, 603 mode=modes.CBC) 604 605 606############################################################################### 607if conf.crypto_valid: 608 from cryptography.hazmat.primitives.hmac import HMAC 609 from cryptography.hazmat.primitives.cmac import CMAC 610 from cryptography.hazmat.primitives import hashes 611else: 612 # no error if cryptography is not available but authentication won't be supported # noqa: E501 613 HMAC = CMAC = hashes = None 614 615############################################################################### 616 617 618class IPSecIntegrityError(Exception): 619 """ 620 Error risen when the integrity check fails. 621 """ 622 pass 623 624 625class AuthAlgo(object): 626 """ 627 IPsec integrity algorithm 628 """ 629 630 def __init__(self, name, mac, digestmod, icv_size, key_size=None): 631 """ 632 :param name: the name of this integrity algorithm 633 :param mac: a Message Authentication Code module 634 :param digestmod: a Hash or Cipher module 635 :param icv_size: the length of the integrity check value of this algo 636 :param key_size: an integer or list/tuple of integers. If specified, 637 force the secret keys length to one of the values. 638 Defaults to the `key_size` of the cipher. 639 """ 640 self.name = name 641 self.mac = mac 642 self.digestmod = digestmod 643 self.icv_size = icv_size 644 self.key_size = key_size 645 646 def check_key(self, key): 647 """ 648 Check that the key length is valid. 649 650 :param key: a byte string 651 """ 652 if self.key_size and len(key) not in self.key_size: 653 raise TypeError('invalid key size %s, must be one of %s' % 654 (len(key), self.key_size)) 655 656 @crypto_validator 657 def new_mac(self, key): 658 """ 659 :param key: a byte string 660 :returns: an initialized mac object for this algo 661 """ 662 if self.mac is CMAC: 663 return self.mac(self.digestmod(key), default_backend()) 664 else: 665 return self.mac(key, self.digestmod(), default_backend()) 666 667 def sign(self, pkt, key, esn_en=False, esn=0): 668 """ 669 Sign an IPsec (ESP or AH) packet with this algo. 670 671 :param pkt: a packet that contains a valid encrypted ESP or AH layer 672 :param key: the authentication key, a byte string 673 :param esn_en: extended sequence number enable which allows to use 674 64-bit sequence number instead of 32-bit 675 :param esn: extended sequence number (32 MSB) 676 677 :returns: the signed packet 678 """ 679 if not self.mac: 680 return pkt 681 682 mac = self.new_mac(key) 683 684 if pkt.haslayer(ESP): 685 mac.update(bytes(pkt[ESP])) 686 if esn_en: 687 # RFC4303 sect 2.2.1 688 mac.update(struct.pack('!L', esn)) 689 pkt[ESP].data += mac.finalize()[:self.icv_size] 690 691 elif pkt.haslayer(AH): 692 mac.update(bytes(zero_mutable_fields(pkt.copy(), sending=True))) 693 if esn_en: 694 # RFC4302 sect 2.5.1 695 mac.update(struct.pack('!L', esn)) 696 pkt[AH].icv = mac.finalize()[:self.icv_size] 697 698 return pkt 699 700 def verify(self, pkt, key, esn_en=False, esn=0): 701 """ 702 Check that the integrity check value (icv) of a packet is valid. 703 704 :param pkt: a packet that contains a valid encrypted ESP or AH layer 705 :param key: the authentication key, a byte string 706 :param esn_en: extended sequence number enable which allows to use 707 64-bit sequence number instead of 32-bit 708 :param esn: extended sequence number (32 MSB) 709 710 :raise scapy.layers.ipsec.IPSecIntegrityError: if the integrity check 711 fails 712 """ 713 if not self.mac or self.icv_size == 0: 714 return 715 716 mac = self.new_mac(key) 717 718 pkt_icv = 'not found' 719 720 if isinstance(pkt, ESP): 721 pkt_icv = pkt.data[len(pkt.data) - self.icv_size:] 722 clone = pkt.copy() 723 clone.data = clone.data[:len(clone.data) - self.icv_size] 724 mac.update(bytes(clone)) 725 if esn_en: 726 # RFC4303 sect 2.2.1 727 mac.update(struct.pack('!L', esn)) 728 729 elif pkt.haslayer(AH): 730 if len(pkt[AH].icv) != self.icv_size: 731 # Fill padding since we know the actual icv_size 732 pkt[AH].padding = pkt[AH].icv[self.icv_size:] 733 pkt[AH].icv = pkt[AH].icv[:self.icv_size] 734 pkt_icv = pkt[AH].icv 735 clone = zero_mutable_fields(pkt.copy(), sending=False) 736 mac.update(bytes(clone)) 737 if esn_en: 738 # RFC4302 sect 2.5.1 739 mac.update(struct.pack('!L', esn)) 740 741 computed_icv = mac.finalize()[:self.icv_size] 742 743 # XXX: Cannot use mac.verify because the ICV can be truncated 744 if pkt_icv != computed_icv: 745 raise IPSecIntegrityError('pkt_icv=%r, computed_icv=%r' % 746 (pkt_icv, computed_icv)) 747 748############################################################################### 749# The names of the integrity algorithms are the same than in scapy.contrib.ikev2 # noqa: E501 750# see http://www.iana.org/assignments/ikev2-parameters/ikev2-parameters.xhtml 751 752 753AUTH_ALGOS = { 754 'NULL': AuthAlgo('NULL', mac=None, digestmod=None, icv_size=0), 755} 756 757if HMAC and hashes: 758 # XXX: NIST has deprecated SHA1 but is required by RFC7321 759 AUTH_ALGOS['HMAC-SHA1-96'] = AuthAlgo('HMAC-SHA1-96', 760 mac=HMAC, 761 digestmod=hashes.SHA1, 762 icv_size=12) 763 AUTH_ALGOS['SHA2-256-128'] = AuthAlgo('SHA2-256-128', 764 mac=HMAC, 765 digestmod=hashes.SHA256, 766 icv_size=16) 767 AUTH_ALGOS['SHA2-384-192'] = AuthAlgo('SHA2-384-192', 768 mac=HMAC, 769 digestmod=hashes.SHA384, 770 icv_size=24) 771 AUTH_ALGOS['SHA2-512-256'] = AuthAlgo('SHA2-512-256', 772 mac=HMAC, 773 digestmod=hashes.SHA512, 774 icv_size=32) 775 # XXX:Flagged as deprecated by 'cryptography'. Kept for backward compat 776 AUTH_ALGOS['HMAC-MD5-96'] = AuthAlgo('HMAC-MD5-96', 777 mac=HMAC, 778 digestmod=hashes.MD5, 779 icv_size=12) 780if CMAC and algorithms: 781 AUTH_ALGOS['AES-CMAC-96'] = AuthAlgo('AES-CMAC-96', 782 mac=CMAC, 783 digestmod=algorithms.AES, 784 icv_size=12, 785 key_size=(16,)) 786 787############################################################################### 788 789 790def split_for_transport(orig_pkt, transport_proto): 791 """ 792 Split an IP(v6) packet in the correct location to insert an ESP or AH 793 header. 794 795 :param orig_pkt: the packet to split. Must be an IP or IPv6 packet 796 :param transport_proto: the IPsec protocol number that will be inserted 797 at the split position. 798 :returns: a tuple (header, nh, payload) where nh is the protocol number of 799 payload. 800 """ 801 # force resolution of default fields to avoid padding errors 802 header = orig_pkt.__class__(raw(orig_pkt)) 803 next_hdr = header.payload 804 nh = None 805 806 if header.version == 4: 807 nh = header.proto 808 header.proto = transport_proto 809 header.remove_payload() 810 del header.chksum 811 del header.len 812 813 return header, nh, next_hdr 814 else: 815 found_rt_hdr = False 816 prev = header 817 818 # Since the RFC 4302 is vague about where the ESP/AH headers should be 819 # inserted in IPv6, I chose to follow the linux implementation. 820 while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)): # noqa: E501 821 if isinstance(next_hdr, IPv6ExtHdrHopByHop): 822 pass 823 if isinstance(next_hdr, IPv6ExtHdrRouting): 824 found_rt_hdr = True 825 elif isinstance(next_hdr, IPv6ExtHdrDestOpt) and found_rt_hdr: 826 break 827 828 prev = next_hdr 829 next_hdr = next_hdr.payload 830 831 nh = prev.nh 832 prev.nh = transport_proto 833 prev.remove_payload() 834 del header.plen 835 836 return header, nh, next_hdr 837 838 839############################################################################### 840# see RFC 4302 - Appendix A. Mutability of IP Options/Extension Headers 841IMMUTABLE_IPV4_OPTIONS = ( 842 0, # End Of List 843 1, # No OPeration 844 2, # Security 845 5, # Extended Security 846 6, # Commercial Security 847 20, # Router Alert 848 21, # Sender Directed Multi-Destination Delivery 849) 850 851 852def zero_mutable_fields(pkt, sending=False): 853 """ 854 When using AH, all "mutable" fields must be "zeroed" before calculating 855 the ICV. See RFC 4302, Section 3.3.3.1. Handling Mutable Fields. 856 857 :param pkt: an IP(v6) packet containing an AH layer. 858 NOTE: The packet will be modified 859 :param sending: if true, ipv6 routing headers will not be reordered 860 """ 861 862 if pkt.haslayer(AH): 863 pkt[AH].icv = b"\x00" * len(pkt[AH].icv) 864 else: 865 raise TypeError('no AH layer found') 866 867 if pkt.version == 4: 868 # the tos field has been replaced by DSCP and ECN 869 # Routers may rewrite the DS field as needed to provide a 870 # desired local or end-to-end service 871 pkt.tos = 0 872 # an intermediate router might set the DF bit, even if the source 873 # did not select it. 874 pkt.flags = 0 875 # changed en route as a normal course of processing by routers 876 pkt.ttl = 0 877 # will change if any of these other fields change 878 pkt.chksum = 0 879 880 immutable_opts = [] 881 for opt in pkt.options: 882 if opt.option in IMMUTABLE_IPV4_OPTIONS: 883 immutable_opts.append(opt) 884 else: 885 immutable_opts.append(Raw(b"\x00" * len(opt))) 886 pkt.options = immutable_opts 887 888 else: 889 # holds DSCP and ECN 890 pkt.tc = 0 891 # The flow label described in AHv1 was mutable, and in RFC 2460 [DH98] 892 # was potentially mutable. To retain compatibility with existing AH 893 # implementations, the flow label is not included in the ICV in AHv2. 894 pkt.fl = 0 895 # same as ttl 896 pkt.hlim = 0 897 898 next_hdr = pkt.payload 899 900 while isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrRouting, IPv6ExtHdrDestOpt)): # noqa: E501 901 if isinstance(next_hdr, (IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt)): 902 for opt in next_hdr.options: 903 if opt.otype & 0x20: 904 # option data can change en-route and must be zeroed 905 opt.optdata = b"\x00" * opt.optlen 906 elif isinstance(next_hdr, IPv6ExtHdrRouting) and sending: 907 # The sender must order the field so that it appears as it 908 # will at the receiver, prior to performing the ICV computation. # noqa: E501 909 next_hdr.segleft = 0 910 if next_hdr.addresses: 911 final = next_hdr.addresses.pop() 912 next_hdr.addresses.insert(0, pkt.dst) 913 pkt.dst = final 914 else: 915 break 916 917 next_hdr = next_hdr.payload 918 919 return pkt 920 921############################################################################### 922 923 924class SecurityAssociation(object): 925 """ 926 This class is responsible of "encryption" and "decryption" of IPsec packets. # noqa: E501 927 """ 928 929 SUPPORTED_PROTOS = (IP, IPv6) 930 931 def __init__(self, proto, spi, seq_num=1, crypt_algo=None, crypt_key=None, 932 crypt_icv_size=None, 933 auth_algo=None, auth_key=None, 934 tunnel_header=None, nat_t_header=None, esn_en=False, esn=0): 935 """ 936 :param proto: the IPsec proto to use (ESP or AH) 937 :param spi: the Security Parameters Index of this SA 938 :param seq_num: the initial value for the sequence number on encrypted 939 packets 940 :param crypt_algo: the encryption algorithm name (only used with ESP) 941 :param crypt_key: the encryption key (only used with ESP) 942 :param crypt_icv_size: change the default size of the crypt_algo 943 (only used with ESP) 944 :param auth_algo: the integrity algorithm name 945 :param auth_key: the integrity key 946 :param tunnel_header: an instance of a IP(v6) header that will be used 947 to encapsulate the encrypted packets. 948 :param nat_t_header: an instance of a UDP header that will be used 949 for NAT-Traversal. 950 :param esn_en: extended sequence number enable which allows to use 951 64-bit sequence number instead of 32-bit when using an 952 AEAD algorithm 953 :param esn: extended sequence number (32 MSB) 954 """ 955 956 if proto not in {ESP, AH, ESP.name, AH.name}: 957 raise ValueError("proto must be either ESP or AH") 958 if isinstance(proto, str): 959 self.proto = {ESP.name: ESP, AH.name: AH}[proto] 960 else: 961 self.proto = proto 962 963 self.spi = spi 964 self.seq_num = seq_num 965 self.esn_en = esn_en 966 # Get Extended Sequence (32 MSB) 967 self.esn = esn 968 if crypt_algo: 969 if crypt_algo not in CRYPT_ALGOS: 970 raise TypeError('unsupported encryption algo %r, try %r' % 971 (crypt_algo, list(CRYPT_ALGOS))) 972 self.crypt_algo = CRYPT_ALGOS[crypt_algo] 973 974 if crypt_key: 975 salt_size = self.crypt_algo.salt_size 976 self.crypt_key = crypt_key[:len(crypt_key) - salt_size] 977 self.crypt_salt = crypt_key[len(crypt_key) - salt_size:] 978 else: 979 self.crypt_key = None 980 self.crypt_salt = None 981 982 else: 983 self.crypt_algo = CRYPT_ALGOS['NULL'] 984 self.crypt_key = None 985 self.crypt_salt = None 986 self.crypt_icv_size = crypt_icv_size 987 988 if auth_algo: 989 if auth_algo not in AUTH_ALGOS: 990 raise TypeError('unsupported integrity algo %r, try %r' % 991 (auth_algo, list(AUTH_ALGOS))) 992 self.auth_algo = AUTH_ALGOS[auth_algo] 993 self.auth_key = auth_key 994 else: 995 self.auth_algo = AUTH_ALGOS['NULL'] 996 self.auth_key = None 997 998 if tunnel_header and not isinstance(tunnel_header, (IP, IPv6)): 999 raise TypeError('tunnel_header must be %s or %s' % (IP.name, IPv6.name)) # noqa: E501 1000 self.tunnel_header = tunnel_header 1001 1002 if nat_t_header: 1003 if proto is not ESP: 1004 raise TypeError('nat_t_header is only allowed with ESP') 1005 if not isinstance(nat_t_header, UDP): 1006 raise TypeError('nat_t_header must be %s' % UDP.name) 1007 self.nat_t_header = nat_t_header 1008 1009 def check_spi(self, pkt): 1010 if pkt.spi != self.spi: 1011 raise TypeError('packet spi=0x%x does not match the SA spi=0x%x' % 1012 (pkt.spi, self.spi)) 1013 1014 def _encrypt_esp(self, pkt, seq_num=None, iv=None, esn_en=None, esn=None): 1015 1016 if iv is None: 1017 iv = self.crypt_algo.generate_iv() 1018 else: 1019 if len(iv) != self.crypt_algo.iv_size: 1020 raise TypeError('iv length must be %s' % self.crypt_algo.iv_size) # noqa: E501 1021 1022 esp = _ESPPlain(spi=self.spi, seq=seq_num or self.seq_num, iv=iv) 1023 1024 if self.tunnel_header: 1025 tunnel = self.tunnel_header.copy() 1026 1027 if tunnel.version == 4: 1028 del tunnel.proto 1029 del tunnel.len 1030 del tunnel.chksum 1031 else: 1032 del tunnel.nh 1033 del tunnel.plen 1034 1035 pkt = tunnel.__class__(raw(tunnel / pkt)) 1036 1037 ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_ESP) 1038 esp.data = payload 1039 esp.nh = nh 1040 1041 esp = self.crypt_algo.pad(esp) 1042 esp = self.crypt_algo.encrypt(self, esp, self.crypt_key, 1043 self.crypt_icv_size, 1044 esn_en=esn_en or self.esn_en, 1045 esn=esn or self.esn) 1046 1047 self.auth_algo.sign(esp, 1048 self.auth_key, 1049 esn_en=esn_en or self.esn_en, 1050 esn=esn or self.esn) 1051 1052 if self.nat_t_header: 1053 nat_t_header = self.nat_t_header.copy() 1054 nat_t_header.chksum = 0 1055 del nat_t_header.len 1056 if ip_header.version == 4: 1057 del ip_header.proto 1058 else: 1059 del ip_header.nh 1060 ip_header /= nat_t_header 1061 1062 if ip_header.version == 4: 1063 del ip_header.len 1064 del ip_header.chksum 1065 else: 1066 del ip_header.plen 1067 1068 # sequence number must always change, unless specified by the user 1069 if seq_num is None: 1070 self.seq_num += 1 1071 1072 return ip_header.__class__(raw(ip_header / esp)) 1073 1074 def _encrypt_ah(self, pkt, seq_num=None, esn_en=False, esn=0): 1075 1076 ah = AH(spi=self.spi, seq=seq_num or self.seq_num, 1077 icv=b"\x00" * self.auth_algo.icv_size) 1078 1079 if self.tunnel_header: 1080 tunnel = self.tunnel_header.copy() 1081 1082 if tunnel.version == 4: 1083 del tunnel.proto 1084 del tunnel.len 1085 del tunnel.chksum 1086 else: 1087 del tunnel.nh 1088 del tunnel.plen 1089 1090 pkt = tunnel.__class__(raw(tunnel / pkt)) 1091 1092 ip_header, nh, payload = split_for_transport(pkt, socket.IPPROTO_AH) 1093 ah.nh = nh 1094 1095 if ip_header.version == 6 and len(ah) % 8 != 0: 1096 # For IPv6, the total length of the header must be a multiple of 1097 # 8-octet units. 1098 ah.padding = b"\x00" * (-len(ah) % 8) 1099 elif len(ah) % 4 != 0: 1100 # For IPv4, the total length of the header must be a multiple of 1101 # 4-octet units. 1102 ah.padding = b"\x00" * (-len(ah) % 4) 1103 1104 # RFC 4302 - Section 2.2. Payload Length 1105 # This 8-bit field specifies the length of AH in 32-bit words (4-byte 1106 # units), minus "2". 1107 ah.payloadlen = len(ah) // 4 - 2 1108 1109 if ip_header.version == 4: 1110 ip_header.len = len(ip_header) + len(ah) + len(payload) 1111 del ip_header.chksum 1112 ip_header = ip_header.__class__(raw(ip_header)) 1113 else: 1114 ip_header.plen = len(ip_header.payload) + len(ah) + len(payload) 1115 1116 signed_pkt = self.auth_algo.sign(ip_header / ah / payload, 1117 self.auth_key, 1118 esn_en=esn_en or self.esn_en, 1119 esn=esn or self.esn) 1120 1121 # sequence number must always change, unless specified by the user 1122 if seq_num is None: 1123 self.seq_num += 1 1124 1125 return signed_pkt 1126 1127 def encrypt(self, pkt, seq_num=None, iv=None, esn_en=None, esn=None): 1128 """ 1129 Encrypt (and encapsulate) an IP(v6) packet with ESP or AH according 1130 to this SecurityAssociation. 1131 1132 :param pkt: the packet to encrypt 1133 :param seq_num: if specified, use this sequence number instead of the 1134 generated one 1135 :param esn_en: extended sequence number enable which allows to 1136 use 64-bit sequence number instead of 32-bit when 1137 using an AEAD algorithm 1138 :param esn: extended sequence number (32 MSB) 1139 :param iv: if specified, use this initialization vector for 1140 encryption instead of a random one. 1141 1142 :returns: the encrypted/encapsulated packet 1143 """ 1144 if not isinstance(pkt, self.SUPPORTED_PROTOS): 1145 raise TypeError('cannot encrypt %s, supported protos are %s' 1146 % (pkt.__class__, self.SUPPORTED_PROTOS)) 1147 if self.proto is ESP: 1148 return self._encrypt_esp(pkt, seq_num=seq_num, 1149 iv=iv, esn_en=esn_en, 1150 esn=esn) 1151 else: 1152 return self._encrypt_ah(pkt, seq_num=seq_num, 1153 esn_en=esn_en, esn=esn) 1154 1155 def _decrypt_esp(self, pkt, verify=True, esn_en=None, esn=None): 1156 1157 encrypted = pkt[ESP] 1158 1159 if verify: 1160 self.check_spi(pkt) 1161 self.auth_algo.verify(encrypted, self.auth_key, 1162 esn_en=esn_en or self.esn_en, 1163 esn=esn or self.esn) 1164 1165 esp = self.crypt_algo.decrypt(self, encrypted, self.crypt_key, 1166 self.crypt_icv_size or 1167 self.crypt_algo.icv_size or 1168 self.auth_algo.icv_size, 1169 esn_en=esn_en or self.esn_en, 1170 esn=esn or self.esn) 1171 1172 if self.tunnel_header: 1173 # drop the tunnel header and return the payload untouched 1174 1175 pkt.remove_payload() 1176 if pkt.version == 4: 1177 pkt.proto = esp.nh 1178 else: 1179 pkt.nh = esp.nh 1180 cls = pkt.guess_payload_class(esp.data) 1181 1182 return cls(esp.data) 1183 else: 1184 ip_header = pkt 1185 1186 if ip_header.version == 4: 1187 ip_header.proto = esp.nh 1188 del ip_header.chksum 1189 ip_header.remove_payload() 1190 ip_header.len = len(ip_header) + len(esp.data) 1191 # recompute checksum 1192 ip_header = ip_header.__class__(raw(ip_header)) 1193 else: 1194 if self.nat_t_header: 1195 # drop the UDP header and return the payload untouched 1196 ip_header.nh = esp.nh 1197 ip_header.remove_payload() 1198 else: 1199 encrypted.underlayer.nh = esp.nh 1200 encrypted.underlayer.remove_payload() 1201 ip_header.plen = len(ip_header.payload) + len(esp.data) 1202 1203 cls = ip_header.guess_payload_class(esp.data) 1204 1205 # reassemble the ip_header with the ESP payload 1206 return ip_header / cls(esp.data) 1207 1208 def _decrypt_ah(self, pkt, verify=True, esn_en=None, esn=None): 1209 1210 if verify: 1211 self.check_spi(pkt) 1212 self.auth_algo.verify(pkt, self.auth_key, 1213 esn_en=esn_en or self.esn_en, 1214 esn=esn or self.esn) 1215 1216 ah = pkt[AH] 1217 payload = ah.payload 1218 payload.remove_underlayer(None) # useless argument... 1219 1220 if self.tunnel_header: 1221 return payload 1222 else: 1223 ip_header = pkt 1224 1225 if ip_header.version == 4: 1226 ip_header.proto = ah.nh 1227 del ip_header.chksum 1228 ip_header.remove_payload() 1229 ip_header.len = len(ip_header) + len(payload) 1230 # recompute checksum 1231 ip_header = ip_header.__class__(raw(ip_header)) 1232 else: 1233 ah.underlayer.nh = ah.nh 1234 ah.underlayer.remove_payload() 1235 ip_header.plen = len(ip_header.payload) + len(payload) 1236 1237 # reassemble the ip_header with the AH payload 1238 return ip_header / payload 1239 1240 def decrypt(self, pkt, verify=True, esn_en=None, esn=None): 1241 """ 1242 Decrypt (and decapsulate) an IP(v6) packet containing ESP or AH. 1243 1244 :param pkt: the packet to decrypt 1245 :param verify: if False, do not perform the integrity check 1246 :param esn_en: extended sequence number enable which allows to use 1247 64-bit sequence number instead of 32-bit when using an 1248 AEAD algorithm 1249 :param esn: extended sequence number (32 MSB) 1250 :returns: the decrypted/decapsulated packet 1251 :raise scapy.layers.ipsec.IPSecIntegrityError: if the integrity check 1252 fails 1253 """ 1254 if not isinstance(pkt, self.SUPPORTED_PROTOS): 1255 raise TypeError('cannot decrypt %s, supported protos are %s' 1256 % (pkt.__class__, self.SUPPORTED_PROTOS)) 1257 1258 if self.proto is ESP and pkt.haslayer(ESP): 1259 return self._decrypt_esp(pkt, verify=verify, 1260 esn_en=esn_en, esn=esn) 1261 elif self.proto is AH and pkt.haslayer(AH): 1262 return self._decrypt_ah(pkt, verify=verify, esn_en=esn_en, esn=esn) 1263 else: 1264 raise TypeError('%s has no %s layer' % (pkt, self.proto.name)) 1265