1# SPDX-License-Identifier: GPL-2.0-or-later 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Gabriel Potter 5 6""" 7EPT map (EndPoinT mapper) 8""" 9 10import uuid 11 12from scapy.config import conf 13from scapy.fields import ( 14 ByteEnumField, 15 ConditionalField, 16 FieldLenField, 17 IPField, 18 LEShortField, 19 MultipleTypeField, 20 PacketListField, 21 ShortField, 22 StrLenField, 23 UUIDEnumField, 24) 25from scapy.packet import Packet 26from scapy.layers.dcerpc import ( 27 DCE_RPC_INTERFACES_NAMES, 28 DCE_RPC_INTERFACES_NAMES_rev, 29 DCE_RPC_TRANSFER_SYNTAXES, 30) 31 32from scapy.layers.msrpce.raw.ept import * # noqa: F401, F403 33 34 35# [C706] Appendix L 36 37# "For historical reasons, this cannot be done using the standard 38# NDR encoding rules for marshalling and unmarshalling. 39# A special encoding is required." - Appendix L 40 41 42class octet_string_t(Packet): 43 fields_desc = [ 44 FieldLenField("count", None, fmt="<H", length_of="value"), 45 StrLenField("value", b"", length_from=lambda pkt: pkt.count), 46 ] 47 48 def default_payload_class(self, _): 49 return conf.padding_layer 50 51 52def _uuid_res(x): 53 # Look in both DCE_RPC_INTERFACES_NAMES and DCE_RPC_TRANSFER_SYNTAXES 54 dct = DCE_RPC_INTERFACES_NAMES.copy() 55 dct.update(DCE_RPC_TRANSFER_SYNTAXES) 56 return dct.get(x) 57 58 59def _uuid_res_rev(x): 60 # Same but reversed 61 dct = DCE_RPC_INTERFACES_NAMES_rev.copy() 62 dct.update({v: k for k, v in DCE_RPC_TRANSFER_SYNTAXES.items()}) 63 return dct.get(x) 64 65 66class prot_and_addr_t(Packet): 67 fields_desc = [ 68 # --- LHS 69 LEShortField( 70 "lhs_length", 71 0, 72 ), 73 # [C706] Appendix I with names from Appendix B 74 ByteEnumField( 75 "protocol_identifier", 76 0, 77 { 78 0x0: "OSI OID", # Special 79 0x0D: "UUID", # Special 80 # Transports 81 # 0x2: "DNA Session Control", 82 # 0x3: "DNA Session Control V3", 83 # 0x4: "DNA NSP Transport", 84 # 0x5: "OSI TP4", 85 0x06: "NCADG_OSI_CLSN", # [C706] 86 0x07: "NCACN_IP_TCP", # [C706] 87 0x08: "NCADG_IP_UDP", # [C706] 88 0x09: "IP", # [C706] 89 0x0A: "RPC connectionless protocol", # [C706] 90 0x0B: "RPC connection-oriented protocol", # [C706] 91 0x0C: "NCALRPC", 92 0x0F: "NCACN_NP", # [MS-RPCE] 93 0x11: "NCACN_NB", # [C706] 94 0x12: "NCACN_NB_NB", # [MS-RPCE] 95 0x13: "NCACN_SPX", # [C706] 96 0x14: "NCADG_IPX", # [C706] 97 0x16: "NCACN_AT_DSP", # [C706] 98 0x17: "NCADG_AT_DSP", # [C706] 99 0x19: "NCADG_NB", # [C706] 100 0x1A: "NCACN_VNS_SPP", # [C706] 101 0x1B: "NCADG_VNS_IPC", # [C706] 102 0x1F: "NCACN_HTTP", # [MS-RPCE] 103 }, 104 ), 105 # 0x0 106 ConditionalField( 107 StrLenField("oid", "", length_from=lambda pkt: pkt.lhs_length - 1), 108 lambda pkt: pkt.protocol_identifier == 0x0, 109 ), 110 # 0xD 111 ConditionalField( 112 UUIDEnumField( 113 "uuid", 114 uuid.UUID("8a885d04-1ceb-11c9-9fe8-08002b104860"), 115 ( 116 # Those are dynamic 117 _uuid_res, 118 _uuid_res_rev, 119 ), 120 uuid_fmt=UUIDEnumField.FORMAT_LE, 121 ), 122 lambda pkt: pkt.protocol_identifier == 0xD, 123 ), 124 ConditionalField( 125 LEShortField("version", 0), lambda pkt: pkt.protocol_identifier == 0xD 126 ), 127 # Other 128 ConditionalField( 129 StrLenField("lhs", "", length_from=lambda pkt: pkt.lhs_length - 1), 130 lambda pkt: pkt.protocol_identifier not in [0x0, 0x7, 0xD], 131 ), 132 # --- RHS 133 LEShortField( 134 "rhs_length", 135 None, 136 ), 137 MultipleTypeField( 138 [ 139 ( 140 # (big-endian) 141 ShortField("rhs", 0), 142 lambda pkt: pkt.protocol_identifier in [0x7, 0x8, 0x1F], 143 "port", 144 ), 145 ( 146 # (big-endian) 147 IPField("rhs", 0), 148 lambda pkt: pkt.protocol_identifier == 0x9, 149 "addr", 150 ), 151 ( 152 LEShortField("rhs", 5), 153 lambda pkt: pkt.protocol_identifier in [0xA, 0xB, 0xD], 154 "minor version", 155 ), 156 ( 157 StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length), 158 lambda pkt: pkt.protocol_identifier == 0xF, 159 "named pipe", 160 ), 161 ( 162 StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length), 163 lambda pkt: pkt.protocol_identifier == 0x11, 164 "netbios name", 165 ), 166 ], 167 StrLenField("rhs", "", length_from=lambda pkt: pkt.rhs_length), 168 ), 169 ] 170 171 def default_payload_class(self, _): 172 return conf.padding_layer 173 174 175class protocol_tower_t(Packet): 176 fields_desc = [ 177 FieldLenField("count", None, fmt="<H", count_of="floors"), 178 PacketListField( 179 "floors", 180 [prot_and_addr_t()], 181 prot_and_addr_t, 182 count_from=lambda pkt: pkt.count, 183 ), 184 ] 185 186 def _summary(self): 187 if len(self.floors) < 4: 188 raise ValueError("Malformed protocol_tower_t (not enough floors)") 189 if self.floors[0].protocol_identifier != 0xD: 190 raise ValueError("Malformed protocol_tower_t (bad floor 1)") 191 if self.floors[1].protocol_identifier != 0xD: 192 raise ValueError("Malformed protocol_tower_t (bad floor 2)") 193 if self.floors[2].protocol_identifier in [0xA, 0xB]: # Connection oriented/less 194 endpoint = "%s:%s" % ( 195 self.floors[3].sprintf("%protocol_identifier%"), 196 ":".join( 197 x.rhs.decode() if isinstance(x.rhs, bytes) else str(x.rhs) 198 for x in self.floors[3:][::-1] 199 ), 200 ) 201 elif self.floors[2].protocol_identifier == 0xC: # NCALRPC 202 endpoint = "%s:%s" % ( 203 self.floors[2].sprintf("%protocol_identifier%"), 204 self.floors[3].rhs.decode(), 205 ) 206 else: 207 raise ValueError( 208 "Unknown RPC transport: %s" % self.floors[2].protocol_identifier 209 ) 210 return ( 211 self.floors[0].sprintf("%uuid% (%version%.%r,rhs%)"), 212 endpoint, 213 ) 214 215 def mysummary(self): 216 try: 217 return "%s %s" % self._summary() 218 except ValueError as ex: 219 return str(ex) 220