## This file is part of Scapy
## Copyright (C) 2017 Maxence Tury
## This program is published under a GPLv2 license

"""
Common TLS 1.3 fields & bindings.

This module covers the record layer, along with the ChangeCipherSpec, Alert and
ApplicationData submessages. For the Handshake type, see tls_handshake.py.

See the TLS class documentation for more information.
"""

import struct

from scapy.config import conf
from scapy.error import log_runtime, warning
from scapy.fields import *
from scapy.packet import *
from scapy.layers.tls.session import _GenericTLSSessionInheritance
from scapy.layers.tls.basefields import (_TLSVersionField, _tls_version,
                                         _TLSMACField, _TLSLengthField,
                                         _tls_type)
from scapy.layers.tls.record import _TLSMsgListField
from scapy.layers.tls.crypto.cipher_aead import AEADTagError
from scapy.layers.tls.crypto.cipher_stream import Cipher_NULL
from scapy.layers.tls.crypto.ciphers import CipherError


###############################################################################
### TLS Record Protocol                                                     ###
###############################################################################

class TLSInnerPlaintext(_GenericTLSSessionInheritance):
    """
    Deciphered content of a TLS 1.3 record: a list of messages, followed by
    the one-byte content type and then by optional zero padding.
    """
    name = "TLS Inner Plaintext"
    fields_desc = [_TLSMsgListField("msg", []),
                   ByteEnumField("type", None, _tls_type),
                   XStrField("pad", "")]

    def pre_dissect(self, s):
        """
        We need to parse the padding and type as soon as possible,
        else we won't be able to parse the message list...

        The layout is content || type || zeros, so the type is the last
        non-zero byte of 's' and everything before it is the message list.
        The computed message length is installed as the 'length_from' of the
        'msg' field, and 'self.type' is set from the located type byte.
        's' is returned unchanged.

        Raises Exception when 's' is empty.
        """
        if len(s) < 1:
            raise Exception("Invalid InnerPlaintext (too short).")

        # rstrip() avoids indexing the buffer: on Python 3, s[-1] is an int
        # and would never compare equal to b"\x00".
        unpadded = s.rstrip(b"\x00")
        if unpadded:
            # The last non-zero byte is the type; the messages precede it.
            msg_len = len(unpadded) - 1
        else:
            # Only zero bytes: there is no genuine content type. Stay lenient
            # and treat the very last byte as a (zero) type without padding.
            msg_len = len(s) - 1

        # NOTE: this updates the field descriptor found in fields_desc, as the
        # message list field needs the length before it can be dissected.
        self.fields_desc[0].length_from = lambda pkt: msg_len

        self.type = struct.unpack("B", s[msg_len:msg_len + 1])[0]

        return s


class _TLSInnerPlaintextField(PacketField):
    """
    PacketField holding a TLSInnerPlaintext, which propagates the TLS session
    of the enclosing packet to the inner one.
    """

    def __init__(self, name, default, *args, **kargs):
        """
        The packet class is always TLSInnerPlaintext; any extra positional or
        keyword argument is accepted but ignored.
        """
        super(_TLSInnerPlaintextField, self).__init__(name,
                                                      default,
                                                      TLSInnerPlaintext)

    def m2i(self, pkt, m):
        """
        Build the inner packet from 'm', sharing the TLS session of 'pkt'.
        """
        return self.cls(m, tls_session=pkt.tls_session)

    def getfield(self, pkt, s):
        """
        Dissect the inner plaintext out of 's'.

        The fragment length is the record length minus the tag length of the
        current read state. When less than one byte is left, a warning is
        issued and (s, None) is returned. Otherwise the fragment is dissected
        and whatever follows it in 's' is returned as the remainder.
        """
        tag_len = pkt.tls_session.rcs.mac_len
        frag_len = pkt.len - tag_len
        if frag_len < 1:
            warning("InnerPlaintext should at least contain a byte type!")
            return s, None
        remain, i = super(_TLSInnerPlaintextField, self).getfield(
            pkt, s[:frag_len])
        # remain should be empty here
        return remain + s[frag_len:], i

    def i2m(self, pkt, p):
        """
        Serialize the inner packet 'p'.

        When 'p' is session-aware, it is given the session of 'pkt', and it
        is built statefully unless that session is frozen.
        """
        if isinstance(p, _GenericTLSSessionInheritance):
            p.tls_session = pkt.tls_session
            if not pkt.tls_session.frozen:
                return p.raw_stateful()
        return raw(p)


class TLS13(_GenericTLSSessionInheritance):
    """
    TLS 1.3 record: a 5-byte header (type, version, length), the inner
    plaintext and the authentication tag.
    """
    __slots__ = ["deciphered_len"]
    name = "TLS 1.3"
    fields_desc = [ByteEnumField("type", 0x17, _tls_type),
                   _TLSVersionField("version", 0x0301, _tls_version),
                   _TLSLengthField("len", None),
                   _TLSInnerPlaintextField("inner", TLSInnerPlaintext()),
                   _TLSMACField("auth_tag", None)]

    def __init__(self, *args, **kargs):
        """
        Record the optional 'deciphered_len' keyword argument (None when it
        is absent) before running the parent initialization.
        """
        self.deciphered_len = kargs.get("deciphered_len", None)
        super(TLS13, self).__init__(*args, **kargs)


    ### Parsing methods

    def _tls_auth_decrypt(self, s):
        """
        Provided with the record header and AEAD-ciphered data, return the
        sliced and clear tuple (TLSInnerPlaintext, tag). Note that
        we still return the slicing of the original input in case of decryption
        failure. Also, if the integrity check fails, a warning will be issued,
        but we still return the sliced (unauthenticated) plaintext.

        The read sequence number is incremented on every call, whether or not
        the decryption succeeds.
        """
        rcs = self.tls_session.rcs
        read_seq_num = struct.pack("!Q", rcs.seq_num)
        rcs.seq_num += 1
        try:
            return rcs.cipher.auth_decrypt(b"", s, read_seq_num)
        except CipherError as e:
            return e.args
        except AEADTagError as e:
            pkt_info = self.firstlayer().summary()
            log_runtime.info("TLS: record integrity check failed [%s]",
                             pkt_info)
            return e.args

    def pre_dissect(self, s):
        """
        Decrypt, verify and decompress the message.

        With a NULL read cipher, 's' is returned untouched. Otherwise the
        fragment announced by the header length is deciphered and the result
        is header + plaintext + tag + any trailing data.

        Raises Exception when 's' is shorter than the 5-byte record header.
        """
        if len(s) < 5:
            raise Exception("Invalid record: header is too short.")

        if isinstance(self.tls_session.rcs.cipher, Cipher_NULL):
            self.deciphered_len = None
            return s

        msglen = struct.unpack('!H', s[3:5])[0]
        hdr, efrag, r = s[:5], s[5:5 + msglen], s[msglen + 5:]
        frag, auth_tag = self._tls_auth_decrypt(efrag)
        self.deciphered_len = len(frag)
        return hdr + frag + auth_tag + r

    def post_dissect(self, s):
        """
        Commit the pending read state if it has been triggered. We update
        nothing if the prcs was not set, as this probably means that we're
        working out-of-context (and we need to keep the default rcs).
        """
        if self.tls_session.triggered_prcs_commit:
            if self.tls_session.prcs is not None:
                self.tls_session.rcs = self.tls_session.prcs
                self.tls_session.prcs = None
            self.tls_session.triggered_prcs_commit = False
        return s

    def do_dissect_payload(self, s):
        """
        Try to dissect the following data as a TLS message.
        Note that overloading .guess_payload_class() would not be enough,
        as the TLS session to be used would get lost.

        Any failure makes the data fall back to a raw layer.
        """
        if s:
            try:
                # Imported at call time: this module never brings TLS into
                # its namespace otherwise.
                # NOTE(review): TLS is assumed to be defined next to
                # _TLSMsgListField in scapy.layers.tls.record -- confirm.
                from scapy.layers.tls.record import TLS
                p = TLS(s, _internal=1, _underlayer=self,
                        tls_session=self.tls_session)
            except Exception:
                # KeyboardInterrupt is not an Exception subclass, hence it
                # still propagates to the caller.
                p = conf.raw_layer(s, _internal=1, _underlayer=self)
            self.add_payload(p)


    ### Building methods

    def _tls_auth_encrypt(self, s):
        """
        Return the TLSCiphertext.encrypted_record for AEAD ciphers.

        The write sequence number is incremented on every call.
        """
        wcs = self.tls_session.wcs
        write_seq_num = struct.pack("!Q", wcs.seq_num)
        wcs.seq_num += 1
        return wcs.cipher.auth_encrypt(s, b"", write_seq_num)

    def post_build(self, pkt, pay):
        """
        Apply the previous methods according to the writing cipher type.

        'pkt' is the built record (5-byte header, then fragment) and 'pay'
        the built payload. The returned value is the final header, the
        (possibly encrypted) fragment and the payload.
        """
        # Compute the length of TLSPlaintext fragment
        hdr, frag = pkt[:5], pkt[5:]
        # Building is ruled by the write state, which is also the one used
        # by _tls_auth_encrypt().
        if not isinstance(self.tls_session.wcs.cipher, Cipher_NULL):
            frag = self._tls_auth_encrypt(frag)

        if self.len is not None:
            # The user gave us a 'len', let's respect this ultimately
            hdr = hdr[:3] + struct.pack("!H", self.len)
        else:
            # Update header with the length of TLSCiphertext.inner
            hdr = hdr[:3] + struct.pack("!H", len(frag))

        # Now we commit the pending write state if it has been triggered. We
        # update nothing if the pwcs was not set. This probably means that
        # we're working out-of-context (and we need to keep the default wcs).
        if self.tls_session.triggered_pwcs_commit:
            if self.tls_session.pwcs is not None:
                self.tls_session.wcs = self.tls_session.pwcs
                self.tls_session.pwcs = None
            self.tls_session.triggered_pwcs_commit = False

        return hdr + frag + pay