# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>

"""
SPNEGO

Implements parts of:

- GSSAPI SPNEGO: RFC4178 > RFC2478
- GSSAPI SPNEGO NEGOEX: [MS-NEGOEX]

.. note::
    You will find more complete documentation for this layer over at
    `GSSAPI <https://scapy.readthedocs.io/en/latest/layers/gssapi.html#spnego>`_
"""

import struct
from uuid import UUID

from scapy.asn1.asn1 import (
    ASN1_OID,
    ASN1_STRING,
    ASN1_Codecs,
)
from scapy.asn1.mib import conf  # loads conf.mib
from scapy.asn1fields import (
    ASN1F_CHOICE,
    ASN1F_ENUMERATED,
    ASN1F_FLAGS,
    ASN1F_GENERAL_STRING,
    ASN1F_OID,
    ASN1F_PACKET,
    ASN1F_SEQUENCE,
    ASN1F_SEQUENCE_OF,
    ASN1F_STRING,
    ASN1F_optional,
)
from scapy.asn1packet import ASN1_Packet
from scapy.fields import (
    FieldListField,
    LEIntEnumField,
    LEIntField,
    LELongEnumField,
    LELongField,
    LEShortField,
    MultipleTypeField,
    PacketField,
    PacketListField,
    StrField,
    StrFixedLenField,
    UUIDEnumField,
    UUIDField,
    XStrFixedLenField,
    XStrLenField,
)
from scapy.packet import Packet, bind_layers

from scapy.layers.gssapi import (
    GSSAPI_BLOB,
    GSSAPI_BLOB_SIGNATURE,
    GSS_C_FLAGS,
    GSS_S_BAD_MECH,
    GSS_S_COMPLETE,
    GSS_S_CONTINUE_NEEDED,
    SSP,
    _GSSAPI_OIDS,
    _GSSAPI_SIGNATURE_OIDS,
)

# SSP Providers
from scapy.layers.kerberos import (
    Kerberos,
)
from scapy.layers.ntlm import (
    NEGOEX_EXCHANGE_NTLM,
    NTLM_Header,
    _NTLMPayloadField,
    _NTLMPayloadPacket,
)

# Typing imports
from typing import (
    Dict,
    Optional,
    Tuple,
)

# SPNEGO negTokenInit
# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.1


class SPNEGO_MechType(ASN1_Packet):
    """A single mechanism type, i.e. one OID."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_OID("oid", None)


class SPNEGO_MechTypes(ASN1_Packet):
    """A SEQUENCE OF SPNEGO_MechType."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType)


class SPNEGO_MechListMIC(ASN1_Packet):
    """The mechListMIC value, carried as a string."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_STRING("value", "")


# Maps a mechanism OID to the class used to dissect its token.
_mechDissector = {
    "1.3.6.1.4.1.311.2.2.10": NTLM_Header,  # NTLM
    "1.2.840.48018.1.2.2": Kerberos,  # MS KRB5 - Microsoft Kerberos 5
    "1.2.840.113554.1.2.2": Kerberos,  # Kerberos 5
}


class _SPNEGO_Token_Field(ASN1F_STRING):
    """
    String field holding a mechanism token. On dissection, the token is
    decoded with the class registered in ``_mechDissector`` for the first
    mechanism announced by the underlayer, when there is one.
    """

    def i2m(self, pkt, x):
        # A missing token is built as an empty string
        if x is None:
            x = b""
        return super(_SPNEGO_Token_Field, self).i2m(pkt, bytes(x))

    def m2i(self, pkt, s):
        dat, r = super(_SPNEGO_Token_Field, self).m2i(pkt, s)
        # 'types' must be defined even when the underlayer is neither of the
        # two known token types (e.g. a SPNEGO_Token dissected on its own),
        # otherwise the check below raises UnboundLocalError.
        types = None
        if isinstance(pkt.underlayer, SPNEGO_negTokenInit):
            types = pkt.underlayer.mechTypes
        elif isinstance(pkt.underlayer, SPNEGO_negTokenResp):
            types = [pkt.underlayer.supportedMech]
        if types and types[0] and types[0].oid.val in _mechDissector:
            return _mechDissector[types[0].oid.val](dat.val), r
        return dat, r


class SPNEGO_Token(ASN1_Packet):
    """Wrapper around a mechanism token (mechToken / responseToken)."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = _SPNEGO_Token_Field("value", None)


# Names of the reqFlags bits of a negTokenInit
_ContextFlags = [
    "delegFlag",
    "mutualFlag",
    "replayFlag",
    "sequenceFlag",
    "superseded",
    "anonFlag",
    "confFlag",
    "integFlag",
]


class SPNEGO_negHints(ASN1_Packet):
    # [MS-SPNG] 2.2.1
    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_GENERAL_STRING(
                "hintName", "not_defined_in_RFC4178@please_ignore", explicit_tag=0xA0
            ),
        ),
        ASN1F_optional(
            ASN1F_GENERAL_STRING("hintAddress", None, explicit_tag=0xA1),
        ),
    )


class SPNEGO_negTokenInit(ASN1_Packet):
    """
    negTokenInit. Accepts both the [MS-SPNG] flavor (negHints under tag 0xA3,
    mechListMIC under 0xA4) and the RFC 4178 one (mechListMIC under 0xA3,
    exposed as ``_mechListMIC``).
    """

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_SEQUENCE_OF("mechTypes", None, SPNEGO_MechType, explicit_tag=0xA0)
        ),
        ASN1F_optional(ASN1F_FLAGS("reqFlags", None, _ContextFlags, implicit_tag=0x81)),
        ASN1F_optional(
            ASN1F_PACKET("mechToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        # [MS-SPNG] flavor !
        ASN1F_optional(
            ASN1F_PACKET("negHints", None, SPNEGO_negHints, explicit_tag=0xA3)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA4)
        ),
        # Compat with RFC 4178's SPNEGO_negTokenInit
        ASN1F_optional(
            ASN1F_PACKET("_mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )


# SPNEGO negTokenTarg
# https://datatracker.ietf.org/doc/html/rfc4178#section-4.2.2


class SPNEGO_negTokenResp(ASN1_Packet):
    """negTokenResp: negotiation result, chosen mechanism, token and MIC."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_SEQUENCE(
        ASN1F_optional(
            ASN1F_ENUMERATED(
                "negResult",
                0,
                {
                    0: "accept-completed",
                    1: "accept-incomplete",
                    2: "reject",
                    3: "request-mic",
                },
                explicit_tag=0xA0,
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET(
                "supportedMech", SPNEGO_MechType(), SPNEGO_MechType, explicit_tag=0xA1
            ),
        ),
        ASN1F_optional(
            ASN1F_PACKET("responseToken", None, SPNEGO_Token, explicit_tag=0xA2)
        ),
        ASN1F_optional(
            ASN1F_PACKET("mechListMIC", None, SPNEGO_MechListMIC, explicit_tag=0xA3)
        ),
    )


class SPNEGO_negToken(ASN1_Packet):
    """CHOICE between a negTokenInit (tag 0xA0) and a negTokenResp (0xA1)."""

    ASN1_codec = ASN1_Codecs.BER
    ASN1_root = ASN1F_CHOICE(
        "token",
        SPNEGO_negTokenInit(),
        ASN1F_PACKET(
            "negTokenInit",
            SPNEGO_negTokenInit(),
            SPNEGO_negTokenInit,
            explicit_tag=0xA0,
        ),
        ASN1F_PACKET(
            "negTokenResp",
            SPNEGO_negTokenResp(),
            SPNEGO_negTokenResp,
            explicit_tag=0xA1,
        ),
    )


# Register for the GSS API Blob

_GSSAPI_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken
_GSSAPI_SIGNATURE_OIDS["1.3.6.1.5.5.2"] = SPNEGO_negToken


def mechListMIC(oids):
    """
    Implementation of RFC 4178 - Appendix D. mechListMIC Computation

    :param oids: the list of SPNEGO_MechType to encode
    :returns: the bytes of the SPNEGO_MechTypes built from ``oids``. This is
        the data over which the MIC is computed, not the MIC itself.
    """
    return bytes(SPNEGO_MechTypes(mechTypes=oids))


# NEGOEX
# https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-negoex/0ad7a003-ab56-4839-a204-b555ca6759a2


_NEGOEX_AUTH_SCHEMES = {
    # Reversed. Is there any doc related to this?
    # The NEGOEX doc is very elusive
    UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"): "UUID('[NTLM-UUID]')",
}


class NEGOEX_MESSAGE_HEADER(Packet):
    """
    Header shared by the NEGOEX messages (40 bytes with the fields below:
    8 + 4 + 4 + 4 + 4 + 16).
    """

    fields_desc = [
        StrFixedLenField("Signature", "NEGOEXTS", length=8),
        LEIntEnumField(
            "MessageType",
            0,
            {
                0x0: "INITIATOR_NEGO",
                0x01: "ACCEPTOR_NEGO",
                0x02: "INITIATOR_META_DATA",
                0x03: "ACCEPTOR_META_DATA",
                0x04: "CHALLENGE",
                0x05: "AP_REQUEST",
                0x06: "VERIFY",
                0x07: "ALERT",
            },
        ),
        LEIntField("SequenceNum", 0),
        LEIntField("cbHeaderLength", None),
        LEIntField("cbMessageLength", None),
        UUIDField("ConversationId", None),
    ]

    def post_build(self, pkt, pay):
        """Fill in cbHeaderLength / cbMessageLength when left to None."""
        # cbHeaderLength is at bytes 16..20 and cbMessageLength at 20..24:
        # keep everything *before* the field, patch it, keep everything after.
        if self.cbHeaderLength is None:
            pkt = pkt[:16] + struct.pack("<I", len(pkt)) + pkt[20:]
        if self.cbMessageLength is None:
            pkt = pkt[:20] + struct.pack("<I", len(pkt) + len(pay)) + pkt[24:]
        return pkt + pay


def _NEGOEX_post_build(self, p, pay_offset, fields):
    # type: (Packet, bytes, int, Dict[str, int]) -> bytes
    """
    Util function to build the offset and populate the lengths

    :param self: the packet being built
    :param p: the built bytes of the packet
    :param pay_offset: the offset at which the first payload element starts
    :param fields: for each payload field name, the position in ``p`` of its
        "<name>BufferOffset" field. The "<name>Count" field is expected right
        after it (4 bytes later).
    :returns: ``p`` with the offsets and counts that were None filled in
    """
    for field_name, value in self.fields["Payload"]:
        length = self.get_field("Payload").fields_map[field_name].i2len(self, value)
        count = self.get_field("Payload").fields_map[field_name].i2count(self, value)
        offset = fields[field_name]
        # Offset
        if self.getfieldval(field_name + "BufferOffset") is None:
            p = p[:offset] + struct.pack("<I", pay_offset) + p[offset + 4 :]
        # Count
        if self.getfieldval(field_name + "Count") is None:
            p = p[: offset + 4] + struct.pack("<H", count) + p[offset + 6 :]
        # The next element starts right after this one
        pay_offset += length
    return p


class NEGOEX_BYTE_VECTOR(Packet):
    fields_desc = [
        LEIntField("ByteArrayBufferOffset", 0),
        LEIntField("ByteArrayLength", 0),
    ]

    def guess_payload_class(self, payload):
        # Whatever follows is not part of the vector
        return conf.padding_layer


class NEGOEX_EXTENSION_VECTOR(Packet):
    fields_desc = [
        LELongField("ExtensionArrayOffset", 0),
        LEShortField("ExtensionCount", 0),
    ]


class NEGOEX_NEGO_MESSAGE(_NTLMPayloadPacket):
    """
    NEGOEX NEGO message (MessageType 0 and 1). Also the entry point of the
    NEGOEX dissection: see dispatch_hook.
    """

    # Size of the fixed part: header (40) + Random (32) + ProtocolVersion (8)
    # + AuthScheme offset/count (4 + 2) + Extension offset/count (4 + 2)
    OFFSET = 92
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        XStrFixedLenField("Random", b"", length=32),
        LELongField("ProtocolVersion", 0),
        LEIntField("AuthSchemeBufferOffset", None),
        LEShortField("AuthSchemeCount", None),
        LEIntField("ExtensionBufferOffset", None),
        LEShortField("ExtensionCount", None),
        # Payload
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                FieldListField(
                    "AuthScheme",
                    [],
                    UUIDEnumField("", None, _NEGOEX_AUTH_SCHEMES),
                    count_from=lambda pkt: pkt.AuthSchemeCount,
                ),
                PacketListField(
                    "Extension",
                    [],
                    NEGOEX_EXTENSION_VECTOR,
                    count_from=lambda pkt: pkt.ExtensionCount,
                ),
            ],
            length_from=lambda pkt: pkt.cbMessageLength - 92,
        ),
        # TODO: dissect extensions
    ]

    def post_build(self, pkt, pay):
        # type: (bytes, bytes) -> bytes
        """Fill in the AuthScheme / Extension offsets and counts."""
        return (
            _NEGOEX_post_build(
                self,
                pkt,
                self.OFFSET,
                {
                    # Positions of the "<name>BufferOffset" fields in the
                    # fixed part described by fields_desc:
                    # 40 (header) + 32 (Random) + 8 (ProtocolVersion) = 80,
                    # then 80 + 4 (offset) + 2 (count) = 86.
                    # They must be below OFFSET, or the payload gets
                    # overwritten instead of the header fields.
                    "AuthScheme": 80,
                    "Extension": 86,
                },
            )
            + pay
        )

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # MessageType is the little-endian int that follows the 8 bytes
        # signature
        if _pkt and len(_pkt) >= 12:
            MessageType = struct.unpack("<I", _pkt[8:12])[0]
            if MessageType in [0, 1]:
                return NEGOEX_NEGO_MESSAGE
            elif MessageType in [2, 3]:
                return NEGOEX_EXCHANGE_MESSAGE
        return cls


# RFC3961
_checksum_types = {
    1: "CRC32",
    2: "RSA-MD4",
    3: "RSA-MD4-DES",
    4: "DES-MAC",
    5: "DES-MAC-K",
    6: "RSA-MD4-DES-K",
    7: "RSA-MD5",
    8: "RSA-MD5-DES",
    9: "RSA-MD5-DES3",
    10: "SHA1",
    12: "HMAC-SHA1-DES3-KD",
    13: "HMAC-SHA1-DES3",
    14: "SHA1",
    15: "HMAC-SHA1-96-AES128",
    16: "HMAC-SHA1-96-AES256",
}


def _checksum_size(pkt):
    """
    Return the length in bytes of ChecksumValue for the ChecksumType of
    ``pkt``, or 0 when the type is unknown.
    """
    if pkt.ChecksumType == 1:
        return 4
    elif pkt.ChecksumType in [2, 4, 6, 7]:
        return 16
    elif pkt.ChecksumType in [3, 8, 9]:
        return 24
    elif pkt.ChecksumType == 5:
        return 8
    elif pkt.ChecksumType in [10, 12, 13, 14, 15, 16]:
        return 20
    return 0


class NEGOEX_CHECKSUM(Packet):
    fields_desc = [
        LELongField("cbHeaderLength", 20),
        LELongEnumField("ChecksumScheme", 1, {1: "CHECKSUM_SCHEME_RFC3961"}),
        LELongEnumField("ChecksumType", None, _checksum_types),
        XStrLenField("ChecksumValue", b"", length_from=_checksum_size),
    ]


class NEGOEX_EXCHANGE_MESSAGE(_NTLMPayloadPacket):
    """NEGOEX EXCHANGE message (returned by the dispatch for types 2 and 3)."""

    # Size of the fixed part: header (40) + AuthScheme (16)
    # + ExchangeBufferOffset (4) + ExchangeLen (4)
    OFFSET = 64
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        LEIntField("ExchangeBufferOffset", 0),
        LEIntField("ExchangeLen", 0),
        _NTLMPayloadField(
            "Payload",
            OFFSET,
            [
                # The NEGOEX doc mentions the following blob as an
                # "opaque handshake for the client authentication scheme".
                # NEGOEX_EXCHANGE_NTLM is a reversed interpretation, and is
                # probably not accurate.
                MultipleTypeField(
                    [
                        (
                            PacketField("Exchange", None, NEGOEX_EXCHANGE_NTLM),
                            lambda pkt: pkt.AuthScheme
                            == UUID("5c33530d-eaf9-0d4d-b2ec-4ae3786ec308"),
                        ),
                    ],
                    StrField("Exchange", b""),
                )
            ],
            length_from=lambda pkt: pkt.cbMessageLength - pkt.cbHeaderLength,
        ),
    ]


class NEGOEX_VERIFY_MESSAGE(Packet):
    show_indent = 0
    fields_desc = [
        NEGOEX_MESSAGE_HEADER,
        UUIDEnumField("AuthScheme", None, _NEGOEX_AUTH_SCHEMES),
        PacketField("Checksum", NEGOEX_CHECKSUM(), NEGOEX_CHECKSUM),
    ]


# NEGOEX messages can follow each other
bind_layers(NEGOEX_NEGO_MESSAGE, NEGOEX_NEGO_MESSAGE)


_mechDissector["1.3.6.1.4.1.311.2.2.30"] = NEGOEX_NEGO_MESSAGE

# -- SSP


class SPNEGOSSP(SSP):
    """
    The SPNEGO SSP

    :param ssps: a list of the SSP instances that may be negotiated. They are
        indexed by their ``oid`` attribute.
    :param force_supported_mechtypes: (optional) a list of SPNEGO_MechType to
        announce, instead of the one computed from ``ssps``.

    Example::

        from scapy.layers.ntlm import NTLMSSP
        from scapy.layers.kerberos import KerberosSSP
        from scapy.layers.spnego import SPNEGOSSP
        from scapy.layers.smbserver import smbserver
        from scapy.libs.rfc3961 import Encryption, Key

        ssp = SPNEGOSSP([
            NTLMSSP(
                IDENTITIES={
                    "User1": MD4le("Password1"),
                    "Administrator": MD4le("Password123!"),
                }
            ),
            KerberosSSP(
                SPN="cifs/server2.domain.local",
                KEY=Key(
                    Encryption.AES256,
                    key=hex_bytes("5e9255c907b2f7e969ddad816eabbec8f1f7a387c7194ecc98b827bdc9421c2b")
                )
            )
        ])
        smbserver(ssp=ssp)
    """

    __slots__ = [
        "supported_ssps",
        "force_supported_mechtypes",
    ]
    auth_type = 0x09

    class STATE(SSP.STATE):
        FIRST = 1
        CHANGESSP = 2
        NORMAL = 3

    class CONTEXT(SSP.CONTEXT):
        """
        The SPNEGO context. It holds the state of the negotiation, the
        selected SSP and its own context (``sub_context``), to which unknown
        attributes are forwarded.
        """

        __slots__ = [
            "supported_mechtypes",
            "requested_mechtypes",
            "req_flags",
            "negotiated_mechtype",
            "first_choice",
            "sub_context",
            "ssp",
        ]

        def __init__(
            self, supported_ssps, req_flags=None, force_supported_mechtypes=None
        ):
            self.state = SPNEGOSSP.STATE.FIRST
            self.requested_mechtypes = None
            self.req_flags = req_flags
            self.first_choice = True
            self.negotiated_mechtype = None
            self.sub_context = None
            self.ssp = None
            if force_supported_mechtypes is None:
                self.supported_mechtypes = [
                    SPNEGO_MechType(oid=ASN1_OID(oid)) for oid in supported_ssps
                ]

                def _preference(x):
                    # Mechanisms that are not in _PREF_ORDER are put last
                    # (in their original order) rather than making the sort
                    # fail with a ValueError.
                    try:
                        return SPNEGOSSP._PREF_ORDER.index(x.oid.val)
                    except ValueError:
                        return len(SPNEGOSSP._PREF_ORDER)

                self.supported_mechtypes.sort(key=_preference)
            else:
                self.supported_mechtypes = force_supported_mechtypes
            super(SPNEGOSSP.CONTEXT, self).__init__()

        # Passthrough attributes and functions

        def clifailure(self):
            self.sub_context.clifailure()

        def __getattr__(self, attr):
            # Own attributes first, then the ones of the sub context
            try:
                return object.__getattribute__(self, attr)
            except AttributeError:
                return getattr(self.sub_context, attr)

        def __setattr__(self, attr, val):
            # Own attributes first, then the ones of the sub context
            try:
                return object.__setattr__(self, attr, val)
            except AttributeError:
                return setattr(self.sub_context, attr, val)

        # Passthrough the flags property

        @property
        def flags(self):
            if self.sub_context:
                return self.sub_context.flags
            return GSS_C_FLAGS(0)

        @flags.setter
        def flags(self, x):
            # Nothing to store the flags on until a sub context exists
            if not self.sub_context:
                return
            self.sub_context.flags = x

        def __repr__(self):
            return "SPNEGOSSP[%s]" % repr(self.sub_context)

    _MECH_ALIASES = {
        # Kerberos has 2 ssps
        "1.2.840.48018.1.2.2": "1.2.840.113554.1.2.2",
        "1.2.840.113554.1.2.2": "1.2.840.48018.1.2.2",
    }

    # This is the order Windows chooses. We mimic it for plausibility
    _PREF_ORDER = [
        "1.2.840.48018.1.2.2",  # MS KRB5
        "1.2.840.113554.1.2.2",  # Kerberos 5
        "1.3.6.1.4.1.311.2.2.30",  # NEGOEX
        "1.3.6.1.4.1.311.2.2.10",  # NTLM
    ]

    def __init__(self, ssps, **kwargs):
        self.supported_ssps = {x.oid: x for x in ssps}
        # Apply MechTypes aliases
        for ssp in ssps:
            if ssp.oid in self._MECH_ALIASES:
                self.supported_ssps[self._MECH_ALIASES[ssp.oid]] = self.supported_ssps[
                    ssp.oid
                ]
        self.force_supported_mechtypes = kwargs.pop("force_supported_mechtypes", None)
        super(SPNEGOSSP, self).__init__(**kwargs)

    def _extract_gssapi(self, Context, x):
        """
        Unwrap a received token and update the Context from what it contains.

        :param Context: the SPNEGO context. Its requested_mechtypes and
            negotiated_mechtype are updated when the token provides them.
        :param x: the received token
        :returns: a tuple (inner token, negResult or None, received
            mechListMIC or None, rawToken) where rawToken is True if the
            token had neither a mechToken nor a responseToken.
        """
        status, otherMIC, rawToken = None, None, False
        # Extract values from GSSAPI
        if isinstance(x, GSSAPI_BLOB):
            x = x.innerToken
        if isinstance(x, SPNEGO_negToken):
            x = x.token
        if hasattr(x, "mechTypes"):
            Context.requested_mechtypes = x.mechTypes
            Context.negotiated_mechtype = None
        if hasattr(x, "supportedMech") and x.supportedMech is not None:
            Context.negotiated_mechtype = x.supportedMech
        if hasattr(x, "mechListMIC") and x.mechListMIC:
            otherMIC = GSSAPI_BLOB_SIGNATURE(x.mechListMIC.value.val)
        if hasattr(x, "_mechListMIC") and x._mechListMIC:
            otherMIC = GSSAPI_BLOB_SIGNATURE(x._mechListMIC.value.val)
        if hasattr(x, "negResult"):
            status = x.negResult
        try:
            x = x.mechToken
        except AttributeError:
            try:
                x = x.responseToken
            except AttributeError:
                # No GSSAPI wrapper (windows fallback). Remember this for answer
                rawToken = True
        if isinstance(x, SPNEGO_Token):
            x = x.value
        if Context.requested_mechtypes:
            # Dissect the token according to the negotiated mechanism, or to
            # the first requested one if none was negotiated yet.
            try:
                cls = _mechDissector[
                    (
                        Context.negotiated_mechtype or Context.requested_mechtypes[0]
                    ).oid.val  # noqa: E501
                ]
            except KeyError:
                cls = conf.raw_layer
            if isinstance(x, ASN1_STRING):
                x = cls(x.val)
            elif isinstance(x, conf.raw_layer):
                x = cls(x.load)
        return x, status, otherMIC, rawToken

    def NegTokenInit2(self):
        """
        Server-Initiation of GSSAPI/SPNEGO.
        See [MS-SPNG] sect 3.2.5.2

        :returns: a tuple (new Context, GSSAPI_BLOB announcing the supported
            mechanisms)
        """
        Context = self.CONTEXT(
            self.supported_ssps,
            force_supported_mechtypes=self.force_supported_mechtypes,
        )
        return (
            Context,
            GSSAPI_BLOB(
                innerToken=SPNEGO_negToken(
                    token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
                )
            ),
        )

        # NOTE: NegoEX has an effect on how the SecurityContext is
        # initialized, as detailed in [MS-AUTHSOD] sect 3.3.2
        # But the format that the Exchange token uses appears not to
        # be documented :/

        # resp.SecurityBlob.innerToken.token.mechTypes.insert(
        #     0,
        #     # NEGOEX
        #     SPNEGO_MechType(oid="1.3.6.1.4.1.311.2.2.30"),
        # )
        # resp.SecurityBlob.innerToken.token.mechToken = SPNEGO_Token(
        #     value=negoex_token
        # )  # noqa: E501

    def GSS_WrapEx(self, Context, *args, **kwargs):
        # Passthrough
        return Context.ssp.GSS_WrapEx(Context.sub_context, *args, **kwargs)

    def GSS_UnwrapEx(self, Context, *args, **kwargs):
        # Passthrough
        return Context.ssp.GSS_UnwrapEx(Context.sub_context, *args, **kwargs)

    def GSS_GetMICEx(self, Context, *args, **kwargs):
        # Passthrough
        return Context.ssp.GSS_GetMICEx(Context.sub_context, *args, **kwargs)

    def GSS_VerifyMICEx(self, Context, *args, **kwargs):
        # Passthrough
        return Context.ssp.GSS_VerifyMICEx(Context.sub_context, *args, **kwargs)

    def LegsAmount(self, Context: CONTEXT):
        return 4

    def _common_spnego_handler(self, Context, IsClient, val=None, req_flags=None):
        """
        Common implementation of GSS_Init_sec_context (IsClient=True) and
        GSS_Accept_sec_context (IsClient=False).

        :param Context: the SPNEGO context, or None to create a new one
        :param IsClient: whether we are the initiator
        :param val: the token received from the peer, if any
        :param req_flags: the flags stored on a newly created context, then
            passed to the sub SSP's GSS_Init_sec_context
        :returns: a tuple (Context, token to send or None, status)
        :raises ValueError: if no mechanism can be selected
        """
        if Context is None:
            # New Context
            Context = SPNEGOSSP.CONTEXT(
                self.supported_ssps,
                req_flags=req_flags,
                force_supported_mechtypes=self.force_supported_mechtypes,
            )
            if IsClient:
                Context.requested_mechtypes = Context.supported_mechtypes

        # Extract values from GSSAPI token
        status, MIC, otherMIC, rawToken = 0, None, None, False
        if val:
            val, status, otherMIC, rawToken = self._extract_gssapi(Context, val)

        # If we don't have a SSP already negotiated, check for requested and available
        # SSPs and find a common one.
        if Context.ssp is None:
            if Context.negotiated_mechtype is None:
                if Context.requested_mechtypes:
                    # Find a common SSP
                    try:
                        Context.negotiated_mechtype = next(
                            x
                            for x in Context.requested_mechtypes
                            if x in Context.supported_mechtypes
                        )
                    except StopIteration:
                        # no common mechanisms
                        raise ValueError("No common SSP mechanisms !")
                    # Check whether the selected SSP was the one preferred by the client
                    if (
                        Context.negotiated_mechtype != Context.requested_mechtypes[0]
                        and val
                    ):
                        Context.first_choice = False
                # No SSPs were requested. Use the first available SSP we know.
                elif Context.supported_mechtypes:
                    Context.negotiated_mechtype = Context.supported_mechtypes[0]
                else:
                    raise ValueError("Can't figure out what SSP to use")
            # Set Context.ssp to the object matching the chosen SSP type.
            Context.ssp = self.supported_ssps[Context.negotiated_mechtype.oid.val]

        if not Context.first_choice:
            # The currently provided token is not for this SSP !
            # Typically a client opportunistically starts with Kerberos, including
            # its APREQ, and we want to use NTLM. We add one round trip
            Context.state = SPNEGOSSP.STATE.FIRST
            Context.first_choice = True  # reset to not come here again.
            tok, status = None, GSS_S_CONTINUE_NEEDED
        else:
            # The currently provided token is for this SSP !
            # Pass it to the sub ssp, with its own context
            if IsClient:
                (
                    Context.sub_context,
                    tok,
                    status,
                ) = Context.ssp.GSS_Init_sec_context(
                    Context.sub_context,
                    val=val,
                    req_flags=Context.req_flags,
                )
            else:
                Context.sub_context, tok, status = Context.ssp.GSS_Accept_sec_context(
                    Context.sub_context, val=val
                )
            # Check whether client or server says the specified mechanism is not valid
            if status == GSS_S_BAD_MECH:
                # Mechanism is not usable. Typically the Kerberos SPN is wrong
                to_remove = [Context.negotiated_mechtype.oid.val]
                # If there's an alias (for the multiple kerberos oids, also include it)
                if Context.negotiated_mechtype.oid.val in SPNEGOSSP._MECH_ALIASES:
                    to_remove.append(
                        SPNEGOSSP._MECH_ALIASES[Context.negotiated_mechtype.oid.val]
                    )
                # Drop those unusable mechanisms from the supported list
                for x in list(Context.supported_mechtypes):
                    if x.oid.val in to_remove:
                        Context.supported_mechtypes.remove(x)
                # Re-calculate negotiated mechtype
                try:
                    Context.negotiated_mechtype = next(
                        x
                        for x in Context.requested_mechtypes
                        if x in Context.supported_mechtypes
                    )
                except StopIteration:
                    # no common mechanisms
                    raise ValueError("No common SSP mechanisms after GSS_S_BAD_MECH !")
                # Start again.
                Context.state = SPNEGOSSP.STATE.CHANGESSP
                Context.ssp = None  # Reset the SSP
                Context.sub_context = None  # Reset the SSP context
                if IsClient:
                    # Call ourselves again for the client to generate a token
                    return self._common_spnego_handler(Context, True, None)
                else:
                    # Return nothing but the supported SSP list
                    tok, status = None, GSS_S_CONTINUE_NEEDED

        if rawToken:
            # No GSSAPI wrapper (fallback)
            return Context, tok, status

        # Client success
        if IsClient and tok is None and status == GSS_S_COMPLETE:
            return Context, None, status

        # Map GSSAPI codes to SPNEGO
        if status == GSS_S_COMPLETE:
            negResult = 0  # accept_completed
        elif status == GSS_S_CONTINUE_NEEDED:
            negResult = 1  # accept_incomplete
        else:
            negResult = 2  # reject

        # GSSAPI-MIC
        if Context.ssp and Context.ssp.canMechListMIC(Context.sub_context):
            # The documentation on mechListMIC wasn't clear, so note that:
            # - The mechListMIC that the client sends is computed over the
            #   list of mechanisms that it requests.
            # - the mechListMIC that the server sends is computed over the
            #   list of mechanisms that the client requested.
            # Yes, this does indeed mean that NegTokenInit2 added by [MS-SPNG]
            # is NOT protected. That's not necessarily an issue, since it was
            # optional in most cases, but it's something to keep in mind.
            if otherMIC is not None:
                # Check the received MIC if any
                if IsClient:  # from server
                    Context.ssp.verifyMechListMIC(
                        Context,
                        otherMIC,
                        mechListMIC(Context.supported_mechtypes),
                    )
                else:  # from client
                    Context.ssp.verifyMechListMIC(
                        Context,
                        otherMIC,
                        mechListMIC(Context.requested_mechtypes),
                    )
            # Then build our own MIC
            if IsClient:  # client
                if negResult == 0:
                    # Include MIC for the last packet. We could add a check
                    # here to only send the MIC when required (when preferred ssp
                    # isn't chosen)
                    MIC = Context.ssp.getMechListMIC(
                        Context,
                        mechListMIC(Context.supported_mechtypes),
                    )
            else:  # server
                MIC = Context.ssp.getMechListMIC(
                    Context,
                    mechListMIC(Context.requested_mechtypes),
                )

        if IsClient:
            if Context.state == SPNEGOSSP.STATE.FIRST:
                # First client token
                spnego_tok = SPNEGO_negToken(
                    token=SPNEGO_negTokenInit(mechTypes=Context.supported_mechtypes)
                )
                if tok:
                    spnego_tok.token.mechToken = SPNEGO_Token(value=tok)
            else:
                # Subsequent client tokens
                spnego_tok = SPNEGO_negToken(  # GSSAPI_BLOB is stripped
                    token=SPNEGO_negTokenResp(
                        supportedMech=None,
                        negResult=None,
                    )
                )
                if tok:
                    spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
                if Context.state == SPNEGOSSP.STATE.CHANGESSP:
                    # On renegotiation, include the negResult and chosen mechanism
                    spnego_tok.token.negResult = negResult
                    spnego_tok.token.supportedMech = Context.negotiated_mechtype
        else:
            spnego_tok = SPNEGO_negToken(  # GSSAPI_BLOB is stripped
                token=SPNEGO_negTokenResp(
                    supportedMech=None,
                    negResult=negResult,
                )
            )
            if Context.state in [SPNEGOSSP.STATE.FIRST, SPNEGOSSP.STATE.CHANGESSP]:
                # Include the supportedMech list if this is the first thing we do
                # or a renegotiation.
                spnego_tok.token.supportedMech = Context.negotiated_mechtype
            if tok:
                spnego_tok.token.responseToken = SPNEGO_Token(value=tok)
        # Apply MIC if available
        if MIC:
            spnego_tok.token.mechListMIC = SPNEGO_MechListMIC(
                value=ASN1_STRING(MIC),
            )
        if (
            IsClient and Context.state == SPNEGOSSP.STATE.FIRST
        ):  # Client: after the first packet, specifying 'SPNEGO' is implicit.
            # Always implicit for the server.
            spnego_tok = GSSAPI_BLOB(innerToken=spnego_tok)
        # Not the first token anymore
        Context.state = SPNEGOSSP.STATE.NORMAL
        return Context, spnego_tok, status

    def GSS_Init_sec_context(
        self, Context: CONTEXT, val=None, req_flags: Optional[GSS_C_FLAGS] = None
    ):
        return self._common_spnego_handler(Context, True, val=val, req_flags=req_flags)

    def GSS_Accept_sec_context(self, Context: CONTEXT, val=None):
        return self._common_spnego_handler(Context, False, val=val, req_flags=0)

    def GSS_Passive(self, Context: CONTEXT, val=None):
        """
        Follow an exchange without taking part in it: select the SSP from
        the token and hand the inner token to its own GSS_Passive.

        :returns: a tuple (Context, status). Context is None if no usable
            mechanism could be found (status is then GSS_S_BAD_MECH).
        """
        if Context is None:
            # New Context
            Context = SPNEGOSSP.CONTEXT(self.supported_ssps)
            Context.passive = True

        # Extraction
        val, status, _, rawToken = self._extract_gssapi(Context, val)

        if val is None and status == GSS_S_COMPLETE:
            return Context, None

        # Just get the negotiated SSP
        if Context.negotiated_mechtype:
            mechtype = Context.negotiated_mechtype
        elif Context.requested_mechtypes:
            mechtype = Context.requested_mechtypes[0]
        elif rawToken and Context.supported_mechtypes:
            mechtype = Context.supported_mechtypes[0]
        else:
            return None, GSS_S_BAD_MECH
        try:
            ssp = self.supported_ssps[mechtype.oid.val]
        except KeyError:
            return None, GSS_S_BAD_MECH

        if Context.ssp is not None:
            # Detect resets
            if Context.ssp != ssp:
                Context.ssp = ssp
                Context.sub_context = None
        else:
            Context.ssp = ssp

        # Passthrough
        Context.sub_context, status = Context.ssp.GSS_Passive(Context.sub_context, val)

        return Context, status

    def GSS_Passive_set_Direction(self, Context: CONTEXT, IsAcceptor=False):
        # Passthrough
        Context.ssp.GSS_Passive_set_Direction(
            Context.sub_context, IsAcceptor=IsAcceptor
        )

    def MaximumSignatureLength(self, Context: CONTEXT):
        # Passthrough
        return Context.ssp.MaximumSignatureLength(Context.sub_context)