• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2017 Maxence Tury
5#               2019 Romain Perez
6
7"""
8Common TLS 1.3 fields & bindings.
9
10This module covers the record layer, along with the ChangeCipherSpec, Alert and
11ApplicationData submessages. For the Handshake type, see tls_handshake.py.
12
13See the TLS class documentation for more information.
14"""
15
16import struct
17
18from scapy.error import log_runtime, warning
19from scapy.compat import raw, orb
20from scapy.fields import ByteEnumField, PacketField, XStrField
21from scapy.layers.tls.session import _GenericTLSSessionInheritance
22from scapy.layers.tls.basefields import _TLSVersionField, _tls_version, \
23    _TLSMACField, _TLSLengthField, _tls_type
24from scapy.layers.tls.record import _TLSMsgListField, TLS
25from scapy.layers.tls.crypto.cipher_aead import AEADTagError
26from scapy.layers.tls.crypto.cipher_stream import Cipher_NULL
27from scapy.layers.tls.crypto.common import CipherError
28from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp
29
30
31###############################################################################
32#   TLS Record Protocol                                                       #
33###############################################################################
34
35
class TLSInnerPlaintext(_GenericTLSSessionInheritance):
    """
    TLS 1.3 inner plaintext: a list of messages, followed by a one-byte
    content type and an optional run of zero bytes used as padding.
    """
    name = "TLS Inner Plaintext"
    fields_desc = [_TLSMsgListField("msg", []),
                   ByteEnumField("type", None, _tls_type),
                   XStrField("pad", "")]

    def pre_dissect(self, s):
        """
        We need to parse the padding and type as soon as possible,
        else we won't be able to parse the message list...

        The layout is 'content || type || zeros', so the content type is
        the last non-zero byte of s and everything after it is padding.

        :param s: the raw inner plaintext to be dissected.
        :return: s, left untouched.
        :raises Exception: if s is empty.
        """
        if len(s) < 1:
            raise Exception("Invalid InnerPlaintext (too short).")

        # Drop the trailing zero padding in one go. Indexing a bytes object
        # yields an int on Python 3, so comparing s[-1] against b"\x00"
        # never matched and padded records were parsed with a wrong
        # message length and a null type.
        unpadded = s.rstrip(b"\x00")
        if unpadded:
            # The last non-zero byte is the type, the message list is
            # everything that precedes it.
            msg_len = len(unpadded) - 1
        else:
            # Only zero bytes: there is no valid content type. We stay
            # lenient (this is a dissector) and keep reading the last byte
            # as the type, so the message list gets all the other bytes.
            msg_len = len(s) - 1
        self.fields_desc[0].length_from = lambda pkt: msg_len

        # Slicing (rather than indexing) keeps a bytes object for unpack.
        self.type = struct.unpack("B", s[msg_len:msg_len + 1])[0]

        return s
63
64
class _TLSInnerPlaintextField(PacketField):
    """
    PacketField whose packet class is always TLSInnerPlaintext, and which
    hands the TLS session of the enclosing packet over to it.
    """

    def __init__(self, name, default, *args, **kargs):
        # Extra positional/keyword arguments are accepted but not used:
        # the packet class is forced to TLSInnerPlaintext.
        super(_TLSInnerPlaintextField, self).__init__(
            name, default, TLSInnerPlaintext
        )

    def m2i(self, pkt, m):
        """Build the inner packet from m, sharing the session of pkt."""
        return self.cls(m, tls_session=pkt.tls_session)

    def getfield(self, pkt, s):
        """
        Dissect the first (pkt.len - mac_len) bytes of s as the inner
        plaintext; the bytes after that are handed back as remaining data.
        """
        frag_len = pkt.len - pkt.tls_session.rcs.mac_len
        if frag_len < 1:
            warning("InnerPlaintext should at least contain a byte type!")
            return s, None

        inner_bytes = s[:frag_len]
        trailing = s[frag_len:]
        parent = super(_TLSInnerPlaintextField, self)
        # 'leftover' should be empty here
        leftover, inner = parent.getfield(pkt, inner_bytes)
        return leftover + trailing, inner

    def i2m(self, pkt, p):
        """
        Serialize p. Session-aware packets inherit the session of pkt and
        are built statefully, unless that session is frozen.
        """
        if not isinstance(p, _GenericTLSSessionInheritance):
            return raw(p)

        session = pkt.tls_session
        p.tls_session = session
        if session.frozen:
            return raw(p)
        return p.raw_stateful()
90
91
class TLS13(_GenericTLSSessionInheritance):
    """
    TLS 1.3 record: type, version and length header, followed by the inner
    plaintext and the authentication tag.
    """
    __slots__ = ["deciphered_len"]
    name = "TLS 1.3"
    fields_desc = [ByteEnumField("type", 0x17, _tls_type),
                   _TLSVersionField("version", 0x0303, _tls_version),
                   _TLSLengthField("len", None),
                   _TLSInnerPlaintextField("inner", TLSInnerPlaintext()),
                   _TLSMACField("auth_tag", None)]

    def __init__(self, *args, **kargs):
        # The slot has to exist before the parent constructor runs.
        self.deciphered_len = kargs.get("deciphered_len", None)
        super(TLS13, self).__init__(*args, **kargs)

    # Parsing methods

    def _tls_auth_decrypt(self, s):
        """
        Decrypt the AEAD-ciphered data s with the current read state and
        return whatever the cipher returns (unpacked by the caller as the
        plaintext and the tag). The read sequence number is always
        incremented. On a cipher error or a tag mismatch, the arguments
        carried by the exception are returned instead; a tag mismatch is
        additionally logged.
        """
        read_state = self.tls_session.rcs
        nonce_seq = struct.pack("!Q", read_state.seq_num)
        read_state.seq_num += 1

        # Additional data: record type, version and ciphertext length.
        type_part = pkcs_i2osp(self.type, 1)
        version_part = pkcs_i2osp(self.version, 2)
        length_part = pkcs_i2osp(len(s), 2)
        aad = type_part + version_part + length_part

        try:
            return read_state.cipher.auth_decrypt(aad, s, nonce_seq)
        except CipherError as err:
            return err.args
        except AEADTagError as err:
            summary = self.firstlayer().summary()
            log_runtime.info("TLS 1.3: record integrity check failed [%s]", summary)  # noqa: E501
            return err.args

    def pre_dissect(self, s):
        """
        Commit the pending read state if needed, then decrypt the record
        fragment unless the read cipher is null or the record type is 0x14.
        """
        session = self.tls_session

        # We commit the pending read state if it has been triggered.
        if session.triggered_prcs_commit:
            pending = session.prcs
            if pending is not None:
                session.rcs = pending
                session.prcs = None
            session.triggered_prcs_commit = False

        if len(s) < 5:
            raise Exception("Invalid record: header is too short.")

        self.type = orb(s[0])
        cleartext = isinstance(session.rcs.cipher, Cipher_NULL)
        if cleartext or self.type == 0x14:
            # Nothing to decipher: the data is dissected as is.
            self.deciphered_len = None
            return s

        (record_len,) = struct.unpack('!H', s[3:5])
        body_end = 5 + record_len
        header = s[:5]
        ciphered = s[5:body_end]
        rest = s[body_end:]

        plain, tag = self._tls_auth_decrypt(ciphered)
        self.deciphered_len = len(plain)
        return header + plain + tag + rest

    def post_dissect(self, s):
        """
        Commit the pending read state if it has been triggered. Nothing is
        updated when the prcs is not set, as this probably means that we're
        working out-of-context (and the default rcs has to be kept).
        """
        session = self.tls_session
        if session.triggered_prcs_commit:
            pending = session.prcs
            if pending is not None:
                session.rcs = pending
                session.prcs = None
            session.triggered_prcs_commit = False
        return s

    def do_dissect_payload(self, s):
        """
        Try to dissect the following data as a TLS message, by reusing the
        TLS implementation. Overloading .guess_payload_class() would not
        be enough, as the TLS session to be used would get lost.
        """
        return TLS.do_dissect_payload(self, s)

    # Building methods

    def _tls_auth_encrypt(self, s):
        """
        Return the TLSCiphertext.encrypted_record for AEAD ciphers. The
        write sequence number is incremented along the way.
        """
        write_state = self.tls_session.wcs
        nonce_seq = struct.pack("!Q", write_state.seq_num)
        write_state.seq_num += 1

        # The length in the additional data accounts for the tag.
        type_part = pkcs_i2osp(self.type, 1)
        version_part = pkcs_i2osp(self.version, 2)
        length_part = pkcs_i2osp(len(s) + write_state.cipher.tag_len, 2)
        aad = type_part + version_part + length_part

        return write_state.cipher.auth_encrypt(s, aad, nonce_seq)

    def post_build(self, pkt, pay):
        """
        Cipher the fragment according to the writing cipher type, fix the
        length in the header and commit the pending write state if needed.
        """
        header = pkt[:5]
        fragment = pkt[5:]
        if not isinstance(self.tls_session.wcs.cipher, Cipher_NULL):
            fragment = self._tls_auth_encrypt(fragment)

        # A 'len' given by the user is ultimately respected; else we use
        # the length of the (possibly ciphered) fragment.
        user_len = self.len
        record_len = len(fragment) if user_len is None else user_len
        header = header[:3] + struct.pack("!H", record_len)

        # Now we commit the pending write state if it has been triggered. We
        # update nothing if the pwcs was not set. This probably means that
        # we're working out-of-context (and we need to keep the default wcs).
        session = self.tls_session
        if session.triggered_pwcs_commit:
            pending = session.pwcs
            if pending is not None:
                session.wcs = pending
                session.pwcs = None
            session.triggered_pwcs_commit = False

        return header + fragment + pay

    def mysummary(self):
        """Append the names of the inner messages to the parent summary."""
        summary, layers = super(TLS13, self).mysummary()
        if not (self.inner and self.inner.msg):
            return summary, layers

        names = [getattr(m, "_name", m.name) for m in self.inner.msg]
        summary = summary + " / " + " / ".join(names)
        return summary, layers
224