• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7NetBIOS over TCP/IP
8
9[RFC 1001/1002]
10"""
11
12import struct
13from scapy.arch import get_if_addr
14from scapy.base_classes import Net
15from scapy.ansmachine import AnsweringMachine
16from scapy.compat import bytes_encode
17from scapy.config import conf
18
19from scapy.packet import Packet, bind_bottom_up, bind_layers, bind_top_down
20from scapy.fields import (
21    BitEnumField,
22    BitField,
23    ByteEnumField,
24    ByteField,
25    FieldLenField,
26    FlagsField,
27    IPField,
28    IntField,
29    NetBIOSNameField,
30    PacketListField,
31    ShortEnumField,
32    ShortField,
33    StrFixedLenField,
34    XShortField,
35    XStrFixedLenField
36)
37from scapy.layers.inet import IP, UDP, TCP
38from scapy.layers.l2 import Ether, SourceMACField
39
40
class NetBIOS_DS(Packet):
    """NetBIOS datagram service header plus source and destination names.

    The ``len`` field is computed at build time when it is left as ``None``.
    """
    name = "NetBIOS datagram service"
    fields_desc = [
        ByteEnumField("type", 17, {17: "direct_group"}),  # 1 byte
        ByteField("flags", 0),                            # 1 byte
        XShortField("id", 0),                             # 2 bytes
        IPField("src", "127.0.0.1"),                      # 4 bytes
        ShortField("sport", 138),                         # 2 bytes
        ShortField("len", None),                          # bytes 10-11
        ShortField("ofs", 0),                             # 2 bytes
        NetBIOSNameField("srcname", ""),
        NetBIOSNameField("dstname", ""),
    ]

    def post_build(self, p, pay):
        """Append the payload and fill in ``len`` if it was not set.

        :param p: the built bytes of this layer
        :param pay: the built bytes of the payload
        :returns: this layer followed by its payload
        """
        datagram = p + pay
        if self.len is not None:
            return datagram
        # The stored length excludes the 14 fixed header bytes that come
        # before the names (type .. ofs), but includes names and payload.
        encoded_len = struct.pack("!H", len(datagram) - 14)
        return datagram[:10] + encoded_len + datagram[12:]
61
62#        ShortField("length",0),
63#        ShortField("Delimiter",0),
64#        ByteField("command",0),
65#        ByteField("data1",0),
66#        ShortField("data2",0),
67#        ShortField("XMIt",0),
68#        ShortField("RSPCor",0),
69#        StrFixedLenField("dest","",16),
70#        StrFixedLenField("source","",16),
71#
72#        ]
73#
74
75# NetBIOS
76
77
# Two-byte wire form of the one-byte NetBIOS name suffix: each nibble of the
# suffix byte is added to ASCII "A" (0x41), so 0x00 -> 0x4141, 0x20 -> 0x4341.
_NETBIOS_SUFFIXES = {
    0x4141 + ((suffix >> 4) << 8) + (suffix & 0x0F): label
    for suffix, label in (
        (0x00, "workstation"),
        (0x03, "messenger service"),
        (0x20, "file server service"),
        (0x1b, "domain master browser"),
        (0x1c, "domain controller"),
        (0x1e, "browser election service"),
    )
}

# Question / resource record types.
_NETBIOS_QRTYPES = dict(enumerate(("NB", "NBSTAT"), 0x20))

# Question / resource record classes.
_NETBIOS_QRCLASS = {1: "INTERNET"}

# Compressed RR_NAME values.
_NETBIOS_RNAMES = {0xC00C: "Label String Pointer to QUESTION_NAME"}

# Values of the 2-bit OWNER_NODE_TYPE field.
_NETBIOS_OWNER_MODE_TYPES = dict(enumerate(
    ("B node", "P node", "M node", "H node")
))

# Values of the 1-bit G (group) flag.
_NETBIOS_GNAMES = dict(enumerate(("Unique name", "Group name")))
111
112
class NBNSHeader(Packet):
    """Common NetBIOS name service header that precedes every NBNS message."""
    name = "NBNS Header"
    fields_desc = [
        ShortField("NAME_TRN_ID", 0),
        # RESPONSE (1) + OPCODE (4) + NM_FLAGS (7) + RCODE (4) = 16 bits
        BitField("RESPONSE", 0, 1),
        BitField("OPCODE", 0, 4),
        FlagsField(
            "NM_FLAGS", 0, 7,
            ["B", "res1", "res0", "RA", "RD", "TC", "AA"],
        ),
        BitField("RCODE", 0, 4),
        ShortField("QDCOUNT", 0),
        ShortField("ANCOUNT", 0),
        ShortField("NSCOUNT", 0),
        ShortField("ARCOUNT", 0),
    ]

    def hashret(self):
        """Return a hash key built from a fixed tag and the opcode byte."""
        opcode_byte = struct.pack("!B", self.OPCODE)
        return b"NBNS" + opcode_byte
135
136
137# Name Query Request
138# RFC1002 sect 4.2.12
139
140
class NBNSQueryRequest(Packet):
    """NBNS name query request (question section only)."""
    name = "NBNS query request"
    fields_desc = [
        NetBIOSNameField("QUESTION_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
    ]

    def mysummary(self):
        """Return a one-line summary naming the queried host."""
        # Undecodable bytes are shown as backslash escapes instead of raising.
        queried = self.QUESTION_NAME.decode(errors="backslashreplace")
        return "NBNSQueryRequest who has '\\\\%s'" % (queried,)
153
154
# Associate NBNSQueryRequest with an NBNSHeader carrying opcode 0, one
# question and NM_FLAGS 0x11 (the "B" and "RD" bits of NBNSHeader.NM_FLAGS).
bind_layers(NBNSHeader, NBNSQueryRequest,
            OPCODE=0x0, NM_FLAGS=0x11, QDCOUNT=1)
157
158
159# Name Query Response
160# RFC1002 sect 4.2.13
161
162
class NBNS_ADD_ENTRY(Packet):
    """One address entry (flags + IPv4 address) of an NBNS query response.

    Instances are the elements of ``NBNSQueryResponse.ADDR_ENTRY``, a
    length-bounded ``PacketListField``.
    """
    fields_desc = [
        BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
        BitEnumField("OWNER_NODE_TYPE", 0, 2,
                     _NETBIOS_OWNER_MODE_TYPES),
        BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
        IPField("NB_ADDRESS", "127.0.0.1")
    ]

    def default_payload_class(self, payload):
        """Classify bytes following this entry as padding.

        Without this, the bytes of any further entries would be attached to
        the first entry as a raw payload, so a response holding several
        addresses would dissect into a single entry. This mirrors what
        NBNSNodeStatusResponseService does for the node-name list.

        :param payload: the bytes left after this entry's fields
        :returns: the configured padding layer class
        """
        return conf.padding_layer
171
172
class NBNSQueryResponse(Packet):
    """NBNS name query response carrying a list of address entries."""
    name = "NBNS query response"
    fields_desc = [
        NetBIOSNameField("RR_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 0x493e0),
        # RDLENGTH is the byte length of ADDR_ENTRY, in both directions.
        FieldLenField("RDLENGTH", None, length_of="ADDR_ENTRY"),
        PacketListField(
            "ADDR_ENTRY",
            [NBNS_ADD_ENTRY()],
            NBNS_ADD_ENTRY,
            length_from=lambda pkt: pkt.RDLENGTH,
        ),
    ]

    def mysummary(self):
        """Return a one-line summary with the name and first address."""
        entries = self.ADDR_ENTRY
        if entries:
            resolved = self.RR_NAME.decode(errors="backslashreplace")
            return "NBNSQueryResponse '\\\\%s' is at %s" % (
                resolved,
                entries[0].NB_ADDRESS,
            )
        return "NBNSQueryResponse"

    def answers(self, other):
        """Tell whether this response matches the query layer ``other``.

        :param other: the candidate request layer
        :returns: True when ``other`` is an NBNSQueryRequest for RR_NAME
        """
        if not isinstance(other, NBNSQueryRequest):
            return False
        return other.QUESTION_NAME == self.RR_NAME
200
201
# Default association for a query response: opcode 0, response bit set, one
# answer and NM_FLAGS 0x50.
bind_layers(NBNSHeader, NBNSQueryResponse,  # RD+AA
            OPCODE=0x0, NM_FLAGS=0x50, RESPONSE=1, ANCOUNT=1)
# Additional flag combinations recognised when dissecting only:
# 0x58 = RA+RD+AA, 0x70 = RD+TC+AA, 0x78 = RA+RD+TC+AA.
for _flg in [0x58, 0x70, 0x78]:
    bind_bottom_up(NBNSHeader, NBNSQueryResponse,
                   OPCODE=0x0, NM_FLAGS=_flg, RESPONSE=1, ANCOUNT=1)
207
208
209# Node Status Request
210# RFC1002 sect 4.2.17
211
class NBNSNodeStatusRequest(NBNSQueryRequest):
    """NBNS node status request; reuses the query request field layout."""
    name = "NBNS status request"
    # "*" followed by 14 NUL bytes (15 bytes in total).
    QUESTION_NAME = b"*".ljust(15, b"\x00")
    QUESTION_TYPE = 0x21  # "NBSTAT" in _NETBIOS_QRTYPES

    def mysummary(self):
        """Return a one-line summary naming the queried host."""
        queried = self.QUESTION_NAME.decode(errors="backslashreplace")
        return "NBNSNodeStatusRequest who has '\\\\%s'" % (queried,)
221
222
# Node status requests use opcode 0 with one question.  NM_FLAGS 0 (no flag
# set) is recognised on dissection only; NM_FLAGS 1 (the "B" bit) is bound in
# both directions.
bind_bottom_up(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=0, QDCOUNT=1)
bind_layers(NBNSHeader, NBNSNodeStatusRequest, OPCODE=0x0, NM_FLAGS=1, QDCOUNT=1)
225
226
227# Node Status Response
228# RFC1002 sect 4.2.18
229
class NBNSNodeStatusResponseService(Packet):
    """One name entry of an NBNS node status response."""
    name = "NBNS Node Status Response Service"
    fields_desc = [
        StrFixedLenField("NETBIOS_NAME", "WINDOWS         ", 15),
        ByteEnumField(
            "SUFFIX", 0,
            {
                0: "workstation",
                0x03: "messenger service",
                0x20: "file server service",
                0x1b: "domain master browser",
                0x1c: "domain controller",
                0x1e: "browser election service",
            },
        ),
        ByteField("NAME_FLAGS", 0x4),
        ByteEnumField("UNUSED", 0, {0: "unused"}),
    ]

    def default_payload_class(self, payload):
        """Classify any bytes following this entry as padding.

        Presumably this lets the enclosing list field continue with the
        next entry -- confirm against PacketListField.

        :param payload: the bytes left after this entry's fields
        :returns: the configured padding layer class
        """
        padding_cls = conf.padding_layer
        return padding_cls
245
246
class NBNSNodeStatusResponse(Packet):
    """NBNS node status response: name list, MAC address and statistics."""
    name = "NBNS Node Status Response"
    fields_desc = [
        NetBIOSNameField("RR_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("RR_TYPE", 0x21, _NETBIOS_QRTYPES),
        ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 0),
        # NOTE: RDLENGTH is a plain default here, not derived from the list.
        ShortField("RDLENGTH", 83),
        # NUM_NAMES is a one-byte count of the NODE_NAME entries.
        FieldLenField("NUM_NAMES", None, fmt="B", count_of="NODE_NAME"),
        PacketListField(
            "NODE_NAME",
            [NBNSNodeStatusResponseService()],
            NBNSNodeStatusResponseService,
            count_from=lambda pkt: pkt.NUM_NAMES,
        ),
        SourceMACField("MAC_ADDRESS"),
        XStrFixedLenField("STATISTICS", b"", 46),
    ]

    def answers(self, other):
        """Tell whether this response matches the request layer ``other``.

        :param other: the candidate request layer
        :returns: True when ``other`` is an NBNSNodeStatusRequest for RR_NAME
        """
        if not isinstance(other, NBNSNodeStatusRequest):
            return False
        return other.QUESTION_NAME == self.RR_NAME
270
271
# Node status responses: opcode 0, response bit set, one answer and
# NM_FLAGS 0x40 (the "AA" bit of NBNSHeader.NM_FLAGS).
bind_layers(NBNSHeader, NBNSNodeStatusResponse,
            OPCODE=0x0, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1)
274
275
276# Name Registration Request
277# RFC1002 sect 4.2.2
278
class NBNSRegistrationRequest(Packet):
    """NBNS name registration request (question plus one resource record)."""
    name = "NBNS registration request"
    fields_desc = [
        # Question section
        NetBIOSNameField("QUESTION_NAME", "Windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("QUESTION_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("QUESTION_CLASS", 1, _NETBIOS_QRCLASS),
        # Resource record; RR_NAME defaults to the 0xC00C label pointer
        ShortEnumField("RR_NAME", 0xC00C, _NETBIOS_RNAMES),
        ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 0),
        # 6 = 2 bytes of flags (G, OWNER_NODE_TYPE, UNUSED) + 4-byte address
        ShortField("RDLENGTH", 6),
        BitEnumField("G", 0, 1, _NETBIOS_GNAMES),
        BitEnumField("OWNER_NODE_TYPE", 0, 2, _NETBIOS_OWNER_MODE_TYPES),
        BitEnumField("UNUSED", 0, 13, {0: "Unused"}),
        IPField("NB_ADDRESS", "127.0.0.1"),
    ]

    def mysummary(self):
        """Return a one-line summary of the name being registered."""
        template = "Register %G% %QUESTION_NAME% at %NB_ADDRESS%"
        return self.sprintf(template)
301
302
# Any header with opcode 5 is dissected as a registration request; when
# building, the header defaults to NM_FLAGS 0x11 ("B" + "RD") with one
# question and one additional record.
bind_bottom_up(NBNSHeader, NBNSRegistrationRequest, OPCODE=0x5)
bind_layers(NBNSHeader, NBNSRegistrationRequest,
            OPCODE=0x5, NM_FLAGS=0x11, QDCOUNT=1, ARCOUNT=1)
306
307
308# Wait for Acknowledgement Response
309# RFC1002 sect 4.2.16
310
class NBNSWackResponse(Packet):
    """NBNS Wait for Acknowledgement (WACK) response."""
    name = "NBNS Wait for Acknowledgement Response"
    fields_desc = [
        NetBIOSNameField("RR_NAME", "windows"),
        ShortEnumField("SUFFIX", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL", 0),
        ShortEnumField("RR_TYPE", 0x20, _NETBIOS_QRTYPES),
        ShortEnumField("RR_CLASS", 1, _NETBIOS_QRCLASS),
        IntField("TTL", 2),
        ShortField("RDLENGTH", 2),
        # 0x2910 == 10512 == 0b0010100100010000
        BitField("RDATA", 0x2910, 16),
    ]
321
322
# WACK responses: opcode 7, response bit set, one answer and NM_FLAGS 0x40
# (the "AA" bit of NBNSHeader.NM_FLAGS).
bind_layers(NBNSHeader, NBNSWackResponse,
            OPCODE=0x7, NM_FLAGS=0x40, RESPONSE=1, ANCOUNT=1)
325
326# NetBIOS DATAGRAM HEADER
327
328
class NBTDatagram(Packet):
    """NetBIOS datagram header with source and destination names.

    ``Length`` is computed at build time when it is left as ``None``.
    """
    name = "NBT Datagram Packet"
    fields_desc = [
        ByteField("Type", 0x10),
        ByteField("Flags", 0x02),
        ShortField("ID", 0),
        IPField("SourceIP", "127.0.0.1"),
        ShortField("SourcePort", 138),
        ShortField("Length", None),  # bytes 10-11 of the header
        ShortField("Offset", 0),
        NetBIOSNameField("SourceName", "windows"),
        ShortEnumField("SUFFIX1", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL1", 0),
        NetBIOSNameField("DestinationName", "windows"),
        ShortEnumField("SUFFIX2", 0x4141, _NETBIOS_SUFFIXES),
        ByteField("NULL2", 0),
    ]

    def post_build(self, pkt, pay):
        """Fill in ``Length`` if it was not set, then append the payload.

        :param pkt: the built bytes of this layer
        :param pay: the built bytes of the payload
        :returns: this layer followed by its payload
        """
        if self.Length is not None:
            return pkt + pay
        # NOTE(review): 68 presumably covers the two encoded name blocks
        # (name + suffix + null) that follow the fixed header -- confirm
        # against the size NetBIOSNameField produces.
        encoded_len = struct.pack("!H", len(pay) + 68)
        return pkt[:10] + encoded_len + pkt[12:] + pay
350
351
352# SESSION SERVICE PACKETS
353
354
class NBTSession(Packet):
    """NetBIOS session service header: TYPE, 7 reserved bits, 17-bit LENGTH."""
    name = "NBT Session Packet"
    # Mask applied to lengths in post_build() and tcp_reassemble().
    # NOTE(review): this is an 18-bit mask while the LENGTH field below is
    # 17 bits wide, so the top masked bit overlaps RESERVED -- confirm
    # whether that is intended.
    MAXLENGTH = 0x3ffff
    fields_desc = [
        ByteEnumField(
            "TYPE", 0,
            {
                0x00: "Session Message",
                0x81: "Session Request",
                0x82: "Positive Session Response",
                0x83: "Negative Session Response",
                0x84: "Retarget Session Response",
                0x85: "Session Keepalive",
            },
        ),
        BitField("RESERVED", 0x00, 7),
        BitField("LENGTH", None, 17),
    ]

    def post_build(self, pkt, pay):
        """Fill in the length if ``LENGTH`` was not set; append the payload.

        :param pkt: the built bytes of this layer
        :param pay: the built bytes of the payload
        :returns: this layer followed by its payload
        """
        if self.LENGTH is not None:
            return pkt + pay
        masked_len = len(pay) & self.MAXLENGTH
        # Keep the TYPE byte and replace the remaining three header bytes
        # with the low 24 bits of the masked length (RESERVED is rewritten).
        three_bytes = struct.pack("!I", masked_len)[1:]
        return pkt[:1] + three_bytes + pay

    def extract_padding(self, s):
        """Split ``s`` into LENGTH payload bytes and the trailing rest.

        :param s: the bytes following this header
        :returns: a ``(payload, padding)`` tuple
        """
        cut = self.LENGTH
        return s[:cut], s[cut:]

    @classmethod
    def tcp_reassemble(cls, data, *args, **kwargs):
        """Return a dissected packet once ``data`` holds a whole message.

        :param data: the bytes accumulated so far
        :returns: ``cls(data)`` when the 4-byte header plus the announced
            length are available, otherwise ``None``
        """
        if len(data) < 4:
            return None
        (first_word,) = struct.unpack("!I", data[:4])
        needed = (first_word & cls.MAXLENGTH) + 4
        if len(data) < needed:
            return None
        return cls(data)
383
384
# Name service: UDP traffic to or from port 137 is dissected as NBNSHeader;
# an NBNSHeader stacked on UDP defaults both ports to 137.
bind_bottom_up(UDP, NBNSHeader, dport=137)
bind_bottom_up(UDP, NBNSHeader, sport=137)
bind_top_down(UDP, NBNSHeader, sport=137, dport=137)

# Datagram service: same scheme on UDP port 138.
bind_bottom_up(UDP, NBTDatagram, dport=138)
bind_bottom_up(UDP, NBTDatagram, sport=138)
bind_top_down(UDP, NBTDatagram, sport=138, dport=138)

# Session service: TCP traffic to or from ports 445 and 139 is dissected as
# NBTSession; the final binding also covers the case where both ports are 139.
bind_bottom_up(TCP, NBTSession, dport=445)
bind_bottom_up(TCP, NBTSession, sport=445)
bind_bottom_up(TCP, NBTSession, dport=139)
bind_bottom_up(TCP, NBTSession, sport=139)
bind_layers(TCP, NBTSession, dport=139, sport=139)
398
399
class NBNS_am(AnsweringMachine):
    # Answering machine that replies to NBNS name queries with an
    # NBNSQueryResponse holding a single address entry.
    function_name = "nbnsd"
    filter = "udp port 137"
    sniff_options = {"store": 0}

    def parse_options(self, server_name=None, from_ip=None, ip=None):
        """
        NBNS answering machine

        :param server_name: the netbios server name to match
        :param from_ip: an IP (can have a netmask) to filter on
        :param ip: the IP to answer with
        """
        self.ip = ip
        # Names are compared as bytes; an empty name means "match any".
        self.ServerName = bytes_encode(server_name or "")
        # A string filter is turned into a Net; anything else is kept as is.
        self.from_ip = Net(from_ip) if isinstance(from_ip, str) else from_ip

    def is_request(self, req):
        """Tell whether ``req`` is a name query this machine should answer.

        :param req: the sniffed packet
        :returns: True for an NBNSQueryRequest that passes the source and
            server-name filters
        """
        if self.from_ip and IP in req and req[IP].src not in self.from_ip:
            return False
        if NBNSQueryRequest not in req:
            return False
        if not self.ServerName:
            return True
        asked_name = req[NBNSQueryRequest].QUESTION_NAME.strip()
        return asked_name == self.ServerName

    def make_reply(self, req):
        # type: (Packet) -> Packet
        """Build the NBNS query response answering ``req``.

        :param req: the request packet accepted by is_request()
        :returns: an Ether / IP / UDP / NBNSHeader / NBNSQueryResponse packet
        """
        req_eth = req[Ether]
        # A broadcast query is not answered from the broadcast address; the
        # source MAC is left unset in that case.
        if req_eth.dst == "ff:ff:ff:ff:ff:ff":
            reply_src = None
        else:
            reply_src = req_eth.dst
        link = Ether(dst=req_eth.src, src=reply_src)
        network = IP(dst=req[IP].src)
        transport = UDP(sport=req.dport, dport=req.sport)
        resp = link / network / transport
        # Answer with the configured IP, else the sniffing interface address.
        address = self.ip or get_if_addr(self.optsniff.get("iface", conf.iface))
        answer = NBNSQueryResponse(
            RR_NAME=self.ServerName or req.QUESTION_NAME,
            SUFFIX=req.SUFFIX,
            ADDR_ENTRY=[NBNS_ADD_ENTRY(NB_ADDRESS=address)],
        )
        resp /= NBNSHeader() / answer
        # Echo the transaction id of the request.
        resp.NAME_TRN_ID = req.NAME_TRN_ID
        return resp
445