• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-or-later
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
5
6"""
7EPT map (EndPoinT mapper)
8"""
9
10import uuid
11
12from scapy.config import conf
13from scapy.fields import (
14    ByteEnumField,
15    ConditionalField,
16    FieldLenField,
17    IPField,
18    LEShortField,
19    MultipleTypeField,
20    PacketListField,
21    ShortField,
22    StrLenField,
23    UUIDEnumField,
24)
25from scapy.packet import Packet
26from scapy.layers.dcerpc import (
27    DCE_RPC_INTERFACES_NAMES,
28    DCE_RPC_INTERFACES_NAMES_rev,
29    DCE_RPC_TRANSFER_SYNTAXES,
30)
31
32from scapy.layers.msrpce.raw.ept import *  # noqa: F401, F403
33
34
35# [C706] Appendix L
36
37# "For historical reasons, this cannot be done using the standard
38# NDR encoding rules for marshalling and unmarshalling.
39# A special encoding is required." - Appendix L
40
41
class octet_string_t(Packet):
    """
    Counted byte string: a little-endian 16-bit ``count`` followed by
    ``count`` bytes of ``value``.
    """

    fields_desc = [
        # "count" is bound to the length of "value"
        FieldLenField("count", None, length_of="value", fmt="<H"),
        # "value" spans exactly "count" bytes
        StrLenField("value", b"", length_from=lambda p: p.count),
    ]

    def default_payload_class(self, _):
        # Whatever follows is handed to the configured padding layer
        return conf.padding_layer
50
51
def _uuid_res(x):
    """
    Resolve a key to its name, looking in both DCE_RPC_TRANSFER_SYNTAXES
    and DCE_RPC_INTERFACES_NAMES.

    When the key is present in both tables, the transfer syntax entry wins.

    :param x: the key to look up
    :returns: the matching name, or None if the key is in neither table
    """
    # Both tables are read live on every call, but they are queried directly
    # instead of being merged into a temporary copy for each lookup.
    if x in DCE_RPC_TRANSFER_SYNTAXES:
        return DCE_RPC_TRANSFER_SYNTAXES[x]
    return DCE_RPC_INTERFACES_NAMES.get(x)
57
58
def _uuid_res_rev(x):
    """
    Reverse of _uuid_res: resolve a name back to its key, looking in both
    DCE_RPC_TRANSFER_SYNTAXES (reversed) and DCE_RPC_INTERFACES_NAMES_rev.

    When the name is present in both tables, the transfer syntax entry wins.

    :param x: the name to look up
    :returns: the matching key, or None if the name is in neither table
    """
    # Only the transfer syntaxes need reversing; the interface table already
    # has a reverse mapping, so it is queried directly rather than copied.
    syntaxes_rev = {name: uid for uid, name in DCE_RPC_TRANSFER_SYNTAXES.items()}
    if x in syntaxes_rev:
        return syntaxes_rev[x]
    return DCE_RPC_INTERFACES_NAMES_rev.get(x)
64
65
class prot_and_addr_t(Packet):
    """
    One floor of a protocol tower.

    A floor is made of a left-hand side (length, protocol identifier and
    identifier-dependent data) followed by a right-hand side (length and
    data whose type depends on the protocol identifier).
    """

    fields_desc = [
        # --- LHS
        LEShortField("lhs_length", 0),
        # [C706] Appendix I with names from Appendix B
        ByteEnumField(
            "protocol_identifier",
            0,
            {
                0x00: "OSI OID",  # Special
                0x0D: "UUID",  # Special
                # Transports
                # 0x2: "DNA Session Control",
                # 0x3: "DNA Session Control V3",
                # 0x4: "DNA NSP Transport",
                # 0x5: "OSI TP4",
                0x06: "NCADG_OSI_CLSN",  # [C706]
                0x07: "NCACN_IP_TCP",  # [C706]
                0x08: "NCADG_IP_UDP",  # [C706]
                0x09: "IP",  # [C706]
                0x0A: "RPC connectionless protocol",  # [C706]
                0x0B: "RPC connection-oriented protocol",  # [C706]
                0x0C: "NCALRPC",
                0x0F: "NCACN_NP",  # [MS-RPCE]
                0x11: "NCACN_NB",  # [C706]
                0x12: "NCACN_NB_NB",  # [MS-RPCE]
                0x13: "NCACN_SPX",  # [C706]
                0x14: "NCADG_IPX",  # [C706]
                0x16: "NCACN_AT_DSP",  # [C706]
                0x17: "NCADG_AT_DSP",  # [C706]
                0x19: "NCADG_NB",  # [C706]
                0x1A: "NCACN_VNS_SPP",  # [C706]
                0x1B: "NCADG_VNS_IPC",  # [C706]
                0x1F: "NCACN_HTTP",  # [MS-RPCE]
            },
        ),
        # 0x0: the rest of the LHS (lhs_length minus the identifier byte)
        ConditionalField(
            StrLenField("oid", "", length_from=lambda p: p.lhs_length - 1),
            lambda p: p.protocol_identifier == 0x0,
        ),
        # 0xD: a little-endian UUID followed by a little-endian version
        ConditionalField(
            UUIDEnumField(
                "uuid",
                uuid.UUID("8a885d04-1ceb-11c9-9fe8-08002b104860"),
                # Those are dynamic
                (_uuid_res, _uuid_res_rev),
                uuid_fmt=UUIDEnumField.FORMAT_LE,
            ),
            lambda p: p.protocol_identifier == 0xD,
        ),
        ConditionalField(
            LEShortField("version", 0),
            lambda p: p.protocol_identifier == 0xD,
        ),
        # Other: raw LHS data
        # NOTE(review): 0x7 gets no LHS data field at all - confirm intended
        ConditionalField(
            StrLenField("lhs", "", length_from=lambda p: p.lhs_length - 1),
            lambda p: p.protocol_identifier not in (0x0, 0x7, 0xD),
        ),
        # --- RHS
        LEShortField("rhs_length", None),
        # The type of "rhs" is selected by the protocol identifier
        MultipleTypeField(
            [
                (
                    # (big-endian)
                    ShortField("rhs", 0),
                    lambda p: p.protocol_identifier in (0x7, 0x8, 0x1F),
                    "port",
                ),
                (
                    # (big-endian)
                    IPField("rhs", 0),
                    lambda p: p.protocol_identifier == 0x9,
                    "addr",
                ),
                (
                    LEShortField("rhs", 5),
                    lambda p: p.protocol_identifier in (0xA, 0xB, 0xD),
                    "minor version",
                ),
                (
                    StrLenField("rhs", "", length_from=lambda p: p.rhs_length),
                    lambda p: p.protocol_identifier == 0xF,
                    "named pipe",
                ),
                (
                    StrLenField("rhs", "", length_from=lambda p: p.rhs_length),
                    lambda p: p.protocol_identifier == 0x11,
                    "netbios name",
                ),
            ],
            # Fallback: raw bytes of rhs_length
            StrLenField("rhs", "", length_from=lambda p: p.rhs_length),
        ),
    ]

    def default_payload_class(self, _):
        # Whatever follows is handed to the configured padding layer
        return conf.padding_layer
173
174
class protocol_tower_t(Packet):
    """
    A protocol tower: a little-endian 16-bit floor count followed by that
    many prot_and_addr_t floors.
    """

    fields_desc = [
        FieldLenField("count", None, fmt="<H", count_of="floors"),
        PacketListField(
            "floors",
            [prot_and_addr_t()],
            prot_and_addr_t,
            count_from=lambda pkt: pkt.count,
        ),
    ]

    def _summary(self):
        """
        Describe the tower as an (interface, endpoint) pair.

        The interface part is built from floor 1 (UUID, version and raw rhs),
        the endpoint part from floor 3 and, for connection-oriented/less
        RPC, from the floors that follow it.

        :returns: a tuple of two strings: (interface, endpoint)
        :raises ValueError: if there are fewer than 4 floors, if floor 1 or 2
            is not a UUID floor, or if floor 3 is an unknown RPC transport
        """

        def _rhs_str(floor):
            # rhs is bytes for string-typed floors once dissected, but may be
            # a str (e.g. the field default) or an int/address otherwise.
            rhs = floor.rhs
            return rhs.decode() if isinstance(rhs, bytes) else str(rhs)

        floors = self.floors
        if len(floors) < 4:
            raise ValueError("Malformed protocol_tower_t (not enough floors)")
        if floors[0].protocol_identifier != 0xD:
            raise ValueError("Malformed protocol_tower_t (bad floor 1)")
        if floors[1].protocol_identifier != 0xD:
            raise ValueError("Malformed protocol_tower_t (bad floor 2)")
        transport = floors[2].protocol_identifier
        if transport in [0xA, 0xB]:  # Connection oriented/less
            # Floors 4 and above are joined last-to-first
            endpoint = "%s:%s" % (
                floors[3].sprintf("%protocol_identifier%"),
                ":".join(_rhs_str(x) for x in floors[3:][::-1]),
            )
        elif transport == 0xC:  # NCALRPC
            # Same bytes/str guard as above: an unconditional .decode() would
            # raise AttributeError on a str rhs, which mysummary() does not
            # catch.
            endpoint = "%s:%s" % (
                floors[2].sprintf("%protocol_identifier%"),
                _rhs_str(floors[3]),
            )
        else:
            raise ValueError("Unknown RPC transport: %s" % transport)
        return (
            floors[0].sprintf("%uuid% (%version%.%r,rhs%)"),
            endpoint,
        )

    def mysummary(self):
        """
        Return "<interface> <endpoint>", or the error message when the tower
        cannot be summarized.
        """
        try:
            return "%s %s" % self._summary()
        except ValueError as ex:
            # Covers malformed towers (and undecodable rhs bytes, since
            # UnicodeDecodeError is a ValueError)
            return str(ex)
220