1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Philippe Biondi <phil@secdev.org> 5 6""" 7NetBIOS over TCP/IP 8 9[RFC 1001/1002] 10""" 11 12import struct 13from scapy.arch import get_if_addr 14from scapy.base_classes import Net 15from scapy.ansmachine import AnsweringMachine 16from scapy.compat import bytes_encode 17from scapy.config import conf 18 19from scapy.packet import Packet, bind_bottom_up, bind_layers, bind_top_down 20from scapy.fields import ( 21 BitEnumField, 22 BitField, 23 ByteEnumField, 24 ByteField, 25 FieldLenField, 26 FlagsField, 27 IPField, 28 IntField, 29 NetBIOSNameField, 30 PacketListField, 31 ShortEnumField, 32 ShortField, 33 StrFixedLenField, 34 XShortField, 35 XStrFixedLenField 36) 37from scapy.layers.inet import IP, UDP, TCP 38from scapy.layers.l2 import Ether, SourceMACField 39 40 41class NetBIOS_DS(Packet): 42 name = "NetBIOS datagram service" 43 fields_desc = [ 44 ByteEnumField("type", 17, {17: "direct_group"}), 45 ByteField("flags", 0), 46 XShortField("id", 0), 47 IPField("src", "127.0.0.1"), 48 ShortField("sport", 138), 49 ShortField("len", None), 50 ShortField("ofs", 0), 51 NetBIOSNameField("srcname", ""), 52 NetBIOSNameField("dstname", ""), 53 ] 54 55 def post_build(self, p, pay): 56 p += pay 57 if self.len is None: 58 tmp_len = len(p) - 14 59 p = p[:10] + struct.pack("!H", tmp_len) + p[12:] 60 return p 61 62# ShortField("length",0), 63# ShortField("Delimiter",0), 64# ByteField("command",0), 65# ByteField("data1",0), 66# ShortField("data2",0), 67# ShortField("XMIt",0), 68# ShortField("RSPCor",0), 69# StrFixedLenField("dest","",16), 70# StrFixedLenField("source","",16), 71# 72# ] 73# 74 75# NetBIOS 76 77 78_NETBIOS_SUFFIXES = { 79 0x4141: "workstation", 80 0x4141 + 0x03: "messenger service", 81 0x4141 + 0x200: "file server service", 82 0x4141 + 0x10b: "domain master browser", 83 0x4141 + 0x10c: "domain controller", 84 0x4141 + 0x10e: "browser election service" 85} 86 87_NETBIOS_QRTYPES = { 88 0x20: "NB", 89 0x21: "NBSTAT" 90} 91 92_NETBIOS_QRCLASS = { 93 1: "INTERNET" 94} 95 96_NETBIOS_RNAMES = { 97 0xC00C: "Label String Pointer to QUESTION_NAME" 98} 99 100_NETBIOS_OWNER_MODE_TYPES = { 101 0: "B node", 102 1: "P node", 103 2: "M node", 104 3: "H node" 105} 106 107_NETBIOS_GNAMES = { 108 0: "Unique name", 109 1: "Group name" 110} 111 112 113class NBNSHeader(Packet): 114 name = "NBNS Header" 115 fields_desc = [ 116 ShortField("NAME_TRN_ID", 0), 117 BitField("RESPONSE", 0, 1), 118 BitField("OPCODE", 0, 4), 119 FlagsField("NM_FLAGS", 0, 7, ["B", 120 "res1", 121 "res0", 122 "RA", 123 "RD", 124 "TC", 125 "AA"]), 126 BitField("RCODE", 0, 4), 127 ShortField("QDCOUNT", 0), 128 ShortField("ANCOUNT", 0), 129 ShortField("NSCOUNT", 0), 130 ShortField("ARCOUNT", 0), 131 ] 132 133 def hashret(self): 134 return b"NBNS" + struct.pack("!B", self.OPCODE) 135 136 137# Name Query Request 138# RFC1002 sect 4.2.12 139 140 141class NBNSQueryRequest(Packet): 142 name = "NBNS query request" 143 fields_desc = [NetBIOSNameField("QUESTION_NAME", "windows"), 144 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 145 ByteField("NULL", 0), 146 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES), 147 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS)] 148 149 def mysummary(self): 150 return "NBNSQueryRequest who has '\\\\%s'" % ( 151 self.QUESTION_NAME.decode(errors="backslashreplace") 152 ) 153 154 155bind_layers(NBNSHeader, NBNSQueryRequest, 156 OPCODE=0x0, NM_FLAGS=0x11, QDCOUNT=1) 157 158 159# Name Query Response 160# RFC1002 sect 4.2.13 161 162 163class NBNS_ADD_ENTRY(Packet): 164 fields_desc = [ 165 BitEnumField("G", 0, 1, _NETBIOS_GNAMES), 166 BitEnumField("OWNER_NODE_TYPE", 00, 2, 167 _NETBIOS_OWNER_MODE_TYPES), 168 BitEnumField("UNUSED", 0, 13, {0: "Unused"}), 169 IPField("NB_ADDRESS", "127.0.0.1") 170 ] 171 172 173class NBNSQueryResponse(Packet): 174 name = "NBNS query response" 175 fields_desc = [NetBIOSNameField("RR_NAME", "windows"), 176 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 177 ByteField("NULL", 0), 178 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES), 179 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS), 180 IntField("TTL", 0x493e0), 181 FieldLenField("RDLENGTH", None, length_of="ADDR_ENTRY"), 182 PacketListField("ADDR_ENTRY", 183 [NBNS_ADD_ENTRY()], NBNS_ADD_ENTRY, 184 length_from=lambda pkt: pkt.RDLENGTH) 185 ] 186 187 def mysummary(self): 188 if not self.ADDR_ENTRY: 189 return "NBNSQueryResponse" 190 return "NBNSQueryResponse '\\\\%s' is at %s" % ( 191 self.RR_NAME.decode(errors="backslashreplace"), 192 self.ADDR_ENTRY[0].NB_ADDRESS 193 ) 194 195 def answers(self, other): 196 return ( 197 isinstance(other, NBNSQueryRequest) and 198 other.QUESTION_NAME == self.RR_NAME 199 ) 200 201 202bind_layers(NBNSHeader, NBNSQueryResponse, # RD+AA 203 OPCODE=0x0, NM_FLAGS=0x50, RESPONSE=1, ANCOUNT=1) 204for _flg in [0x58, 0x70, 0x78]: 205 bind_bottom_up(NBNSHeader, NBNSQueryResponse, 206 OPCODE=0x0, NM_FLAGS=_flg, RESPONSE=1, ANCOUNT=1) 207 208 209# Node Status Request 210# RFC1002 sect 4.2.17 211 212class NBNSNodeStatusRequest(NBNSQueryRequest): 213 name = "NBNS status request" 214 QUESTION_NAME = b"*" + b"\x00" * 14 215 QUESTION_TYPE = 0x21 216 217 def mysummary(self): 218 return "NBNSNodeStatusRequest who has '\\\\%s'" % ( 219 self.QUESTION_NAME.decode(errors="backslashreplace") 220 ) 221 222 223bind_bottom_up(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=0, QDCOUNT=1) 224bind_layers(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=1, QDCOUNT=1) 225 226 227# Node Status Response 228# RFC1002 sect 4.2.18 229 230class NBNSNodeStatusResponseService(Packet): 231 name = "NBNS Node Status Response Service" 232 fields_desc = [StrFixedLenField("NETBIOS_NAME", "WINDOWS ", 15), 233 ByteEnumField("SUFFIX", 0, {0: "workstation", 234 0x03: "messenger service", 235 0x20: "file server service", 236 0x1b: "domain master browser", 237 0x1c: "domain controller", 238 0x1e: "browser election service" 239 }), 240 ByteField("NAME_FLAGS", 0x4), 241 ByteEnumField("UNUSED", 0, {0: "unused"})] 242 243 def default_payload_class(self, payload): 244 return conf.padding_layer 245 246 247class NBNSNodeStatusResponse(Packet): 248 name = "NBNS Node Status Response" 249 fields_desc = [NetBIOSNameField("RR_NAME", "windows"), 250 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 251 ByteField("NULL", 0), 252 ShortEnumField("RR_TYPE", 0x21, _NETBIOS_QRTYPES), 253 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS), 254 IntField("TTL", 0), 255 ShortField("RDLENGTH", 83), 256 FieldLenField("NUM_NAMES", None, fmt="B", 257 count_of="NODE_NAME"), 258 PacketListField("NODE_NAME", 259 [NBNSNodeStatusResponseService()], 260 NBNSNodeStatusResponseService, 261 count_from=lambda pkt: pkt.NUM_NAMES), 262 SourceMACField("MAC_ADDRESS"), 263 XStrFixedLenField("STATISTICS", b"", 46)] 264 265 def answers(self, other): 266 return ( 267 isinstance(other, NBNSNodeStatusRequest) and 268 other.QUESTION_NAME == self.RR_NAME 269 ) 270 271 272bind_layers(NBNSHeader, NBNSNodeStatusResponse, 273 OPCODE=0x0, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1) 274 275 276# Name Registration Request 277# RFC1002 sect 4.2.2 278 279class NBNSRegistrationRequest(Packet): 280 name = "NBNS registration request" 281 fields_desc = [ 282 NetBIOSNameField("QUESTION_NAME", "Windows"), 283 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 284 ByteField("NULL", 0), 285 ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES), 286 ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS), 287 ShortEnumField("RR_NAME", 0xC00C, _NETBIOS_RNAMES), 288 ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES), 289 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS), 290 IntField("TTL", 0), 291 ShortField("RDLENGTH", 6), 292 BitEnumField("G", 0, 1, _NETBIOS_GNAMES), 293 BitEnumField("OWNER_NODE_TYPE", 00, 2, 294 _NETBIOS_OWNER_MODE_TYPES), 295 BitEnumField("UNUSED", 0, 13, {0: "Unused"}), 296 IPField("NB_ADDRESS", "127.0.0.1") 297 ] 298 299 def mysummary(self): 300 return self.sprintf("Register %G% %QUESTION_NAME% at %NB_ADDRESS%") 301 302 303bind_bottom_up(NBNSHeader, NBNSRegistrationRequest, OPCODE=0x5) 304bind_layers(NBNSHeader, NBNSRegistrationRequest, 305 OPCODE=0x5, NM_FLAGS=0x11, QDCOUNT=1, ARCOUNT=1) 306 307 308# Wait for Acknowledgement Response 309# RFC1002 sect 4.2.16 310 311class NBNSWackResponse(Packet): 312 name = "NBNS Wait for Acknowledgement Response" 313 fields_desc = [NetBIOSNameField("RR_NAME", "windows"), 314 ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES), 315 ByteField("NULL", 0), 316 ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES), 317 ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS), 318 IntField("TTL", 2), 319 ShortField("RDLENGTH", 2), 320 BitField("RDATA", 10512, 16)] # 10512=0010100100010000 321 322 323bind_layers(NBNSHeader, NBNSWackResponse, 324 OPCODE=0x7, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1) 325 326# NetBIOS DATAGRAM HEADER 327 328 329class NBTDatagram(Packet): 330 name = "NBT Datagram Packet" 331 fields_desc = [ByteField("Type", 0x10), 332 ByteField("Flags", 0x02), 333 ShortField("ID", 0), 334 IPField("SourceIP", "127.0.0.1"), 335 ShortField("SourcePort", 138), 336 ShortField("Length", None), 337 ShortField("Offset", 0), 338 NetBIOSNameField("SourceName", "windows"), 339 ShortEnumField("SUFFIX1", 0x4141, _NETBIOS_SUFFIXES), 340 ByteField("NULL1", 0), 341 NetBIOSNameField("DestinationName", "windows"), 342 ShortEnumField("SUFFIX2", 0x4141, _NETBIOS_SUFFIXES), 343 ByteField("NULL2", 0)] 344 345 def post_build(self, pkt, pay): 346 if self.Length is None: 347 length = len(pay) + 68 348 pkt = pkt[:10] + struct.pack("!H", length) + pkt[12:] 349 return pkt + pay 350 351 352# SESSION SERVICE PACKETS 353 354 355class NBTSession(Packet): 356 name = "NBT Session Packet" 357 MAXLENGTH = 0x3ffff 358 fields_desc = [ByteEnumField("TYPE", 0, {0x00: "Session Message", 359 0x81: "Session Request", 360 0x82: "Positive Session Response", 361 0x83: "Negative Session Response", 362 0x84: "Retarget Session Response", 363 0x85: "Session Keepalive"}), 364 BitField("RESERVED", 0x00, 7), 365 BitField("LENGTH", None, 17)] 366 367 def post_build(self, pkt, pay): 368 if self.LENGTH is None: 369 length = len(pay) & self.MAXLENGTH 370 pkt = pkt[:1] + struct.pack("!I", length)[1:] 371 return pkt + pay 372 373 def extract_padding(self, s): 374 return s[:self.LENGTH], s[self.LENGTH:] 375 376 @classmethod 377 def tcp_reassemble(cls, data, *args, **kwargs): 378 if len(data) < 4: 379 return None 380 length = struct.unpack("!I", data[:4])[0] & cls.MAXLENGTH 381 if len(data) >= length + 4: 382 return cls(data) 383 384 385bind_bottom_up(UDP, NBNSHeader, dport=137) 386bind_bottom_up(UDP, NBNSHeader, sport=137) 387bind_top_down(UDP, NBNSHeader, sport=137, dport=137) 388 389bind_bottom_up(UDP, NBTDatagram, dport=138) 390bind_bottom_up(UDP, NBTDatagram, sport=138) 391bind_top_down(UDP, NBTDatagram, sport=138, dport=138) 392 393bind_bottom_up(TCP, NBTSession, dport=445) 394bind_bottom_up(TCP, NBTSession, sport=445) 395bind_bottom_up(TCP, NBTSession, dport=139) 396bind_bottom_up(TCP, NBTSession, sport=139) 397bind_layers(TCP, NBTSession, dport=139, sport=139) 398 399 400class NBNS_am(AnsweringMachine): 401 function_name = "nbnsd" 402 filter = "udp port 137" 403 sniff_options = {"store": 0} 404 405 def parse_options(self, server_name=None, from_ip=None, ip=None): 406 """ 407 NBNS answering machine 408 409 :param server_name: the netbios server name to match 410 :param from_ip: an IP (can have a netmask) to filter on 411 :param ip: the IP to answer with 412 """ 413 self.ServerName = bytes_encode(server_name or "") 414 self.ip = ip 415 if isinstance(from_ip, str): 416 self.from_ip = Net(from_ip) 417 else: 418 self.from_ip = from_ip 419 420 def is_request(self, req): 421 if self.from_ip and IP in req and req[IP].src not in self.from_ip: 422 return False 423 return NBNSQueryRequest in req and ( 424 not self.ServerName or 425 req[NBNSQueryRequest].QUESTION_NAME.strip() == self.ServerName 426 ) 427 428 def make_reply(self, req): 429 # type: (Packet) -> Packet 430 resp = Ether( 431 dst=req[Ether].src, 432 src=None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst, 433 ) / IP(dst=req[IP].src) / UDP( 434 sport=req.dport, 435 dport=req.sport, 436 ) 437 address = self.ip or get_if_addr(self.optsniff.get("iface", conf.iface)) 438 resp /= NBNSHeader() / NBNSQueryResponse( 439 RR_NAME=self.ServerName or req.QUESTION_NAME, 440 SUFFIX=req.SUFFIX, 441 ADDR_ENTRY=[NBNS_ADD_ENTRY(NB_ADDRESS=address)] 442 ) 443 resp.NAME_TRN_ID = req.NAME_TRN_ID 444 return resp 445