# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) 2017 Maxence Tury
#               2019 Romain Perez

"""
Common TLS 1.3 fields & bindings.

This module covers the record layer, along with the ChangeCipherSpec, Alert and
ApplicationData submessages. For the Handshake type, see tls_handshake.py.

See the TLS class documentation for more information.
"""

import struct

from scapy.error import log_runtime, warning
from scapy.compat import raw, orb
from scapy.fields import ByteEnumField, PacketField, XStrField
from scapy.layers.tls.session import _GenericTLSSessionInheritance
from scapy.layers.tls.basefields import _TLSVersionField, _tls_version, \
    _TLSMACField, _TLSLengthField, _tls_type
from scapy.layers.tls.record import _TLSMsgListField, TLS
from scapy.layers.tls.crypto.cipher_aead import AEADTagError
from scapy.layers.tls.crypto.cipher_stream import Cipher_NULL
from scapy.layers.tls.crypto.common import CipherError
from scapy.layers.tls.crypto.pkcs1 import pkcs_i2osp


###############################################################################
#   TLS Record Protocol                                                       #
###############################################################################


class TLSInnerPlaintext(_GenericTLSSessionInheritance):
    """
    Deciphered content of a TLS 1.3 record: a list of messages, followed by
    a one-byte content type and optional zero padding.
    """
    name = "TLS Inner Plaintext"
    fields_desc = [_TLSMsgListField("msg", []),
                   ByteEnumField("type", None, _tls_type),
                   XStrField("pad", "")]

    def pre_dissect(self, s):
        """
        We need to parse the padding and type as soon as possible,
        else we won't be able to parse the message list...

        The trailing zero bytes are the padding and the last non-zero byte
        is the content type (RFC 8446, section 5.4); everything before it
        is the message list. If the input holds nothing but zero bytes, we
        fall back to an empty message list with type 0 rather than failing,
        so that malformed records can still be displayed.

        :param s: the raw (deciphered) inner plaintext.
        :return: s, unchanged.
        :raises Exception: if s is empty.
        """
        if len(s) < 1:
            raise Exception("Invalid InnerPlaintext (too short).")

        # Note: indexing bytes yields an int on Python 3, hence comparing
        # s[-1] with b"\x00" can never match. rstrip() behaves the same on
        # every Python version and handles any number of padding bytes.
        unpadded = s.rstrip(b"\x00")
        msg_len = max(len(unpadded) - 1, 0)
        self.fields_desc[0].length_from = lambda pkt: msg_len

        self.type = struct.unpack("B", s[msg_len:msg_len + 1])[0]

        return s


class _TLSInnerPlaintextField(PacketField):
    """
    PacketField holding the TLSInnerPlaintext of a TLS13 record. It takes
    care of handing the TLS session over to the inner packet.
    """
    def __init__(self, name, default, *args, **kargs):
        # Extra arguments are accepted but ignored: the packet class is
        # always TLSInnerPlaintext.
        super(_TLSInnerPlaintextField, self).__init__(name,
                                                      default,
                                                      TLSInnerPlaintext)

    def m2i(self, pkt, m):
        """Dissect m as a TLSInnerPlaintext bound to the session of pkt."""
        return self.cls(m, tls_session=pkt.tls_session)

    def getfield(self, pkt, s):
        """
        Dissect the first (pkt.len - tag length) bytes of s as the inner
        plaintext, and leave the rest to the following fields.

        :return: a (remaining bytes, TLSInnerPlaintext or None) tuple.
        """
        tag_len = pkt.tls_session.rcs.mac_len
        frag_len = pkt.len - tag_len
        if frag_len < 1:
            warning("InnerPlaintext should at least contain a byte type!")
            return s, None
        remain, i = super(_TLSInnerPlaintextField, self).getfield(pkt, s[:frag_len])  # noqa: E501
        # remain should be empty here
        return remain + s[frag_len:], i

    def i2m(self, pkt, p):
        """
        Build p. The session of pkt is handed over to p first, and the
        stateful build is used unless that session is frozen.
        """
        if isinstance(p, _GenericTLSSessionInheritance):
            p.tls_session = pkt.tls_session
            if not pkt.tls_session.frozen:
                return p.raw_stateful()
        return raw(p)


class TLS13(_GenericTLSSessionInheritance):
    """
    TLS 1.3 record: a 5-byte header (type, version, len), the inner
    plaintext and the authentication tag.

    The deciphered_len slot is set by pre_dissect() to the length of the
    deciphered fragment, or to None when no deciphering took place.
    """
    __slots__ = ["deciphered_len"]
    name = "TLS 1.3"
    fields_desc = [ByteEnumField("type", 0x17, _tls_type),
                   _TLSVersionField("version", 0x0303, _tls_version),
                   _TLSLengthField("len", None),
                   _TLSInnerPlaintextField("inner", TLSInnerPlaintext()),
                   _TLSMACField("auth_tag", None)]

    def __init__(self, *args, **kargs):
        self.deciphered_len = kargs.get("deciphered_len", None)
        super(TLS13, self).__init__(*args, **kargs)

    # Parsing methods

    def _commit_pending_read_state(self):
        """
        Commit the pending read state if it has been triggered. We update
        nothing if the prcs was not set, as this probably means that we're
        working out-of-context (and we need to keep the default rcs).
        """
        if self.tls_session.triggered_prcs_commit:
            if self.tls_session.prcs is not None:
                self.tls_session.rcs = self.tls_session.prcs
                self.tls_session.prcs = None
            self.tls_session.triggered_prcs_commit = False

    def _tls_auth_decrypt(self, s):
        """
        Provided with the record header and AEAD-ciphered data, return the
        sliced and clear tuple (TLSInnerPlaintext, tag). Note that
        we still return the slicing of the original input in case of decryption
        failure. Also, if the integrity check fails, an informational message
        is logged, but we still return the sliced (unauthenticated) plaintext.

        The read sequence number of the session is incremented on each call.
        """
        rcs = self.tls_session.rcs
        read_seq_num = struct.pack("!Q", rcs.seq_num)
        rcs.seq_num += 1
        # Additional data: the record header, i.e. type, version and length
        # of the ciphered data.
        add_data = (pkcs_i2osp(self.type, 1) +
                    pkcs_i2osp(self.version, 2) +
                    pkcs_i2osp(len(s), 2))
        try:
            return rcs.cipher.auth_decrypt(add_data, s, read_seq_num)
        except CipherError as e:
            return e.args
        except AEADTagError as e:
            pkt_info = self.firstlayer().summary()
            log_runtime.info("TLS 1.3: record integrity check failed [%s]", pkt_info)  # noqa: E501
            return e.args

    def pre_dissect(self, s):
        """
        Decrypt, verify and decompress the message.

        The data is returned untouched when the read cipher is the NULL
        cipher or when the record type is 0x14; otherwise the ciphered
        fragment is replaced with its deciphered form.

        :raises Exception: if s is shorter than the 5-byte record header.
        """
        # We commit the pending read state if it has been triggered.
        self._commit_pending_read_state()
        if len(s) < 5:
            raise Exception("Invalid record: header is too short.")

        self.type = orb(s[0])
        if (isinstance(self.tls_session.rcs.cipher, Cipher_NULL) or
                self.type == 0x14):
            self.deciphered_len = None
            return s
        else:
            msglen = struct.unpack('!H', s[3:5])[0]
            hdr, efrag, r = s[:5], s[5:5 + msglen], s[msglen + 5:]
            frag, auth_tag = self._tls_auth_decrypt(efrag)
            self.deciphered_len = len(frag)
            return hdr + frag + auth_tag + r

    def post_dissect(self, s):
        """
        Commit the pending read state if it has been triggered. We update
        nothing if the prcs was not set, as this probably means that we're
        working out-of-context (and we need to keep the default rcs).
        """
        self._commit_pending_read_state()
        return s

    def do_dissect_payload(self, s):
        """
        Try to dissect the following data as a TLS message.
        Note that overloading .guess_payload_class() would not be enough,
        as the TLS session to be used would get lost.
        """
        return TLS.do_dissect_payload(self, s)

    # Building methods

    def _tls_auth_encrypt(self, s):
        """
        Return the TLSCiphertext.encrypted_record for AEAD ciphers.

        The write sequence number of the session is incremented on each call.
        """
        wcs = self.tls_session.wcs
        write_seq_num = struct.pack("!Q", wcs.seq_num)
        wcs.seq_num += 1
        # The length in the additional data accounts for the tag which will
        # be appended to the ciphered data.
        add_data = (pkcs_i2osp(self.type, 1) +
                    pkcs_i2osp(self.version, 2) +
                    pkcs_i2osp(len(s) + wcs.cipher.tag_len, 2))

        return wcs.cipher.auth_encrypt(s, add_data, write_seq_num)

    def post_build(self, pkt, pay):
        """
        Apply the previous methods according to the writing cipher type.

        :param pkt: the built record (5-byte header, then the fragment).
        :param pay: the built payload, appended as is.
        :return: the final header, the (possibly ciphered) fragment and pay.
        """
        # Compute the length of TLSPlaintext fragment
        hdr, frag = pkt[:5], pkt[5:]
        if not isinstance(self.tls_session.wcs.cipher, Cipher_NULL):
            frag = self._tls_auth_encrypt(frag)

        if self.len is not None:
            # The user gave us a 'len', let's respect this ultimately
            hdr = hdr[:3] + struct.pack("!H", self.len)
        else:
            # Update header with the length of TLSCiphertext.inner
            hdr = hdr[:3] + struct.pack("!H", len(frag))

        # Now we commit the pending write state if it has been triggered. We
        # update nothing if the pwcs was not set. This probably means that
        # we're working out-of-context (and we need to keep the default wcs).
        if self.tls_session.triggered_pwcs_commit:
            if self.tls_session.pwcs is not None:
                self.tls_session.wcs = self.tls_session.pwcs
                self.tls_session.pwcs = None
            self.tls_session.triggered_pwcs_commit = False

        return hdr + frag + pay

    def mysummary(self):
        """
        Extend the inherited summary with the names of the inner messages,
        separated with " / ".
        """
        s, n = super(TLS13, self).mysummary()
        if self.inner and self.inner.msg:
            s += " / "
            s += " / ".join(getattr(x, "_name", x.name) for x in self.inner.msg)
        return s, n