# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) 2017 Maxence Tury

"""
SSLv2 handshake fields & logic.
"""

import struct

from scapy.error import log_runtime, warning
from scapy.utils import randstring
from scapy.fields import ByteEnumField, ByteField, EnumField, FieldLenField, \
    ShortEnumField, StrLenField, XStrField, XStrLenField

from scapy.packet import Padding
from scapy.layers.tls.cert import Cert
from scapy.layers.tls.basefields import _tls_version, _TLSVersionField
from scapy.layers.tls.handshake import _CipherSuitesField
from scapy.layers.tls.keyexchange import _TLSSignatureField, _TLSSignature
from scapy.layers.tls.session import (_GenericTLSSessionInheritance,
                                      readConnState, writeConnState)
from scapy.layers.tls.crypto.suites import (_tls_cipher_suites,
                                            _tls_cipher_suites_cls,
                                            get_usable_ciphersuites,
                                            SSL_CK_DES_192_EDE3_CBC_WITH_MD5)


###############################################################################
#                       Generic SSLv2 Handshake message                       #
###############################################################################

# Maps the one-byte "msgtype" value to a human-readable message name.
_sslv2_handshake_type = {0: "error", 1: "client_hello",
                         2: "client_master_key", 3: "client_finished",
                         4: "server_hello", 5: "server_verify",
                         6: "server_finished", 7: "request_certificate",
                         8: "client_certificate"}


class _SSLv2Handshake(_GenericTLSSessionInheritance):
    """
    Inherited by other Handshake classes to get post_build().
    Also used as a fallback for unknown TLS Handshake packets.
    """
    name = "SSLv2 Handshake Generic message"
    fields_desc = [ByteEnumField("msgtype", None, _sslv2_handshake_type)]

    def guess_payload_class(self, p):
        """
        Whatever follows a handshake message is treated as Padding.
        """
        return Padding

    def tls_session_update(self, msg_str):
        """
        Covers both post_build- and post_dissection- context updates.

        Records the raw message (msg_str) and the packet object itself in
        the session's handshake message lists.
        """
        self.tls_session.handshake_messages.append(msg_str)
        self.tls_session.handshake_messages_parsed.append(self)


###############################################################################
#                                    Error                                    #
###############################################################################

_tls_error_code = {1: "no_cipher", 2: "no_certificate",
                   4: "bad_certificate", 6: "unsupported_certificate_type"}


class SSLv2Error(_SSLv2Handshake):
    """
    SSLv2 Error.
    """
    name = "SSLv2 Handshake - Error"
    fields_desc = [ByteEnumField("msgtype", 0, _sslv2_handshake_type),
                   ShortEnumField("code", None, _tls_error_code)]


###############################################################################
#                                 ClientHello                                 #
###############################################################################

class _SSLv2CipherSuitesField(_CipherSuitesField):
    """
    List of cipher suites where every item takes 3 bytes on the wire
    (itemsize is forced to 3 and items are packed as 1 + 2 bytes).
    """
    def __init__(self, name, default, dico, length_from=None):
        _CipherSuitesField.__init__(self, name, default, dico,
                                    length_from=length_from)
        self.itemfmt = b""
        self.itemsize = 3

    def i2m(self, pkt, val):
        """
        Serialize a list of integer cipher suite values, 3 bytes each.

        A None value is built as an empty list. (The previous code assigned
        an empty list and then still iterated over None, which raised a
        TypeError.)
        """
        if val is None:
            return b""
        return b"".join(struct.pack(">BH", x >> 16, x & 0x00ffff)
                        for x in val)

    def m2i(self, pkt, m):
        """
        Parse the raw string 3 bytes at a time into a list of integers.
        """
        res = []
        while m:
            # Left-pad each 3-byte item so it can be read as a 4-byte integer.
            res.append(struct.unpack("!I", b"\x00" + m[:3])[0])
            m = m[3:]
        return res


class SSLv2ClientHello(_SSLv2Handshake):
    """
    SSLv2 ClientHello.
    """
    name = "SSLv2 Handshake - Client Hello"
    fields_desc = [ByteEnumField("msgtype", 1, _sslv2_handshake_type),
                   _TLSVersionField("version", 0x0002, _tls_version),

                   FieldLenField("cipherslen", None, fmt="!H",
                                 length_of="ciphers"),
                   FieldLenField("sidlen", None, fmt="!H",
                                 length_of="sid"),
                   FieldLenField("challengelen", None, fmt="!H",
                                 length_of="challenge"),

                   XStrLenField("sid", b"",
                                length_from=lambda pkt:pkt.sidlen),
                   _SSLv2CipherSuitesField("ciphers",
                                           [SSL_CK_DES_192_EDE3_CBC_WITH_MD5],
                                           _tls_cipher_suites,
                                           length_from=lambda pkt: pkt.cipherslen),  # noqa: E501
                   XStrLenField("challenge", b"",
                                length_from=lambda pkt:pkt.challengelen)]

    def tls_session_update(self, msg_str):
        """
        Store the advertised version, the offered cipher suites and the
        challenge into the session, on top of the generic bookkeeping.
        """
        super(SSLv2ClientHello, self).tls_session_update(msg_str)
        self.tls_session.advertised_tls_version = self.version
        self.tls_session.sslv2_common_cs = self.ciphers
        self.tls_session.sslv2_challenge = self.challenge


###############################################################################
#                                 ServerHello                                 #
###############################################################################

class _SSLv2CertDataField(StrLenField):
    """
    Certificate field: holds a Cert object when the data could be parsed,
    and the raw string otherwise.
    """
    def getfield(self, pkt, s):
        """
        Return (remaining string, Cert or raw bytes) for the first
        length_from(pkt) bytes of s (0 bytes if there is no length_from).
        """
        tmp_len = 0
        if self.length_from is not None:
            tmp_len = self.length_from(pkt)
        try:
            certdata = Cert(s[:tmp_len])
        except Exception:
            # Packets are sometimes wrongly interpreted as SSLv2
            # (see record.py). We ignore failures silently
            certdata = s[:tmp_len]
        return s[tmp_len:], certdata

    def i2len(self, pkt, i):
        """
        Length of the DER encoding for a Cert, else length of the value.
        """
        if isinstance(i, Cert):
            return len(i.der)
        return len(i)

    def i2m(self, pkt, i):
        """
        DER encoding for a Cert, else the value itself.
        """
        if isinstance(i, Cert):
            return i.der
        return i


class SSLv2ServerHello(_SSLv2Handshake):
    """
    SSLv2 ServerHello.
    """
    name = "SSLv2 Handshake - Server Hello"
    fields_desc = [ByteEnumField("msgtype", 4, _sslv2_handshake_type),

                   ByteField("sid_hit", 0),
                   ByteEnumField("certtype", 1, {1: "x509_cert"}),
                   _TLSVersionField("version", 0x0002, _tls_version),

                   FieldLenField("certlen", None, fmt="!H",
                                 length_of="cert"),
                   FieldLenField("cipherslen", None, fmt="!H",
                                 length_of="ciphers"),
                   FieldLenField("connection_idlen", None, fmt="!H",
                                 length_of="connection_id"),

                   _SSLv2CertDataField("cert", b"",
                                       length_from=lambda pkt: pkt.certlen),
                   _SSLv2CipherSuitesField("ciphers", [], _tls_cipher_suites,
                                           length_from=lambda pkt: pkt.cipherslen),  # noqa: E501
                   XStrLenField("connection_id", b"",
                                length_from=lambda pkt: pkt.connection_idlen)]

    def tls_session_update(self, msg_str):
        """
        XXX Something should be done about the session ID here.

        Narrows the session's common cipher suites to those also listed by
        the server, and stores the connection id, the version and the server
        certificate.
        """
        super(SSLv2ServerHello, self).tls_session_update(msg_str)

        s = self.tls_session
        client_cs = s.sslv2_common_cs
        css = [cs for cs in client_cs if cs in self.ciphers]
        s.sslv2_common_cs = css
        s.sslv2_connection_id = self.connection_id
        s.tls_version = self.version
        # Only a parsed certificate is usable later on (.pubKey / .der are
        # read from server_certs[0]); _SSLv2CertDataField leaves raw bytes
        # (b"" by default) when parsing failed, so those are not recorded.
        if isinstance(self.cert, Cert):
            s.server_certs = [self.cert]


###############################################################################
#                               ClientMasterKey                               #
###############################################################################

class _SSLv2CipherSuiteField(EnumField):
    """
    Single cipher suite value, taking 3 bytes on the wire.
    """
    def __init__(self, name, default, dico):
        EnumField.__init__(self, name, default, dico)

    def i2m(self, pkt, val):
        """
        Pack the integer value on 3 bytes; None is built as an empty string.
        """
        if val is None:
            return b""
        val2 = (val >> 16, val & 0x00ffff)
        return struct.pack(">BH", val2[0], val2[1])

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)

    def m2i(self, pkt, m):
        """
        Read the first 3 bytes of m as an integer.
        """
        return struct.unpack("!I", b"\x00" + m[:3])[0]

    def getfield(self, pkt, s):
        return s[3:], self.m2i(pkt, s)


class _SSLv2EncryptedKeyField(XStrLenField):
    """
    Encrypted key field whose representation also shows the packet's
    decrypted key when one is available.
    """
    def i2repr(self, pkt, x):
        parent = super(_SSLv2EncryptedKeyField, self)
        text = parent.i2repr(pkt, x)
        plain = pkt.decryptedkey
        if plain is None:
            return text
        return text + " [decryptedkey= %s]" % parent.i2repr(pkt, plain)


class SSLv2ClientMasterKey(_SSLv2Handshake):
    """
    SSLv2 ClientMasterKey.
    """
    __slots__ = ["decryptedkey"]
    name = "SSLv2 Handshake - Client Master Key"
    fields_desc = [ByteEnumField("msgtype", 2, _sslv2_handshake_type),
                   _SSLv2CipherSuiteField("cipher", None, _tls_cipher_suites),

                   FieldLenField("clearkeylen", None, fmt="!H",
                                 length_of="clearkey"),
                   FieldLenField("encryptedkeylen", None, fmt="!H",
                                 length_of="encryptedkey"),
                   FieldLenField("keyarglen", None, fmt="!H",
                                 length_of="keyarg"),

                   XStrLenField("clearkey", "",
                                length_from=lambda pkt: pkt.clearkeylen),
                   _SSLv2EncryptedKeyField("encryptedkey", "",
                                           length_from=lambda pkt: pkt.encryptedkeylen),  # noqa: E501
                   XStrLenField("keyarg", "",
                                length_from=lambda pkt: pkt.keyarglen)]

    def __init__(self, *args, **kargs):
        """
        When post_building, the packets fields are updated (this is somewhat
        non-standard). We might need these fields later, but calling __str__
        on a new packet (i.e. not dissected from a raw string) applies
        post_build to an object different from the original one... unless
        we hackishly always set self.explicit to 1.
        """
        # The slot is filled before the parent constructor runs.
        self.decryptedkey = kargs.pop("decryptedkey", b"")
        super(SSLv2ClientMasterKey, self).__init__(*args, **kargs)
        self.explicit = 1

    def pre_dissect(self, s):
        """
        Locate the encrypted key inside the raw message and decrypt it with
        the session's server RSA key when there is one; otherwise the
        decrypted key is set to None. The string is returned untouched.
        """
        clear_len, = struct.unpack("!H", s[4:6])
        enc_len, = struct.unpack("!H", s[6:8])
        # 10 bytes come first: msgtype, 3-byte cipher and three 2-byte lengths.
        enc_start = 10 + clear_len
        enc_key = s[enc_start:enc_start + enc_len]
        rsa_key = self.tls_session.server_rsa_key
        self.decryptedkey = rsa_key.decrypt(enc_key) if rsa_key else None
        return s

    def post_build(self, pkt, pay):
        """
        Rebuild the whole message from the packet fields, filling in on the
        way whatever was left unset: cipher, keys, key argument and lengths.
        """
        cs_val = None
        if self.cipher is None:
            # No cipher was set: pick one from the session's common suites.
            usable = get_usable_ciphersuites(
                self.tls_session.sslv2_common_cs, "SSLv2")
            if usable:
                cs_val = usable[0]  # XXX choose the best one
                cipher = struct.pack(">BH", cs_val >> 16, cs_val & 0x00ffff)
            else:
                warning("No known common cipher suite between SSLv2 Hellos.")
                cs_val = 0x0700c0
                cipher = b"\x07\x00\xc0"
            cs_cls = _tls_cipher_suites_cls[cs_val]
            self.cipher = cs_val
        else:
            # Reuse the 3 cipher bytes which were already built.
            cipher = pkt[1:4]
            cs_val, = struct.unpack("!I", b"\x00" + cipher)
            cs_cls = _tls_cipher_suites_cls.get(cs_val)
            if cs_cls is None:
                warning("Unknown cipher suite %d from ClientMasterKey", cs_val)

        if cs_cls:
            if (self.encryptedkey == b"" and
                    len(self.tls_session.server_certs) > 0):
                # else, the user is responsible for export slicing & encryption
                secret = randstring(cs_cls.cipher_alg.key_len)

                if self.clearkey == b"" and cs_cls.kx_alg.export:
                    self.clearkey = secret[:-5]

                if self.decryptedkey == b"":
                    self.decryptedkey = (secret[-5:] if cs_cls.kx_alg.export
                                         else secret)

                server_pubkey = self.tls_session.server_certs[0].pubKey
                self.encryptedkey = server_pubkey.encrypt(self.decryptedkey)

            if self.keyarg == b"" and cs_cls.cipher_alg.type == "block":
                self.keyarg = randstring(cs_cls.cipher_alg.block_size)

        # Each length field left to None is set from its value, then packed.
        lengths = b""
        values = b""
        for fname in ("clearkey", "encryptedkey", "keyarg"):
            value = getattr(self, fname) or b""
            len_name = fname + "len"
            if getattr(self, len_name) is None:
                setattr(self, len_name, len(value))
            lengths += struct.pack("!H", getattr(self, len_name))
            values += value

        return pkt[:1] + cipher + lengths + values + pay

    def tls_session_update(self, msg_str):
        """
        Prepare the pending read/write connection states for the selected
        cipher suite and, when the decrypted key is known, derive the keys
        and trigger the commit of both pending states.
        """
        super(SSLv2ClientMasterKey, self).tls_session_update(msg_str)

        s = self.tls_session
        cs_val = self.cipher
        cs_cls = _tls_cipher_suites_cls.get(cs_val)
        if cs_cls is None:
            warning("Unknown cipher suite %d from ClientMasterKey", cs_val)

        shared = dict(ciphersuite=cs_cls,
                      tls_version=s.tls_version or 0x0002,
                      connection_end=s.connection_end)
        s.pwcs = writeConnState(seq_num=s.wcs.seq_num, **shared)
        s.prcs = readConnState(seq_num=s.rcs.seq_num, **shared)

        if self.decryptedkey is None:
            return

        s.master_secret = self.clearkey + self.decryptedkey
        s.compute_sslv2_km_and_derive_keys()

        # The key argument is used as IV by block ciphers.
        for state in (s.pwcs, s.prcs):
            if state.cipher.type == "block":
                state.cipher.iv = self.keyarg

        s.triggered_prcs_commit = True
        s.triggered_pwcs_commit = True


###############################################################################
#                                ServerVerify                                 #
###############################################################################

class SSLv2ServerVerify(_SSLv2Handshake):
    """
    In order to parse a ServerVerify, the exact message string should be
    fed to the class. This is how SSLv2 defines the challenge length...
    """
    name = "SSLv2 Handshake - Server Verify"
    fields_desc = [ByteEnumField("msgtype", 5, _sslv2_handshake_type),
                   XStrField("challenge", "")]

    def build(self, *args, **kargs):
        """
        Fill an unset challenge with the one stored in the session.

        The field default is an empty string, so "unset" covers both None
        and an empty value (same convention as the Finished messages below).
        The field is left alone when the session holds no challenge.
        """
        fval = self.getfieldval("challenge")
        if not fval:
            challenge = self.tls_session.sslv2_challenge
            if challenge is not None:
                self.challenge = challenge
        return super(SSLv2ServerVerify, self).build(*args, **kargs)

    def post_dissection(self, pkt):
        """
        Log a message when the received challenge differs from the one
        stored in the session (nothing is checked if the session has none).
        """
        s = self.tls_session
        if s.sslv2_challenge is not None:
            if self.challenge != s.sslv2_challenge:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid ServerVerify received [%s]", pkt_info)  # noqa: E501


###############################################################################
#                             RequestCertificate                              #
###############################################################################

class SSLv2RequestCertificate(_SSLv2Handshake):
    """
    In order to parse a RequestCertificate, the exact message string should be
    fed to the class. This is how SSLv2 defines the challenge length...
    """
    name = "SSLv2 Handshake - Request Certificate"
    fields_desc = [ByteEnumField("msgtype", 7, _sslv2_handshake_type),
                   ByteEnumField("authtype", 1, {1: "md5_with_rsa"}),
                   XStrField("challenge", "")]

    def tls_session_update(self, msg_str):
        """
        Store the certificate challenge into the session.
        """
        super(SSLv2RequestCertificate, self).tls_session_update(msg_str)
        self.tls_session.sslv2_challenge_clientcert = self.challenge


###############################################################################
#                              ClientCertificate                              #
###############################################################################

class SSLv2ClientCertificate(_SSLv2Handshake):
    """
    SSLv2 ClientCertificate.
    """
    name = "SSLv2 Handshake - Client Certificate"
    fields_desc = [ByteEnumField("msgtype", 8, _sslv2_handshake_type),

                   ByteEnumField("certtype", 1, {1: "x509_cert"}),
                   FieldLenField("certlen", None, fmt="!H",
                                 length_of="certdata"),
                   FieldLenField("responselen", None, fmt="!H",
                                 length_of="responsedata"),

                   _SSLv2CertDataField("certdata", b"",
                                       length_from=lambda pkt: pkt.certlen),
                   _TLSSignatureField("responsedata", None,
                                      length_from=lambda pkt: pkt.responselen)]

    def build(self, *args, **kargs):
        """
        Compute the response signature when none was provided.

        The signed data is the session key material, followed by the
        certificate challenge and the DER-encoded server certificate. If
        the session lacks any of these, the missing signature is built as
        an empty string. A signature provided by the user is kept as is
        (it used to be overwritten with an empty string).
        """
        s = self.tls_session
        sig = self.getfieldval("responsedata")
        if sig is None:
            can_sign = (s.sslv2_key_material is not None and
                        s.sslv2_challenge_clientcert is not None and
                        len(s.server_certs) > 0)
            if can_sign:
                m = (s.sslv2_key_material +
                     s.sslv2_challenge_clientcert +
                     s.server_certs[0].der)
                self.responsedata = _TLSSignature(tls_session=s)
                self.responsedata._update_sig(m, s.client_key)
            else:
                self.responsedata = b""
        return super(SSLv2ClientCertificate, self).build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        """
        Update the session, then verify the response signature against the
        first client certificate when the session holds everything needed.
        A failed verification is only logged.
        """
        self.tls_session_update(msg_str)

        s = self.tls_session
        test = (len(s.client_certs) > 0 and
                s.sslv2_key_material is not None and
                s.sslv2_challenge_clientcert is not None and
                len(s.server_certs) > 0)
        if test:
            m = (s.sslv2_key_material +
                 s.sslv2_challenge_clientcert +
                 s.server_certs[0].der)
            sig_test = self.responsedata._verify_sig(m, s.client_certs[0])
            if not sig_test:
                pkt_info = self.firstlayer().summary()
                log_runtime.info("TLS: invalid client CertificateVerify signature [%s]", pkt_info)  # noqa: E501

    def tls_session_update(self, msg_str):
        """
        Store a non-empty client certificate into the session.
        """
        super(SSLv2ClientCertificate, self).tls_session_update(msg_str)
        if self.certdata:
            self.tls_session.client_certs = [self.certdata]


###############################################################################
#                                  Finished                                   #
###############################################################################

class SSLv2ClientFinished(_SSLv2Handshake):
    """
    In order to parse a ClientFinished, the exact message string should be fed
    to the class. SSLv2 does not offer any other way to know the c_id length.
    """
    name = "SSLv2 Handshake - Client Finished"
    fields_desc = [ByteEnumField("msgtype", 3, _sslv2_handshake_type),
                   XStrField("connection_id", "")]

    def build(self, *args, **kargs):
        """
        Fill an empty connection_id with the one stored in the session.
        """
        fval = self.getfieldval("connection_id")
        if fval == b"":
            self.connection_id = self.tls_session.sslv2_connection_id
        return super(SSLv2ClientFinished, self).build(*args, **kargs)

    def post_dissection(self, pkt):
        """
        Log a message when the received connection_id differs from the one
        stored in the session (nothing is checked if the session has none).
        """
        s = self.tls_session
        if s.sslv2_connection_id is not None:
            if self.connection_id != s.sslv2_connection_id:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid client Finished received [%s]", pkt_info)  # noqa: E501


class SSLv2ServerFinished(_SSLv2Handshake):
    """
    In order to parse a ServerFinished, the exact message string should be fed
    to the class. SSLv2 does not offer any other way to know the sid length.
    """
    name = "SSLv2 Handshake - Server Finished"
    fields_desc = [ByteEnumField("msgtype", 6, _sslv2_handshake_type),
                   XStrField("sid", "")]

    def build(self, *args, **kargs):
        """
        Fill an empty sid with the one stored in the session, if any session.
        """
        fval = self.getfieldval("sid")
        if fval == b"" and self.tls_session:
            self.sid = self.tls_session.sid
        return super(SSLv2ServerFinished, self).build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        """
        Update the session, then store the received sid into it.
        """
        self.tls_session_update(msg_str)
        self.tls_session.sid = self.sid


###############################################################################
#                All handshake messages defined in this module                #
###############################################################################

# Maps the "msgtype" value to the class which handles that message.
_sslv2_handshake_cls = {0: SSLv2Error, 1: SSLv2ClientHello,
                        2: SSLv2ClientMasterKey, 3: SSLv2ClientFinished,
                        4: SSLv2ServerHello, 5: SSLv2ServerVerify,
                        6: SSLv2ServerFinished, 7: SSLv2RequestCertificate,
                        8: SSLv2ClientCertificate}