• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2007, 2008, 2009 Arnaud Ebalard
5#               2015, 2016, 2017 Maxence Tury
6
7"""
8Authenticated Encryption with Associated Data ciphers.
9
10RFC 5288 introduces new ciphersuites for TLS 1.2 which are based on AES in
11Galois/Counter Mode (GCM). RFC 6655 in turn introduces AES_CCM ciphersuites.
12The related AEAD algorithms are defined in RFC 5116. Later on, RFC 7905
13introduced cipher suites based on a ChaCha20-Poly1305 construction.
14"""
15
16import struct
17
18from scapy.config import conf
19from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp, pkcs_os2ip
20from scapy.layers.tls.crypto.common import CipherError
21from scapy.utils import strxor
22
23if conf.crypto_valid:
24    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes  # noqa: E501
25    from cryptography.hazmat.backends import default_backend
26    from cryptography.exceptions import InvalidTag
27if conf.crypto_valid_advanced:
28    from cryptography.hazmat.primitives.ciphers.aead import (AESCCM,
29                                                             ChaCha20Poly1305)
30else:
31    class AESCCM:
32        pass
33
34_tls_aead_cipher_algs = {}
35
36
37class _AEADCipherMetaclass(type):
38    """
39    Cipher classes are automatically registered through this metaclass.
40    Furthermore, their name attribute is extracted from their class name.
41    """
42    def __new__(cls, ciph_name, bases, dct):
43        if not ciph_name.startswith("_AEADCipher"):
44            dct["name"] = ciph_name[7:]     # remove leading "Cipher_"
45        the_class = super(_AEADCipherMetaclass, cls).__new__(cls, ciph_name,
46                                                             bases, dct)
47        if not ciph_name.startswith("_AEADCipher"):
48            _tls_aead_cipher_algs[ciph_name[7:]] = the_class
49        return the_class
50
51
52class AEADTagError(Exception):
53    """
54    Raised when MAC verification fails.
55    """
56    pass
57
58
59class _AEADCipher(metaclass=_AEADCipherMetaclass):
60    """
61    The hasattr(self, "pc_cls") tests correspond to the legacy API of the
62    crypto library. With cryptography v2.0, both CCM and GCM should follow
63    the else case.
64
65    Note that the "fixed_iv" in TLS RFCs is called "salt" in the AEAD RFC 5116.
66    """
67    type = "aead"
68    fixed_iv_len = 4
69    nonce_explicit_len = 8
70
71    def __init__(self, key=None, fixed_iv=None, nonce_explicit=None):
72        """
73        'key' and 'fixed_iv' are to be provided as strings, whereas the internal  # noqa: E501
74        'nonce_explicit' is an integer (it is simpler for incrementation).
75        !! The whole 'nonce' may be called IV in certain RFCs.
76        """
77        self.ready = {"key": True, "fixed_iv": True, "nonce_explicit": True}
78        if key is None:
79            self.ready["key"] = False
80            key = b"\0" * self.key_len
81        if fixed_iv is None:
82            self.ready["fixed_iv"] = False
83            fixed_iv = b"\0" * self.fixed_iv_len
84        if nonce_explicit is None:
85            self.ready["nonce_explicit"] = False
86            nonce_explicit = 0
87
88        if isinstance(nonce_explicit, str):
89            nonce_explicit = pkcs_os2ip(nonce_explicit)
90
91        # we use super() in order to avoid any deadlock with __setattr__
92        super(_AEADCipher, self).__setattr__("key", key)
93        super(_AEADCipher, self).__setattr__("fixed_iv", fixed_iv)
94        super(_AEADCipher, self).__setattr__("nonce_explicit", nonce_explicit)
95
96        if hasattr(self, "pc_cls"):
97            if isinstance(self.pc_cls, AESCCM):
98                self._cipher = Cipher(self.pc_cls(key),
99                                      self.pc_cls_mode(self._get_nonce()),
100                                      backend=default_backend(),
101                                      tag_length=self.tag_len)
102            else:
103                self._cipher = Cipher(self.pc_cls(key),
104                                      self.pc_cls_mode(self._get_nonce()),
105                                      backend=default_backend())
106        else:
107            self._cipher = self.cipher_cls(key)
108
109    def __setattr__(self, name, val):
110        if name == "key":
111            if self._cipher is not None:
112                if hasattr(self, "pc_cls"):
113                    self._cipher.algorithm.key = val
114                else:
115                    self._cipher._key = val
116            self.ready["key"] = True
117        elif name == "fixed_iv":
118            self.ready["fixed_iv"] = True
119        elif name == "nonce_explicit":
120            if isinstance(val, str):
121                val = pkcs_os2ip(val)
122            self.ready["nonce_explicit"] = True
123        super(_AEADCipher, self).__setattr__(name, val)
124
125    def _get_nonce(self):
126        return (self.fixed_iv +
127                pkcs_i2osp(self.nonce_explicit, self.nonce_explicit_len))
128
129    def _update_nonce_explicit(self):
130        """
131        Increment the explicit nonce while avoiding any overflow.
132        """
133        ne = self.nonce_explicit + 1
134        self.nonce_explicit = ne % 2**(self.nonce_explicit_len * 8)
135
136    def auth_encrypt(self, P, A, seq_num=None):
137        """
138        Encrypt the data then prepend the explicit part of the nonce. The
139        authentication tag is directly appended with the most recent crypto
140        API. Additional data may be authenticated without encryption (as A).
141
142        The 'seq_num' should never be used here, it is only a safeguard needed
143        because one cipher (ChaCha20Poly1305) using TLS 1.2 logic in record.py
144        actually is a _AEADCipher_TLS13 (even though others are not).
145        """
146        if False in self.ready.values():
147            raise CipherError(P, A)
148
149        if hasattr(self, "pc_cls"):
150            self._cipher.mode._initialization_vector = self._get_nonce()
151            self._cipher.mode._tag = None
152            encryptor = self._cipher.encryptor()
153            encryptor.authenticate_additional_data(A)
154            res = encryptor.update(P) + encryptor.finalize()
155            res += encryptor.tag
156        else:
157            res = self._cipher.encrypt(self._get_nonce(), P, A)
158
159        nonce_explicit = pkcs_i2osp(self.nonce_explicit,
160                                    self.nonce_explicit_len)
161        self._update_nonce_explicit()
162        return nonce_explicit + res
163
164    def auth_decrypt(self, A, C, seq_num=None, add_length=True):
165        """
166        Decrypt the data and authenticate the associated data (i.e. A).
167        If the verification fails, an AEADTagError is raised. It is the user's
168        responsibility to catch it if deemed useful. If we lack the key, we
169        raise a CipherError which contains the encrypted input.
170
171        Note that we add the TLSCiphertext length to A although we're supposed
172        to add the TLSCompressed length. Fortunately, they are the same,
173        but the specifications actually messed up here. :'(
174
175        The 'add_length' switch should always be True for TLS, but we provide
176        it anyway (mostly for test cases, hum).
177
178        The 'seq_num' should never be used here, it is only a safeguard needed
179        because one cipher (ChaCha20Poly1305) using TLS 1.2 logic in record.py
180        actually is a _AEADCipher_TLS13 (even though others are not).
181        """
182        nonce_explicit_str, C, mac = (C[:self.nonce_explicit_len],
183                                      C[self.nonce_explicit_len:-self.tag_len],
184                                      C[-self.tag_len:])
185
186        if False in self.ready.values():
187            raise CipherError(nonce_explicit_str, C, mac)
188
189        self.nonce_explicit = pkcs_os2ip(nonce_explicit_str)
190        if add_length:
191            A += struct.pack("!H", len(C))
192
193        if hasattr(self, "pc_cls"):
194            self._cipher.mode._initialization_vector = self._get_nonce()
195            self._cipher.mode._tag = mac
196            decryptor = self._cipher.decryptor()
197            decryptor.authenticate_additional_data(A)
198            P = decryptor.update(C)
199            try:
200                decryptor.finalize()
201            except InvalidTag:
202                raise AEADTagError(nonce_explicit_str, P, mac)
203        else:
204            try:
205                P = self._cipher.decrypt(self._get_nonce(), C + mac, A)
206            except InvalidTag:
207                raise AEADTagError(nonce_explicit_str,
208                                   "<unauthenticated data>",
209                                   mac)
210        return nonce_explicit_str, P, mac
211
212    def snapshot(self):
213        c = self.__class__(self.key, self.fixed_iv, self.nonce_explicit)
214        c.ready = self.ready.copy()
215        return c
216
217
218if conf.crypto_valid:
219    class Cipher_AES_128_GCM(_AEADCipher):
220        # XXX use the new AESGCM if available
221        # if conf.crypto_valid_advanced:
222        #    cipher_cls = AESGCM
223        # else:
224        pc_cls = algorithms.AES
225        pc_cls_mode = modes.GCM
226        key_len = 16
227        tag_len = 16
228
229    class Cipher_AES_256_GCM(Cipher_AES_128_GCM):
230        key_len = 32
231
232
233if conf.crypto_valid_advanced:
234    class Cipher_AES_128_CCM(_AEADCipher):
235        cipher_cls = AESCCM
236        key_len = 16
237        tag_len = 16
238
239    class Cipher_AES_256_CCM(Cipher_AES_128_CCM):
240        key_len = 32
241
242    class Cipher_AES_128_CCM_8(Cipher_AES_128_CCM):
243        tag_len = 8
244
245    class Cipher_AES_256_CCM_8(Cipher_AES_128_CCM_8):
246        key_len = 32
247
248
249class _AEADCipher_TLS13(metaclass=_AEADCipherMetaclass):
250    """
251    The hasattr(self, "pc_cls") enable support for the legacy implementation
252    of GCM in the cryptography library. They should not be used, and might
253    eventually be removed, with cryptography v2.0. XXX
254    """
255    type = "aead"
256
257    def __init__(self, key=None, fixed_iv=None, nonce_explicit=None):
258        """
259        'key' and 'fixed_iv' are to be provided as strings. This IV never
260        changes: it is either the client_write_IV or server_write_IV.
261
262        Note that 'nonce_explicit' is never used. It is only a safeguard for a
263        call in session.py to the TLS 1.2/ChaCha20Poly1305 case (see RFC 7905).
264        """
265        self.ready = {"key": True, "fixed_iv": True}
266        if key is None:
267            self.ready["key"] = False
268            key = b"\0" * self.key_len
269        if fixed_iv is None:
270            self.ready["fixed_iv"] = False
271            fixed_iv = b"\0" * self.fixed_iv_len
272
273        # we use super() in order to avoid any deadlock with __setattr__
274        super(_AEADCipher_TLS13, self).__setattr__("key", key)
275        super(_AEADCipher_TLS13, self).__setattr__("fixed_iv", fixed_iv)
276
277        if hasattr(self, "pc_cls"):
278            if isinstance(self.pc_cls, AESCCM):
279                self._cipher = Cipher(self.pc_cls(key),
280                                      self.pc_cls_mode(fixed_iv),
281                                      backend=default_backend(),
282                                      tag_length=self.tag_len)
283            else:
284                self._cipher = Cipher(self.pc_cls(key),
285                                      self.pc_cls_mode(fixed_iv),
286                                      backend=default_backend())
287        else:
288            if self.cipher_cls == ChaCha20Poly1305:
289                # ChaCha20Poly1305 doesn't have a tag_length argument...
290                self._cipher = self.cipher_cls(key)
291            else:
292                self._cipher = self.cipher_cls(key, tag_length=self.tag_len)
293
294    def __setattr__(self, name, val):
295        if name == "key":
296            if self._cipher is not None:
297                if hasattr(self, "pc_cls"):
298                    self._cipher.algorithm.key = val
299                else:
300                    self._cipher._key = val
301            self.ready["key"] = True
302        elif name == "fixed_iv":
303            self.ready["fixed_iv"] = True
304        super(_AEADCipher_TLS13, self).__setattr__(name, val)
305
306    def _get_nonce(self, seq_num):
307        padlen = self.fixed_iv_len - len(seq_num)
308        padded_seq_num = b"\x00" * padlen + seq_num
309        return strxor(padded_seq_num, self.fixed_iv)
310
311    def auth_encrypt(self, P, A, seq_num):
312        """
313        Encrypt the data, and append the computed authentication code.
314        The additional data for TLS 1.3 is the record header.
315
316        Note that the cipher's authentication tag must be None when encrypting.
317        """
318        if False in self.ready.values():
319            raise CipherError(P, A)
320
321        if hasattr(self, "pc_cls"):
322            self._cipher.mode._tag = None
323            self._cipher.mode._initialization_vector = self._get_nonce(seq_num)
324            encryptor = self._cipher.encryptor()
325            encryptor.authenticate_additional_data(A)
326            res = encryptor.update(P) + encryptor.finalize()
327            res += encryptor.tag
328        else:
329            if (conf.crypto_valid_advanced and
330                    isinstance(self._cipher, AESCCM)):
331                res = self._cipher.encrypt(self._get_nonce(seq_num), P, A)
332            else:
333                res = self._cipher.encrypt(self._get_nonce(seq_num), P, A)
334        return res
335
336    def auth_decrypt(self, A, C, seq_num):
337        """
338        Decrypt the data and verify the authentication code (in this order).
339        If the verification fails, an AEADTagError is raised. It is the user's
340        responsibility to catch it if deemed useful. If we lack the key, we
341        raise a CipherError which contains the encrypted input.
342        """
343        C, mac = C[:-self.tag_len], C[-self.tag_len:]
344        if False in self.ready.values():
345            raise CipherError(C, mac)
346
347        if hasattr(self, "pc_cls"):
348            self._cipher.mode._initialization_vector = self._get_nonce(seq_num)
349            self._cipher.mode._tag = mac
350            decryptor = self._cipher.decryptor()
351            decryptor.authenticate_additional_data(A)
352            P = decryptor.update(C)
353            try:
354                decryptor.finalize()
355            except InvalidTag:
356                raise AEADTagError(P, mac)
357        else:
358            try:
359                if (conf.crypto_valid_advanced and
360                        isinstance(self._cipher, AESCCM)):
361                    P = self._cipher.decrypt(self._get_nonce(seq_num), C + mac, A)  # noqa: E501
362                else:
363                    if (conf.crypto_valid_advanced and
364                            isinstance(self, Cipher_CHACHA20_POLY1305)):
365                        A += struct.pack("!H", len(C))
366                    P = self._cipher.decrypt(self._get_nonce(seq_num), C + mac, A)  # noqa: E501
367            except InvalidTag:
368                raise AEADTagError("<unauthenticated data>", mac)
369        return P, mac
370
371    def snapshot(self):
372        c = self.__class__(self.key, self.fixed_iv)
373        c.ready = self.ready.copy()
374        return c
375
376
377if conf.crypto_valid_advanced:
378    class Cipher_CHACHA20_POLY1305_TLS13(_AEADCipher_TLS13):
379        cipher_cls = ChaCha20Poly1305
380        key_len = 32
381        tag_len = 16
382        fixed_iv_len = 12
383        nonce_explicit_len = 0
384
385    class Cipher_CHACHA20_POLY1305(Cipher_CHACHA20_POLY1305_TLS13):
386        """
387        This TLS 1.2 cipher actually uses TLS 1.3 logic, as per RFC 7905.
388        Changes occur at the record layer (in record.py).
389        """
390        pass
391
392
393if conf.crypto_valid:
394    class Cipher_AES_128_GCM_TLS13(_AEADCipher_TLS13):
395        # XXX use the new AESGCM if available
396        # if conf.crypto_valid_advanced:
397        #    cipher_cls = AESGCM
398        # else:
399        pc_cls = algorithms.AES
400        pc_cls_mode = modes.GCM
401        key_len = 16
402        fixed_iv_len = 12
403        tag_len = 16
404
405    class Cipher_AES_256_GCM_TLS13(Cipher_AES_128_GCM_TLS13):
406        key_len = 32
407
408
409if conf.crypto_valid_advanced:
410    class Cipher_AES_128_CCM_TLS13(_AEADCipher_TLS13):
411        cipher_cls = AESCCM
412        key_len = 16
413        tag_len = 16
414        fixed_iv_len = 12
415
416    class Cipher_AES_128_CCM_8_TLS13(Cipher_AES_128_CCM_TLS13):
417        tag_len = 8
418