• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2017 Maxence Tury
5
6"""
7SSLv2 handshake fields & logic.
8"""
9
10import struct
11
12from scapy.error import log_runtime, warning
13from scapy.utils import randstring
14from scapy.fields import ByteEnumField, ByteField, EnumField, FieldLenField, \
15    ShortEnumField, StrLenField, XStrField, XStrLenField
16
17from scapy.packet import Padding
18from scapy.layers.tls.cert import Cert
19from scapy.layers.tls.basefields import _tls_version, _TLSVersionField
20from scapy.layers.tls.handshake import _CipherSuitesField
21from scapy.layers.tls.keyexchange import _TLSSignatureField, _TLSSignature
22from scapy.layers.tls.session import (_GenericTLSSessionInheritance,
23                                      readConnState, writeConnState)
24from scapy.layers.tls.crypto.suites import (_tls_cipher_suites,
25                                            _tls_cipher_suites_cls,
26                                            get_usable_ciphersuites,
27                                            SSL_CK_DES_192_EDE3_CBC_WITH_MD5)
28
29
30###############################################################################
31#   Generic SSLv2 Handshake message                                           #
32###############################################################################
33
34_sslv2_handshake_type = {0: "error", 1: "client_hello",
35                         2: "client_master_key", 3: "client_finished",
36                         4: "server_hello", 5: "server_verify",
37                         6: "server_finished", 7: "request_certificate",
38                         8: "client_certificate"}
39
40
class _SSLv2Handshake(_GenericTLSSessionInheritance):
    """
    Common parent of the SSLv2 handshake messages of this module.
    It only describes the leading message type byte, and is also used as a
    fallback for unknown handshake packets.
    """
    name = "SSLv2 Handshake Generic message"
    fields_desc = [
        ByteEnumField("msgtype", None, _sslv2_handshake_type),
    ]

    def guess_payload_class(self, p):
        # Whatever follows the message is never parsed as another layer.
        return Padding

    def tls_session_update(self, msg_str):
        """
        Record the message in the session handshake history, both as the
        raw string msg_str and as the packet object itself.
        Covers both post_build- and post_dissection- context updates.
        """
        session = self.tls_session
        session.handshake_messages.append(msg_str)
        session.handshake_messages_parsed.append(self)
58
59
60###############################################################################
61#   Error                                                                     #
62###############################################################################
63
64_tls_error_code = {1: "no_cipher", 2: "no_certificate",
65                   4: "bad_certificate", 6: "unsupported_certificate_type"}
66
67
class SSLv2Error(_SSLv2Handshake):
    """
    SSLv2 Error message: the message type byte (0) followed by a two-byte
    error code.
    """
    name = "SSLv2 Handshake - Error"
    fields_desc = [
        ByteEnumField("msgtype", 0, _sslv2_handshake_type),
        ShortEnumField("code", None, _tls_error_code),
    ]
75
76
77###############################################################################
78#   ClientHello                                                               #
79###############################################################################
80
class _SSLv2CipherSuitesField(_CipherSuitesField):
    """
    List of SSLv2 cipher suites, each one encoded on 3 bytes on the wire
    and handled internally as an integer.
    """

    def __init__(self, name, default, dico, length_from=None):
        _CipherSuitesField.__init__(self, name, default, dico,
                                    length_from=length_from)
        # SSLv2 cipher kinds are 3 bytes wide, for which struct has no
        # format code: the item format is left empty and the packing is
        # done by hand in i2m()/m2i().
        self.itemfmt = b""
        self.itemsize = 3

    def i2m(self, pkt, val):
        """
        Encode a list of cipher suite integers as consecutive 3-byte
        big-endian values. A value of None encodes as an empty string.
        """
        if val is None:
            # The original code assigned an empty list to a scratch variable
            # and then went on to iterate over None, raising a TypeError.
            return b""
        return b"".join(struct.pack(">BH", x >> 16, x & 0x00ffff)
                        for x in val)

    def m2i(self, pkt, m):
        """
        Decode consecutive 3-byte big-endian values into a list of integers.
        """
        res = []
        while m:
            # Left-pad each 3-byte item so that it can be read as a 32-bit
            # unsigned integer.
            res.append(struct.unpack("!I", b"\x00" + m[:3])[0])
            m = m[3:]
        return res
100
101
class SSLv2ClientHello(_SSLv2Handshake):
    """
    SSLv2 ClientHello: version, then the three length fields, then the
    session id, the cipher suites list and the challenge they describe.
    """
    name = "SSLv2 Handshake - Client Hello"
    fields_desc = [
        ByteEnumField("msgtype", 1, _sslv2_handshake_type),
        _TLSVersionField("version", 0x0002, _tls_version),

        # The three lengths come first, in this order...
        FieldLenField("cipherslen", None, fmt="!H", length_of="ciphers"),
        FieldLenField("sidlen", None, fmt="!H", length_of="sid"),
        FieldLenField("challengelen", None, fmt="!H",
                      length_of="challenge"),

        # ...whereas the session id precedes the ciphers in the data part.
        XStrLenField("sid", b"",
                     length_from=lambda pkt: pkt.sidlen),
        _SSLv2CipherSuitesField("ciphers",
                                [SSL_CK_DES_192_EDE3_CBC_WITH_MD5],
                                _tls_cipher_suites,
                                length_from=lambda pkt: pkt.cipherslen),
        XStrLenField("challenge", b"",
                     length_from=lambda pkt: pkt.challengelen),
    ]

    def tls_session_update(self, msg_str):
        """
        Besides the generic bookkeeping, store the advertised version, the
        offered cipher suites and the challenge into the session.
        """
        super(SSLv2ClientHello, self).tls_session_update(msg_str)
        session = self.tls_session
        session.advertised_tls_version = self.version
        session.sslv2_common_cs = self.ciphers
        session.sslv2_challenge = self.challenge
131
132
133###############################################################################
134#   ServerHello                                                               #
135###############################################################################
136
class _SSLv2CertDataField(StrLenField):
    """
    Length-delimited certificate field. The internal value is a Cert
    object when the data could be parsed, and the raw string otherwise.
    """

    def getfield(self, pkt, s):
        size = 0 if self.length_from is None else self.length_from(pkt)
        data, remain = s[:size], s[size:]
        try:
            value = Cert(data)
        except Exception:
            # Packets are sometimes wrongly interpreted as SSLv2
            # (see record.py). We ignore failures silently
            value = data
        return remain, value

    def i2len(self, pkt, i):
        # A Cert is measured through its DER encoding.
        return len(i.der) if isinstance(i, Cert) else len(i)

    def i2m(self, pkt, i):
        # A Cert is put on the wire as its DER encoding.
        return i.der if isinstance(i, Cert) else i
159
160
class SSLv2ServerHello(_SSLv2Handshake):
    """
    SSLv2 ServerHello: session id hit flag, certificate type and version,
    then the three length fields, then the certificate, the cipher suites
    list and the connection id they describe.
    """
    name = "SSLv2 Handshake - Server Hello"
    fields_desc = [ByteEnumField("msgtype", 4, _sslv2_handshake_type),

                   ByteField("sid_hit", 0),
                   ByteEnumField("certtype", 1, {1: "x509_cert"}),
                   _TLSVersionField("version", 0x0002, _tls_version),

                   FieldLenField("certlen", None, fmt="!H",
                                 length_of="cert"),
                   FieldLenField("cipherslen", None, fmt="!H",
                                 length_of="ciphers"),
                   FieldLenField("connection_idlen", None, fmt="!H",
                                 length_of="connection_id"),

                   _SSLv2CertDataField("cert", b"",
                                       length_from=lambda pkt: pkt.certlen),
                   _SSLv2CipherSuitesField("ciphers", [], _tls_cipher_suites,
                                           length_from=lambda pkt: pkt.cipherslen),  # noqa: E501
                   XStrLenField("connection_id", b"",
                                length_from=lambda pkt: pkt.connection_idlen)]

    def tls_session_update(self, msg_str):
        """
        Besides the generic bookkeeping, narrow the session cipher suites
        down to the ones also listed here (keeping the order previously
        stored in the session), and store the connection id, the version
        and the server certificate.

        XXX Something should be done about the session ID here.
        """
        super(SSLv2ServerHello, self).tls_session_update(msg_str)

        s = self.tls_session
        client_cs = s.sslv2_common_cs
        css = [cs for cs in client_cs if cs in self.ciphers]
        s.sslv2_common_cs = css
        s.sslv2_connection_id = self.connection_id
        s.tls_version = self.version
        # The cert field holds raw bytes when it was left to its default
        # (b"") or when the certificate data could not be parsed. Such a
        # value must not be registered as a server certificate: the
        # ClientMasterKey and ClientCertificate messages read the .pubKey
        # and .der attributes of server_certs[0].
        if isinstance(self.cert, Cert):
            s.server_certs = [self.cert]
200
201
202###############################################################################
203#   ClientMasterKey                                                           #
204###############################################################################
205
class _SSLv2CipherSuiteField(EnumField):
    """
    Single SSLv2 cipher suite, encoded on 3 bytes on the wire and handled
    internally as an integer.
    """

    def __init__(self, name, default, dico):
        EnumField.__init__(self, name, default, dico)

    def i2m(self, pkt, val):
        # An unset cipher contributes nothing to the built string.
        if val is None:
            return b""
        high, low = val >> 16, val & 0x00ffff
        return struct.pack(">BH", high, low)

    def addfield(self, pkt, s, val):
        encoded = self.i2m(pkt, val)
        return s + encoded

    def m2i(self, pkt, m):
        # Left-pad the 3 bytes so that they can be read as a 32-bit
        # unsigned integer.
        padded = b"\x00" + m[:3]
        (value,) = struct.unpack("!I", padded)
        return value

    def getfield(self, pkt, s):
        remain = s[3:]
        return remain, self.m2i(pkt, s)
224
225
class _SSLv2EncryptedKeyField(XStrLenField):
    """
    Encrypted key field whose representation also shows the decrypted key
    of the packet, when there is one.
    """

    def i2repr(self, pkt, x):
        parent = super(_SSLv2EncryptedKeyField, self)
        shown = parent.i2repr(pkt, x)
        if pkt.decryptedkey is None:
            return shown
        clear = parent.i2repr(pkt, pkt.decryptedkey)
        return shown + "    [decryptedkey= %s]" % clear
234
235
class SSLv2ClientMasterKey(_SSLv2Handshake):
    """
    SSLv2 ClientMasterKey: the chosen cipher, then the three length fields,
    then the clear key, the encrypted key and the key argument they
    describe. The decrypted key is kept aside in the decryptedkey slot.
    """
    __slots__ = ["decryptedkey"]
    name = "SSLv2 Handshake - Client Master Key"
    fields_desc = [
        ByteEnumField("msgtype", 2, _sslv2_handshake_type),
        _SSLv2CipherSuiteField("cipher", None, _tls_cipher_suites),

        FieldLenField("clearkeylen", None, fmt="!H",
                      length_of="clearkey"),
        FieldLenField("encryptedkeylen", None, fmt="!H",
                      length_of="encryptedkey"),
        FieldLenField("keyarglen", None, fmt="!H",
                      length_of="keyarg"),

        XStrLenField("clearkey", "",
                     length_from=lambda pkt: pkt.clearkeylen),
        _SSLv2EncryptedKeyField("encryptedkey", "",
                                length_from=lambda pkt: pkt.encryptedkeylen),
        XStrLenField("keyarg", "",
                     length_from=lambda pkt: pkt.keyarglen),
    ]

    def __init__(self, *args, **kargs):
        """
        When post_building, the packets fields are updated (this is somewhat
        non-standard). We might need these fields later, but calling __str__
        on a new packet (i.e. not dissected from a raw string) applies
        post_build to an object different from the original one... unless
        we hackishly always set self.explicit to 1.
        """
        self.decryptedkey = kargs.pop("decryptedkey", b"")
        super(SSLv2ClientMasterKey, self).__init__(*args, **kargs)
        self.explicit = 1

    def pre_dissect(self, s):
        """
        Locate the encrypted key inside the raw message and decrypt it with
        the server RSA key of the session, if there is one. decryptedkey is
        set to None otherwise. The message itself is returned untouched.
        """
        # 1 byte of type and 3 bytes of cipher precede the length fields.
        clear_len, enc_len = struct.unpack("!HH", s[4:8])
        # The data part starts after the 10-byte header, clear key first.
        enc_start = 10 + clear_len
        enc_key = s[enc_start:enc_start + enc_len]
        rsa_key = self.tls_session.server_rsa_key
        if rsa_key:
            self.decryptedkey = rsa_key.decrypt(enc_key)
        else:
            self.decryptedkey = None
        return s

    def post_build(self, pkt, pay):
        """
        Fill in whatever was left unset (cipher, keys, key argument and
        lengths) and rebuild the whole message from the packet fields.
        """
        if self.cipher is not None:
            # The cipher was provided: read its encoding from the build.
            cipher = pkt[1:4]
            cs_id = struct.unpack("!I", b"\x00" + cipher)[0]
            if cs_id in _tls_cipher_suites_cls:
                suite = _tls_cipher_suites_cls[cs_id]
            else:
                warning("Unknown cipher suite %d from ClientMasterKey", cs_id)
                suite = None
        else:
            # Pick a cipher among the ones shared by both Hellos.
            offered = self.tls_session.sslv2_common_cs
            usable = get_usable_ciphersuites(offered, "SSLv2")
            if len(usable) == 0:
                warning("No known common cipher suite between SSLv2 Hellos.")
                cs_id = 0x0700c0
                cipher = b"\x07\x00\xc0"
            else:
                cs_id = usable[0]  # XXX choose the best one
                cipher = struct.pack(">BH", cs_id >> 16, cs_id & 0x00ffff)
            suite = _tls_cipher_suites_cls[cs_id]
            self.cipher = cs_id

        if suite:
            if (self.encryptedkey == b"" and
                    len(self.tls_session.server_certs) > 0):
                # else, the user is responsible for export slicing & encryption
                fresh_key = randstring(suite.cipher_alg.key_len)

                # For export suites, all but the last 5 bytes go in clear.
                if self.clearkey == b"" and suite.kx_alg.export:
                    self.clearkey = fresh_key[:-5]

                if self.decryptedkey == b"":
                    if suite.kx_alg.export:
                        self.decryptedkey = fresh_key[-5:]
                    else:
                        self.decryptedkey = fresh_key

                server_pubkey = self.tls_session.server_certs[0].pubKey
                self.encryptedkey = server_pubkey.encrypt(self.decryptedkey)

            if self.keyarg == b"" and suite.cipher_alg.type == "block":
                self.keyarg = randstring(suite.cipher_alg.block_size)

        # Lengths left to None are computed from the data and written back
        # into the packet fields. All lengths precede all data on the wire.
        lengths = b""
        data = b""
        for fname in ("clearkey", "encryptedkey", "keyarg"):
            value = getattr(self, fname) or b""
            len_name = fname + "len"
            if getattr(self, len_name) is None:
                setattr(self, len_name, len(value))
            lengths += struct.pack("!H", getattr(self, len_name))
            data += value

        return pkt[:1] + cipher + lengths + data + pay

    def tls_session_update(self, msg_str):
        """
        Besides the generic bookkeeping, prepare the pending read and write
        connection states for the chosen cipher. When the decrypted key is
        known, also derive the session keys and trigger the commit of both
        pending states.
        """
        super(SSLv2ClientMasterKey, self).tls_session_update(msg_str)

        session = self.tls_session
        cs_id = self.cipher
        if cs_id in _tls_cipher_suites_cls:
            suite = _tls_cipher_suites_cls[cs_id]
        else:
            warning("Unknown cipher suite %d from ClientMasterKey", cs_id)
            suite = None

        version = session.tls_version or 0x0002
        end = session.connection_end
        # Both pending states carry on with the current sequence numbers.
        session.pwcs = writeConnState(ciphersuite=suite,
                                      connection_end=end,
                                      seq_num=session.wcs.seq_num,
                                      tls_version=version)
        session.prcs = readConnState(ciphersuite=suite,
                                     connection_end=end,
                                     seq_num=session.rcs.seq_num,
                                     tls_version=version)

        if self.decryptedkey is None:
            return

        session.master_secret = self.clearkey + self.decryptedkey
        session.compute_sslv2_km_and_derive_keys()

        # The key argument is used as the IV of block ciphers.
        for state in (session.pwcs, session.prcs):
            if state.cipher.type == "block":
                state.cipher.iv = self.keyarg

        session.triggered_prcs_commit = True
        session.triggered_pwcs_commit = True
382
383
384###############################################################################
385#   ServerVerify                                                              #
386###############################################################################
387
class SSLv2ServerVerify(_SSLv2Handshake):
    """
    In order to parse a ServerVerify, the exact message string should be
    fed to the class. This is how SSLv2 defines the challenge length...
    """
    name = "SSLv2 Handshake - Server Verify"
    fields_desc = [ByteEnumField("msgtype", 5, _sslv2_handshake_type),
                   XStrField("challenge", "")]

    def build(self, *args, **kargs):
        """
        Build the message. A challenge which was left unset is taken from
        the session, when the session knows one.
        """
        fval = self.getfieldval("challenge")
        # The field default is an empty string, hence testing against None
        # only would never trigger the autofill: an empty challenge is
        # treated as unset too, as the Finished messages of this module do
        # for their own field.
        if not fval and self.tls_session is not None:
            challenge = self.tls_session.sslv2_challenge
            if challenge is not None:
                self.challenge = challenge
        return super(SSLv2ServerVerify, self).build(*args, **kargs)

    def post_dissection(self, pkt):
        """
        Log a message when the received challenge differs from the one
        stored in the session, if any.
        """
        s = self.tls_session
        if s.sslv2_challenge is not None:
            if self.challenge != s.sslv2_challenge:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid ServerVerify received [%s]", pkt_info)  # noqa: E501
409
410
411###############################################################################
412#   RequestCertificate                                                        #
413###############################################################################
414
class SSLv2RequestCertificate(_SSLv2Handshake):
    """
    In order to parse a RequestCertificate, the exact message string should
    be fed to the class: the challenge has no length field and takes up
    whatever follows the authentication type.
    """
    name = "SSLv2 Handshake - Request Certificate"
    fields_desc = [
        ByteEnumField("msgtype", 7, _sslv2_handshake_type),
        ByteEnumField("authtype", 1, {1: "md5_with_rsa"}),
        XStrField("challenge", ""),
    ]

    def tls_session_update(self, msg_str):
        """
        Besides the generic bookkeeping, store the challenge into the
        session as the client certificate challenge.
        """
        super(SSLv2RequestCertificate, self).tls_session_update(msg_str)
        session = self.tls_session
        session.sslv2_challenge_clientcert = self.challenge
428
429
430###############################################################################
431#   ClientCertificate                                                         #
432###############################################################################
433
class SSLv2ClientCertificate(_SSLv2Handshake):
    """
    SSLv2 ClientCertificate: certificate type, then the two length fields,
    then the certificate and the response (signature) they describe.
    """
    name = "SSLv2 Handshake - Client Certificate"
    fields_desc = [ByteEnumField("msgtype", 8, _sslv2_handshake_type),

                   ByteEnumField("certtype", 1, {1: "x509_cert"}),
                   FieldLenField("certlen", None, fmt="!H",
                                 length_of="certdata"),
                   FieldLenField("responselen", None, fmt="!H",
                                 length_of="responsedata"),

                   _SSLv2CertDataField("certdata", b"",
                                       length_from=lambda pkt: pkt.certlen),
                   _TLSSignatureField("responsedata", None,
                                      length_from=lambda pkt: pkt.responselen)]

    def build(self, *args, **kargs):
        """
        Build the message. When no response was provided, it is computed
        with the client key of the session over the key material, the
        client certificate challenge and the DER-encoded server
        certificate; if the session lacks any of these, the response is
        left empty. A response provided by the user is kept as is.
        """
        s = self.tls_session
        sig = self.getfieldval("responsedata")
        # Only touch the field when it was left unset: the original code
        # also reset an explicitly provided response to an empty string.
        if sig is None:
            can_sign = (s.sslv2_key_material is not None and
                        s.sslv2_challenge_clientcert is not None and
                        len(s.server_certs) > 0)
            if can_sign:
                m = (s.sslv2_key_material +
                     s.sslv2_challenge_clientcert +
                     s.server_certs[0].der)
                self.responsedata = _TLSSignature(tls_session=s)
                self.responsedata._update_sig(m, s.client_key)
            else:
                self.responsedata = b""
        return super(SSLv2ClientCertificate, self).build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        """
        Update the session, then check the received response against the
        first client certificate when the session holds everything the
        signed data is made of. A failed check is only logged.
        """
        self.tls_session_update(msg_str)

        s = self.tls_session
        test = (len(s.client_certs) > 0 and
                s.sslv2_key_material is not None and
                s.sslv2_challenge_clientcert is not None and
                len(s.server_certs) > 0)
        if test:
            m = (s.sslv2_key_material +
                 s.sslv2_challenge_clientcert +
                 s.server_certs[0].der)
            sig_test = self.responsedata._verify_sig(m, s.client_certs[0])
            if not sig_test:
                pkt_info = self.firstlayer().summary()
                log_runtime.info("TLS: invalid client CertificateVerify signature [%s]", pkt_info)  # noqa: E501

    def tls_session_update(self, msg_str):
        """
        Besides the generic bookkeeping, store a non-empty certificate as
        the client certificate of the session.
        """
        super(SSLv2ClientCertificate, self).tls_session_update(msg_str)
        if self.certdata:
            self.tls_session.client_certs = [self.certdata]
491
492
493###############################################################################
494#   Finished                                                                  #
495###############################################################################
496
class SSLv2ClientFinished(_SSLv2Handshake):
    """
    In order to parse a ClientFinished, the exact message string should be
    fed to the class. SSLv2 does not offer any other way to know the
    connection id length.
    """
    name = "SSLv2 Handshake - Client Finished"
    fields_desc = [
        ByteEnumField("msgtype", 3, _sslv2_handshake_type),
        XStrField("connection_id", ""),
    ]

    def build(self, *args, **kargs):
        # An empty connection id is replaced with the one of the session.
        if self.getfieldval("connection_id") == b"":
            self.connection_id = self.tls_session.sslv2_connection_id
        return super(SSLv2ClientFinished, self).build(*args, **kargs)

    def post_dissection(self, pkt):
        """
        Log a message when the received connection id differs from the one
        stored in the session, if any.
        """
        expected = self.tls_session.sslv2_connection_id
        if expected is None or self.connection_id == expected:
            return
        pkt_info = pkt.firstlayer().summary()
        log_runtime.info("TLS: invalid client Finished received [%s]", pkt_info)  # noqa: E501
518
519
class SSLv2ServerFinished(_SSLv2Handshake):
    """
    In order to parse a ServerFinished, the exact message string should be
    fed to the class. SSLv2 does not offer any other way to know the
    session id length.
    """
    name = "SSLv2 Handshake - Server Finished"
    fields_desc = [
        ByteEnumField("msgtype", 6, _sslv2_handshake_type),
        XStrField("sid", ""),
    ]

    def build(self, *args, **kargs):
        # An empty session id is replaced with the one of the session,
        # provided there is a session to read it from.
        current = self.getfieldval("sid")
        if current == b"" and self.tls_session:
            self.sid = self.tls_session.sid
        return super(SSLv2ServerFinished, self).build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        """
        Update the session, then store the received session id into it.
        """
        self.tls_session_update(msg_str)
        session = self.tls_session
        session.sid = self.sid
538
539
540###############################################################################
541#   All handshake messages defined in this module                             #
542###############################################################################
543
# Value of the "msgtype" byte -> class implementing that handshake message.
_sslv2_handshake_cls = {
    0: SSLv2Error,
    1: SSLv2ClientHello,
    2: SSLv2ClientMasterKey,
    3: SSLv2ClientFinished,
    4: SSLv2ServerHello,
    5: SSLv2ServerVerify,
    6: SSLv2ServerFinished,
    7: SSLv2RequestCertificate,
    8: SSLv2ClientCertificate,
}
549