• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>
5
6"""
7SPNEGO
8
9Implements parts of:
10
11- GSSAPI SPNEGO: RFC4178 > RFC2478
12- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX]
13
14.. note::
15    You will find more complete documentation for this layer over at
16    `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_
17"""
18
19import struct
20from uuid import UUID
21
22from scapy.asn1.asn1 import (
23    ASN1_OID,
24    ASN1_STRING,
25    ASN1_Codecs,
26)
27from scapy.asn1.mib import conf  # loads conf.mib
28from scapy.asn1fields import (
29    ASN1F_CHOICE,
30    ASN1F_ENUMERATED,
31    ASN1F_FLAGS,
32    ASN1F_GENERAL_STRING,
33    ASN1F_OID,
34    ASN1F_PACKET,
35    ASN1F_SEQUENCE,
36    ASN1F_SEQUENCE_OF,
37    ASN1F_STRING,
38    ASN1F_optional,
39)
40from scapy.asn1packet import ASN1_Packet
41from scapy.fields import (
42    FieldListField,
43    LEIntEnumField,
44    LEIntField,
45    LELongEnumField,
46    LELongField,
47    LEShortField,
48    MultipleTypeField,
49    PacketField,
50    PacketListField,
51    StrField,
52    StrFixedLenField,
53    UUIDEnumField,
54    UUIDField,
55    XStrFixedLenField,
56    XStrLenField,
57)
58from scapy.packet import Packet, bind_layers
59
60from scapy.layers.gssapi import (
61    GSSAPI_BLOB,
62    GSSAPI_BLOB_SIGNATURE,
63    GSS_C_FLAGS,
64    GSS_S_BAD_MECH,
65    GSS_S_COMPLETE,
66    GSS_S_CONTINUE_NEEDED,
67    SSP,
68    _GSSAPI_OIDS,
69    _GSSAPI_SIGNATURE_OIDS,
70)
71
72# SSP Providers
73from scapy.layers.kerberos import (
74    Kerberos,
75)
76from scapy.layers.ntlm import (
77    NEGOEX_EXCHANGE_NTLM,
78    NTLM_Header,
79    _NTLMPayloadField,
80    _NTLMPayloadPacket,
81)
82
83# Typing imports
84from typing import (
85    Dict,
86    Optional,
87    Tuple,
88)
89
90# SPNEGO negTokenInit
91# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1
92
93
94class SPNEGO_MechType(ASN1_Packet):
95    ASN1_codec = ASN1_Codecs.BER
96    ASN1_root = ASN1F_OID("oid", None)
97
98
99class SPNEGO_MechTypes(ASN1_Packet):
100    ASN1_codec = ASN1_Codecs.BER
101    ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)
102
103
104class SPNEGO_MechListMIC(ASN1_Packet):
105    ASN1_codec = ASN1_Codecs.BER
106    ASN1_root = ASN1F_STRING("value", "")
107
108
109_mechDissector = {
110    "1.3.6.1.4.1.311.2.2.10": NTLM_Header,  # NTLM
111    "1.2.840.48018.1.2.2": Kerberos,  # MS KRB5 - Microsoft Kerberos 5
112    "1.2.840.113554.1.2.2": Kerberos,  # Kerberos 5
113}
114
115
116class _SPNEGO_Token_Field(ASN1F_STRING):
117    def i2m(self, pkt, x):
118        if x is None:
119            x = b""
120        return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(x))
121
122    def m2i(self, pkt, s):
123        dat, r = super(_SPNEGO_Token_Field, self).m2i(pkt, s)
124        if isinstance(pkt.underlayer, SPNEGO_negTokenInit):
125            types = pkt.underlayer.mechTypes
126        elif isinstance(pkt.underlayer, SPNEGO_negTokenResp):
127            types = [pkt.underlayer.supportedMech]
128        if types and types[0] and types[0].oid.val in _mechDissector:
129            return _mechDissector[types[0].oid.val](dat.val), r
130        return dat, r
131
132
133class SPNEGO_Token(ASN1_Packet):
134    ASN1_codec = ASN1_Codecs.BER
135    ASN1_root = _SPNEGO_Token_Field("value", None)
136
137
138_ContextFlags = [
139    "delegFlag",
140    "mutualFlag",
141    "replayFlag",
142    "sequenceFlag",
143    "superseded",
144    "anonFlag",
145    "confFlag",
146    "integFlag",
147]
148
149
150class SPNEGO_negHints(ASN1_Packet):
151    # [MS-SPNG] 2.2.1
152    ASN1_codec = ASN1_Codecs.BER
153    ASN1_root = ASN1F_SEQUENCE(
154        ASN1F_optional(
155            ASN1F_GENERAL_STRING(
156                "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0
157            ),
158        ),
159        ASN1F_optional(
160            ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
161        ),
162    )
163
164
165class SPNEGO_negTokenInit(ASN1_Packet):
166    ASN1_codec = ASN1_Codecs.BER
167    ASN1_root = ASN1F_SEQUENCE(
168        ASN1F_optional(
169            ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0)
170        ),
171        ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)),
172        ASN1F_optional(
173            ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2)
174        ),
175        # [MS-SPNG] flavor !
176        ASN1F_optional(
177            ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3)
178        ),
179        ASN1F_optional(
180            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4)
181        ),
182        # Compat with RFC 4178's SPNEGO_negTokenInit
183        ASN1F_optional(
184            ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
185        ),
186    )
187
188
189# SPNEGO negTokenTarg
190# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2
191
192
193class SPNEGO_negTokenResp(ASN1_Packet):
194    ASN1_codec = ASN1_Codecs.BER
195    ASN1_root = ASN1F_SEQUENCE(
196        ASN1F_optional(
197            ASN1F_ENUMERATED(
198                "negResult",
199                0,
200                {
201                    0: "accept-completed",
202                    1: "accept-incomplete",
203                    2: "reject",
204                    3: "request-mic",
205                },
206                explicit_tag=0xA0,
207            ),
208        ),
209        ASN1F_optional(
210            ASN1F_PACKET(
211                "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1
212            ),
213        ),
214        ASN1F_optional(
215            ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2)
216        ),
217        ASN1F_optional(
218            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
219        ),
220    )
221
222
223class SPNEGO_negToken(ASN1_Packet):
224    ASN1_codec = ASN1_Codecs.BER
225    ASN1_root = ASN1F_CHOICE(
226        "token",
227        SPNEGO_negTokenInit(),
228        ASN1F_PACKET(
229            "negTokenInit",
230            SPNEGO_negTokenInit(),
231            SPNEGO_negTokenInit,
232            explicit_tag=0xA0,
233        ),
234        ASN1F_PACKET(
235            "negTokenResp",
236            SPNEGO_negTokenResp(),
237            SPNEGO_negTokenResp,
238            explicit_tag=0xA1,
239        ),
240    )
241
242
243# Register for the GSS API Blob
244
245_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
246_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
247
248
249def mechListMIC(oids):
250    """
251    Implementation of RFC 4178 - Appendix D. mechListMIC Computation
252    """
253    return bytes(SPNEGO_MechTypes(mechTypes=oids))
254
255
256# NEGOEX
257# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2
258
259
260_NEGOEX_AUTH_SCHEMES = {
261    # Reversed. Is there any doc related to this?
262    # The NEGOEX doc is very ellusive
263    UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
264}
265
266
267class NEGOEX_MESSAGE_HEADER(Packet):
268    fields_desc = [
269        StrFixedLenField("Signature", "NEGOEXTS", length=8),
270        LEIntEnumField(
271            "MessageType",
272            0,
273            {
274                0x0: "INITIATOR_NEGO",
275                0x01: "ACCEPTOR_NEGO",
276                0x02: "INITIATOR_META_DATA",
277                0x03: "ACCEPTOR_META_DATA",
278                0x04: "CHALLENGE",
279                0x05: "AP_REQUEST",
280                0x06: "VERIFY",
281                0x07: "ALERT",
282            },
283        ),
284        LEIntField("SequenceNum", 0),
285        LEIntField("cbHeaderLength", None),
286        LEIntField("cbMessageLength", None),
287        UUIDField("ConversationId", None),
288    ]
289
290    def post_build(self, pkt, pay):
291        if self.cbHeaderLength is None:
292            pkt = pkt[16:] + struct.pack("<I", len(pkt)) + pkt[20:]
293        if self.cbMessageLength is None:
294            pkt = pkt[20:] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
295        return pkt + pay
296
297
298def _NEGOEX_post_build(self, p, pay_offset, fields):
299    # type: (Packet, bytes, int, Dict[str, Tuple[str, int]]) -> bytes
300    """Util function to build the offset and populate the lengths"""
301    for field_name, value in self.fields["Payload"]:
302        length = self.get_field("Payload").fields_map[field_name].i2len(self, value)
303        count = self.get_field("Payload").fields_map[field_name].i2count(self, value)
304        offset = fields[field_name]
305        # Offset
306        if self.getfieldval(field_name + "BufferOffset") is None:
307            p = p[:offset] + struct.pack("<I", pay_offset) + p[offset + 4 :]
308        # Count
309        if self.getfieldval(field_name + "Count") is None:
310            p = p[: offset + 4] + struct.pack("<H", count) + p[offset + 6 :]
311        pay_offset += length
312    return p
313
314
315class NEGOEX_BYTE_VECTOR(Packet):
316    fields_desc = [
317        LEIntField("ByteArrayBufferOffset", 0),
318        LEIntField("ByteArrayLength", 0),
319    ]
320
321    def guess_payload_class(self, payload):
322        return conf.padding_layer
323
324
325class NEGOEX_EXTENSION_VECTOR(Packet):
326    fields_desc = [
327        LELongField("ExtensionArrayOffset", 0),
328        LEShortField("ExtensionCount", 0),
329    ]
330
331
332class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
333    OFFSET = 92
334    show_indent = 0
335    fields_desc = [
336        NEGOEX_MESSAGE_HEADER,
337        XStrFixedLenField("Random", b"", length=32),
338        LELongField("ProtocolVersion", 0),
339        LEIntField("AuthSchemeBufferOffset", None),
340        LEShortField("AuthSchemeCount", None),
341        LEIntField("ExtensionBufferOffset", None),
342        LEShortField("ExtensionCount", None),
343        # Payload
344        _NTLMPayloadField(
345            "Payload",
346            OFFSET,
347            [
348                FieldListField(
349                    "AuthScheme",
350                    [],
351                    UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
352                    count_from=lambda pkt: pkt.AuthSchemeCount,
353                ),
354                PacketListField(
355                    "Extension",
356                    [],
357                    NEGOEX_EXTENSION_VECTOR,
358                    count_from=lambda pkt: pkt.ExtensionCount,
359                ),
360            ],
361            length_from=lambda pkt: pkt.cbMessageLength - 92,
362        ),
363        # TODO: dissect extensions
364    ]
365
366    def post_build(self, pkt, pay):
367        # type: (bytes, bytes) -> bytes
368        return (
369            _NEGOEX_post_build(
370                self,
371                pkt,
372                self.OFFSET,
373                {
374                    "AuthScheme": 96,
375                    "Extension": 102,
376                },
377            )
378            + pay
379        )
380
381    @classmethod
382    def dispatch_hook(cls, _pkt=None, *args, **kargs):
383        if _pkt and len(_pkt) >= 12:
384            MessageType = struct.unpack("<I", _pkt[8:12])[0]
385            if MessageType in [0, 1]:
386                return NEGOEX_NEGO_MESSAGE
387            elif MessageType in [2, 3]:
388                return NEGOEX_EXCHANGE_MESSAGE
389        return cls
390
391
392# RFC3961
393_checksum_types = {
394    1: "CRC32",
395    2: "RSA-MD4",
396    3: "RSA-MD4-DES",
397    4: "DES-MAC",
398    5: "DES-MAC-K",
399    6: "RSA-MDA-DES-K",
400    7: "RSA-MD5",
401    8: "RSA-MD5-DES",
402    9: "RSA-MD5-DES3",
403    10: "SHA1",
404    12: "HMAC-SHA1-DES3-KD",
405    13: "HMAC-SHA1-DES3",
406    14: "SHA1",
407    15: "HMAC-SHA1-96-AES128",
408    16: "HMAC-SHA1-96-AES256",
409}
410
411
412def _checksum_size(pkt):
413    if pkt.ChecksumType == 1:
414        return 4
415    elif pkt.ChecksumType in [2, 4, 6, 7]:
416        return 16
417    elif pkt.ChecksumType in [3, 8, 9]:
418        return 24
419    elif pkt.ChecksumType == 5:
420        return 8
421    elif pkt.ChecksumType in [10, 12, 13, 14, 15, 16]:
422        return 20
423    return 0
424
425
426class NEGOEX_CHECKSUM(Packet):
427    fields_desc = [
428        LELongField("cbHeaderLength", 20),
429        LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}),
430        LELongEnumField("ChecksumType", None, _checksum_types),
431        XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
432    ]
433
434
435class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
436    OFFSET = 64
437    show_indent = 0
438    fields_desc = [
439        NEGOEX_MESSAGE_HEADER,
440        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
441        LEIntField("ExchangeBufferOffset", 0),
442        LEIntField("ExchangeLen", 0),
443        _NTLMPayloadField(
444            "Payload",
445            OFFSET,
446            [
447                # The NEGOEX doc mentions the following blob as as an
448                # "opaque handshake for the client authentication scheme".
449                # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
450                # probably not accurate.
451                MultipleTypeField(
452                    [
453                        (
454                            PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
455                            lambda pkt: pkt.AuthScheme
456                            == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
457                        ),
458                    ],
459                    StrField("Exchange", b""),
460                )
461            ],
462            length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
463        ),
464    ]
465
466
467class NEGOEX_VERIFY_MESSAGE(Packet):
468    show_indent = 0
469    fields_desc = [
470        NEGOEX_MESSAGE_HEADER,
471        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
472        PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
473    ]
474
475
476bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)
477
478
479_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE
480
481# -- SSP
482
483
484class SPNEGOSSP(SSP):
485    """
486    The SPNEGO SSP
487
488    :param ssps: a dict with keys being the SSP class, and the value being a
489                 dictionary of the keyword arguments to pass it on init.
490
491    Example::
492
493        from scapy.layers.ntlm import NTLMSSP
494        from scapy.layers.kerberos import KerberosSSP
495        from scapy.layers.spnego import SPNEGOSSP
496        from scapy.layers.smbserver import smbserver
497        from scapy.libs.rfc3961 import Encryption, Key
498
499        ssp = SPNEGOSSP([
500            NTLMSSP(
501                IDENTITIES={
502                    "User1": MD4le("Password1"),
503                    "Administrator": MD4le("Password123!"),
504                }
505            ),
506            KerberosSSP(
507                SPN="cifs/server2.domain.local",
508                KEY=Key(
509                    Encryption.AES256,
510                    key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b")
511                )
512            )
513        ])
514        smbserver(ssp=ssp)
515    """
516
517    __slots__ = [
518        "supported_ssps",
519        "force_supported_mechtypes",
520    ]
521    auth_type = 0x09
522
523    class STATE(SSP.STATE):
524        FIRST = 1
525        CHANGESSP = 2
526        NORMAL = 3
527
528    class CONTEXT(SSP.CONTEXT):
529        __slots__ = [
530            "supported_mechtypes",
531            "requested_mechtypes",
532            "req_flags",
533            "negotiated_mechtype",
534            "first_choice",
535            "sub_context",
536            "ssp",
537        ]
538
539        def __init__(
540            self, supported_ssps, req_flags=None, force_supported_mechtypes=None
541        ):
542            self.state = SPNEGOSSP.STATE.FIRST
543            self.requested_mechtypes = None
544            self.req_flags = req_flags
545            self.first_choice = True
546            self.negotiated_mechtype = None
547            self.sub_context = None
548            self.ssp = None
549            if force_supported_mechtypes is None:
550                self.supported_mechtypes = [
551                    SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in supported_ssps
552                ]
553                self.supported_mechtypes.sort(
554                    key=lambda x: SPNEGOSSP._PREF_ORDER.index(x.oid.val)
555                )
556            else:
557                self.supported_mechtypes = force_supported_mechtypes
558            super(SPNEGOSSP.CONTEXT, self).__init__()
559
560        # Passthrough attributes and functions
561
562        def clifailure(self):
563            self.sub_context.clifailure()
564
565        def __getattr__(self, attr):
566            try:
567                return object.__getattribute__(self, attr)
568            except AttributeError:
569                return getattr(self.sub_context, attr)
570
571        def __setattr__(self, attr, val):
572            try:
573                return object.__setattr__(self, attr, val)
574            except AttributeError:
575                return setattr(self.sub_context, attr, val)
576
577        # Passthrough the flags property
578
579        @property
580        def flags(self):
581            if self.sub_context:
582                return self.sub_context.flags
583            return GSS_C_FLAGS(0)
584
585        @flags.setter
586        def flags(self, x):
587            if not self.sub_context:
588                return
589            self.sub_context.flags = x
590
591        def __repr__(self):
592            return "SPNEGOSSP[%s]" % repr(self.sub_context)
593
594    _MECH_ALIASES = {
595        # Kerberos has 2 ssps
596        "1.2.840.48018.1.2.2": "1.2.840.113554.1.2.2",
597        "1.2.840.113554.1.2.2": "1.2.840.48018.1.2.2",
598    }
599
600    # This is the order Windows chooses. We mimic it for plausibility
601    _PREF_ORDER = [
602        "1.2.840.48018.1.2.2",  # MS KRB5
603        "1.2.840.113554.1.2.2",  # Kerberos 5
604        "1.3.6.1.4.1.311.2.2.30",  # NEGOEX
605        "1.3.6.1.4.1.311.2.2.10",  # NTLM
606    ]
607
608    def __init__(self, ssps, **kwargs):
609        self.supported_ssps = {x.oid: x for x in ssps}
610        # Apply MechTypes aliases
611        for ssp in ssps:
612            if ssp.oid in self._MECH_ALIASES:
613                self.supported_ssps[self._MECH_ALIASES[ssp.oid]] = self.supported_ssps[
614                    ssp.oid
615                ]
616        self.force_supported_mechtypes = kwargs.pop("force_supported_mechtypes", None)
617        super(SPNEGOSSP, self).__init__(**kwargs)
618
619    def _extract_gssapi(self, Context, x):
620        status, otherMIC, rawToken = None, None, False
621        # Extract values from GSSAPI
622        if isinstance(x, GSSAPI_BLOB):
623            x = x.innerToken
624        if isinstance(x, SPNEGO_negToken):
625            x = x.token
626        if hasattr(x, "mechTypes"):
627            Context.requested_mechtypes = x.mechTypes
628            Context.negotiated_mechtype = None
629        if hasattr(x, "supportedMech") and x.supportedMech is not None:
630            Context.negotiated_mechtype = x.supportedMech
631        if hasattr(x, "mechListMIC") and x.mechListMIC:
632            otherMIC = GSSAPI_BLOB_SIGNATURE(x.mechListMIC.value.val)
633        if hasattr(x, "_mechListMIC") and x._mechListMIC:
634            otherMIC = GSSAPI_BLOB_SIGNATURE(x._mechListMIC.value.val)
635        if hasattr(x, "negResult"):
636            status = x.negResult
637        try:
638            x = x.mechToken
639        except AttributeError:
640            try:
641                x = x.responseToken
642            except AttributeError:
643                # No GSSAPI wrapper (windows fallback). Remember this for answer
644                rawToken = True
645        if isinstance(x, SPNEGO_Token):
646            x = x.value
647        if Context.requested_mechtypes:
648            try:
649                cls = _mechDissector[
650                    (
651                        Context.negotiated_mechtype or Context.requested_mechtypes[0]
652                    ).oid.val  # noqa: E501
653                ]
654            except KeyError:
655                cls = conf.raw_layer
656            if isinstance(x, ASN1_STRING):
657                x = cls(x.val)
658            elif isinstance(x, conf.raw_layer):
659                x = cls(x.load)
660        return x, status, otherMIC, rawToken
661
662    def NegTokenInit2(self):
663        """
664        Server-Initiation of GSSAPI/SPNEGO.
665        See [MS-SPNG] sect 3.2.5.2
666        """
667        Context = self.CONTEXT(
668            self.supported_ssps,
669            force_supported_mechtypes=self.force_supported_mechtypes,
670        )
671        return (
672            Context,
673            GSSAPI_BLOB(
674                innerToken=SPNEGO_negToken(
675                    token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
676                )
677            ),
678        )
679
680        # NOTE: NegoEX has an effect on how the SecurityContext is
681        # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2
682        # But the format that the Exchange token uses appears not to
683        # be documented :/
684
685        # resp.SecurityBlob.innerToken.token.mechTypes.insert(
686        #     0,
687        #     # NEGOEX
688        #     SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"),
689        # )
690        # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token(
691        #     value=negoex_token
692        # )  # noqa: E501
693
694    def GSS_WrapEx(self, Context, *args, **kwargs):
695        # Passthrough
696        return Context.ssp.GSS_WrapEx(Context.sub_context, *args, **kwargs)
697
698    def GSS_UnwrapEx(self, Context, *args, **kwargs):
699        # Passthrough
700        return Context.ssp.GSS_UnwrapEx(Context.sub_context, *args, **kwargs)
701
702    def GSS_GetMICEx(self, Context, *args, **kwargs):
703        # Passthrough
704        return Context.ssp.GSS_GetMICEx(Context.sub_context, *args, **kwargs)
705
706    def GSS_VerifyMICEx(self, Context, *args, **kwargs):
707        # Passthrough
708        return Context.ssp.GSS_VerifyMICEx(Context.sub_context, *args, **kwargs)
709
710    def LegsAmount(self, Context: CONTEXT):
711        return 4
712
713    def _common_spnego_handler(self, Context, IsClient, val=None, req_flags=None):
714        if Context is None:
715            # New Context
716            Context = SPNEGOSSP.CONTEXT(
717                self.supported_ssps,
718                req_flags=req_flags,
719                force_supported_mechtypes=self.force_supported_mechtypes,
720            )
721            if IsClient:
722                Context.requested_mechtypes = Context.supported_mechtypes
723
724        # Extract values from GSSAPI token
725        status, MIC, otherMIC, rawToken = 0, None, None, False
726        if val:
727            val, status, otherMIC, rawToken = self._extract_gssapi(Context, val)
728
729        # If we don't have a SSP already negotiated, check for requested and available
730        # SSPs and find a common one.
731        if Context.ssp is None:
732            if Context.negotiated_mechtype is None:
733                if Context.requested_mechtypes:
734                    # Find a common SSP
735                    try:
736                        Context.negotiated_mechtype = next(
737                            x
738                            for x in Context.requested_mechtypes
739                            if x in Context.supported_mechtypes
740                        )
741                    except StopIteration:
742                        # no common mechanisms
743                        raise ValueError("No common SSP mechanisms !")
744                    # Check whether the selected SSP was the one preferred by the client
745                    if (
746                        Context.negotiated_mechtype != Context.requested_mechtypes[0]
747                        and val
748                    ):
749                        Context.first_choice = False
750                # No SSPs were requested. Use the first available SSP we know.
751                elif Context.supported_mechtypes:
752                    Context.negotiated_mechtype = Context.supported_mechtypes[0]
753                else:
754                    raise ValueError("Can't figure out what SSP to use")
755            # Set Context.ssp to the object matching the chosen SSP type.
756            Context.ssp = self.supported_ssps[Context.negotiated_mechtype.oid.val]
757
758        if not Context.first_choice:
759            # The currently provided token is not for this SSP !
760            # Typically a client opportunistically starts with Kerberos, including
761            # its APREQ, and we want to use NTLM. We add one round trip
762            Context.state = SPNEGOSSP.STATE.FIRST
763            Context.first_choice = True  # reset to not come here again.
764            tok, status = None, GSS_S_CONTINUE_NEEDED
765        else:
766            # The currently provided token is for this SSP !
767            # Pass it to the sub ssp, with its own context
768            if IsClient:
769                (
770                    Context.sub_context,
771                    tok,
772                    status,
773                ) = Context.ssp.GSS_Init_sec_context(
774                    Context.sub_context,
775                    val=val,
776                    req_flags=Context.req_flags,
777                )
778            else:
779                Context.sub_context, tok, status = Context.ssp.GSS_Accept_sec_context(
780                    Context.sub_context, val=val
781                )
782            # Check whether client or server says the specified mechanism is not valid
783            if status == GSS_S_BAD_MECH:
784                # Mechanism is not usable. Typically the Kerberos SPN is wrong
785                to_remove = [Context.negotiated_mechtype.oid.val]
786                # If there's an alias (for the multiple kerberos oids, also include it)
787                if Context.negotiated_mechtype.oid.val in SPNEGOSSP._MECH_ALIASES:
788                    to_remove.append(
789                        SPNEGOSSP._MECH_ALIASES[Context.negotiated_mechtype.oid.val]
790                    )
791                # Drop those unusable mechanisms from the supported list
792                for x in list(Context.supported_mechtypes):
793                    if x.oid.val in to_remove:
794                        Context.supported_mechtypes.remove(x)
795                # Re-calculate negotiated mechtype
796                try:
797                    Context.negotiated_mechtype = next(
798                        x
799                        for x in Context.requested_mechtypes
800                        if x in Context.supported_mechtypes
801                    )
802                except StopIteration:
803                    # no common mechanisms
804                    raise ValueError("No common SSP mechanisms after GSS_S_BAD_MECH !")
805                # Start again.
806                Context.state = SPNEGOSSP.STATE.CHANGESSP
807                Context.ssp = None  # Reset the SSP
808                Context.sub_context = None  # Reset the SSP context
809                if IsClient:
810                    # Call ourselves again for the client to generate a token
811                    return self._common_spnego_handler(Context, True, None)
812                else:
813                    # Return nothing but the supported SSP list
814                    tok, status = None, GSS_S_CONTINUE_NEEDED
815
816        if rawToken:
817            # No GSSAPI wrapper (fallback)
818            return Context, tok, status
819
820        # Client success
821        if IsClient and tok is None and status == GSS_S_COMPLETE:
822            return Context, None, status
823
824        # Map GSSAPI codes to SPNEGO
825        if status == GSS_S_COMPLETE:
826            negResult = 0  # accept_completed
827        elif status == GSS_S_CONTINUE_NEEDED:
828            negResult = 1  # accept_incomplete
829        else:
830            negResult = 2  # reject
831
832        # GSSAPI-MIC
833        if Context.ssp and Context.ssp.canMechListMIC(Context.sub_context):
834            # The documentation on mechListMIC wasn't clear, so note that:
835            # - The mechListMIC that the client sends is computed over the
836            #   list of mechanisms that it requests.
837            # - the mechListMIC that the server sends is computed over the
838            #   list of mechanisms that the client requested.
839            # Yes, this does indeed mean that NegTokenInit2 added by [MS-SPNG]
840            # is NOT protected. That's not necessarily an issue, since it was
841            # optional in most cases, but it's something to keep in mind.
842            if otherMIC is not None:
843                # Check the received MIC if any
844                if IsClient:  # from server
845                    Context.ssp.verifyMechListMIC(
846                        Context,
847                        otherMIC,
848                        mechListMIC(Context.supported_mechtypes),
849                    )
850                else:  # from client
851                    Context.ssp.verifyMechListMIC(
852                        Context,
853                        otherMIC,
854                        mechListMIC(Context.requested_mechtypes),
855                    )
856            # Then build our own MIC
857            if IsClient:  # client
858                if negResult == 0:
859                    # Include MIC for the last packet. We could add a check
860                    # here to only send the MIC when required (when preferred ssp
861                    # isn't chosen)
862                    MIC = Context.ssp.getMechListMIC(
863                        Context,
864                        mechListMIC(Context.supported_mechtypes),
865                    )
866            else:  # server
867                MIC = Context.ssp.getMechListMIC(
868                    Context,
869                    mechListMIC(Context.requested_mechtypes),
870                )
871
872        if IsClient:
873            if Context.state == SPNEGOSSP.STATE.FIRST:
874                # First client token
875                spnego_tok = SPNEGO_negToken(
876                    token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
877                )
878                if tok:
879                    spnego_tok.token.mechToken = SPNEGO_Token(value=tok)
880            else:
881                # Subsequent client tokens
882                spnego_tok = SPNEGO_negToken(  # GSSAPI_BLOB is stripped
883                    token=SPNEGO_negTokenResp(
884                        supportedMech=None,
885                        negResult=None,
886                    )
887                )
888                if tok:
889                    spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
890                if Context.state == SPNEGOSSP.STATE.CHANGESSP:
891                    # On renegotiation, include the negResult and chosen mechanism
892                    spnego_tok.token.negResult = negResult
893                    spnego_tok.token.supportedMech = Context.negotiated_mechtype
894        else:
895            spnego_tok = SPNEGO_negToken(  # GSSAPI_BLOB is stripped
896                token=SPNEGO_negTokenResp(
897                    supportedMech=None,
898                    negResult=negResult,
899                )
900            )
901            if Context.state in [SPNEGOSSP.STATE.FIRST, SPNEGOSSP.STATE.CHANGESSP]:
902                # Include the supportedMech list if this is the first thing we do
903                # or a renegotiation.
904                spnego_tok.token.supportedMech = Context.negotiated_mechtype
905            if tok:
906                spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
907        # Apply MIC if available
908        if MIC:
909            spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
910                value=ASN1_STRING(MIC),
911            )
912        if (
913            IsClient and Context.state == SPNEGOSSP.STATE.FIRST
914        ):  # Client: after the first packet, specifying 'SPNEGO' is implicit.
915            # Always implicit for the server.
916            spnego_tok = GSSAPI_BLOB(innerToken=spnego_tok)
917        # Not the first token anymore
918        Context.state = SPNEGOSSP.STATE.NORMAL
919        return Context, spnego_tok, status
920
921    def GSS_Init_sec_context(
922        self, Context: CONTEXT, val=None, req_flags: Optional[GSS_C_FLAGS] = None
923    ):
924        return self._common_spnego_handler(Context, True, val=val, req_flags=req_flags)
925
926    def GSS_Accept_sec_context(self, Context: CONTEXT, val=None):
927        return self._common_spnego_handler(Context, False, val=val, req_flags=0)
928
929    def GSS_Passive(self, Context: CONTEXT, val=None):
930        if Context is None:
931            # New Context
932            Context = SPNEGOSSP.CONTEXT(self.supported_ssps)
933            Context.passive = True
934
935        # Extraction
936        val, status, _, rawToken = self._extract_gssapi(Context, val)
937
938        if val is None and status == GSS_S_COMPLETE:
939            return Context, None
940
941        # Just get the negotiated SSP
942        if Context.negotiated_mechtype:
943            mechtype = Context.negotiated_mechtype
944        elif Context.requested_mechtypes:
945            mechtype = Context.requested_mechtypes[0]
946        elif rawToken and Context.supported_mechtypes:
947            mechtype = Context.supported_mechtypes[0]
948        else:
949            return None, GSS_S_BAD_MECH
950        try:
951            ssp = self.supported_ssps[mechtype.oid.val]
952        except KeyError:
953            return None, GSS_S_BAD_MECH
954
955        if Context.ssp is not None:
956            # Detect resets
957            if Context.ssp != ssp:
958                Context.ssp = ssp
959                Context.sub_context = None
960        else:
961            Context.ssp = ssp
962
963        # Passthrough
964        Context.sub_context, status = Context.ssp.GSS_Passive(Context.sub_context, val)
965
966        return Context, status
967
968    def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
969        Context.ssp.GSS_Passive_set_Direction(
970            Context.sub_context, IsAcceptor=IsAcceptor
971        )
972
973    def MaximumSignatureLength(self, Context: CONTEXT):
974        return Context.ssp.MaximumSignatureLength(Context.sub_context)
975