• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-or-later
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2021 Trend Micro Incorporated
5# Copyright (C) 2021 Alias Robotics S.L.
6
7"""
8Real-Time Publish-Subscribe Protocol (RTPS) dissection
9"""
10
11# scapy.contrib.description = RTPS common types
12# scapy.contrib.status = library
13
14import struct
15
16from scapy.fields import (
17    _FieldContainer,
18    BitField,
19    ConditionalField,
20    EnumField,
21    ByteField,
22    IntField,
23    IPField,
24    LEIntField,
25    PacketField,
26    PacketListField,
27    ReversePadField,
28    StrField,
29    StrLenField,
30    UUIDField,
31    XIntField,
32    XStrFixedLenField,
33)
34from scapy.packet import Packet
35
36FORMAT_LE = "<"
37FORMAT_BE = ">"
38STR_MAX_LEN = 8192
39DEFAULT_ENDIANNESS = FORMAT_LE
40
41
42def is_le(pkt):
43    if hasattr(pkt, "submessageFlags"):
44        end = pkt.submessageFlags & 0b000000001 == 0b000000001
45        return end
46
47    return False
48
49
50def e_flags(pkt):
51    if is_le(pkt):
52        return FORMAT_LE
53    else:
54        return FORMAT_BE
55
56
57class EField(_FieldContainer):
58    """
59    A field that manages endianness of a nested field passed to the constructor
60    """
61
62    __slots__ = ["fld", "endianness", "endianness_from"]
63
64    def __init__(self, fld, endianness=None, endianness_from=None):
65        self.fld = fld
66        self.endianness = endianness
67        self.endianness_from = endianness_from
68
69    def set_endianness(self, pkt):
70        if getattr(pkt, "endianness", None) is not None:
71            self.endianness = pkt.endianness
72        elif self.endianness_from is not None:
73            self.endianness = self.endianness_from(pkt)
74
75        if isinstance(self.endianness, str) and self.endianness:
76            if isinstance(self.fld, UUIDField):
77                self.fld.uuid_fmt = (UUIDField.FORMAT_LE if
78                                     self.endianness == '<'
79                                     else UUIDField.FORMAT_BE)
80            elif hasattr(self.fld, "fmt"):
81                if len(self.fld.fmt) == 1:  # if it's only "I"
82                    _end = self.fld.fmt[0]
83                else:  # if it's "<I"
84                    _end = self.fld.fmt[1:]
85                self.fld.fmt = self.endianness + _end
86                self.fld.struct = struct.Struct(self.fld.fmt)
87
88    def getfield(self, pkt, buf):
89        self.set_endianness(pkt)
90        return self.fld.getfield(pkt, buf)
91
92    def addfield(self, pkt, buf, val):
93        self.set_endianness(pkt)
94        return self.fld.addfield(pkt, buf, val)
95
96
97class EPacket(Packet):
98    """A packet that manages its endianness"""
99
100    __slots__ = ["endianness"]
101
102    def __init__(self, *args, **kwargs):
103        self.endianness = kwargs.pop("endianness", None)
104        super(EPacket, self).__init__(*args, **kwargs)
105
106    def clone_with(self, *args, **kwargs):
107        pkt = super(EPacket, self).clone_with(*args, **kwargs)
108        pkt.endianness = self.endianness
109        return pkt
110
111    def extract_padding(self, p):
112        return b"", p
113
114
115class _EPacketField(object):
116    """
117    A packet field that manages its endianness and that of its nested packet
118    """
119
120    def __init__(self, *args, **kwargs):
121        self.endianness = kwargs.pop("endianness", None)
122        self.endianness_from = kwargs.pop("endianness_from", e_flags)
123        super(_EPacketField, self).__init__(*args, **kwargs)
124
125    def set_endianness(self, pkt):
126        if getattr(pkt, "endianness", None) is not None:
127            self.endianness = pkt.endianness
128        elif self.endianness_from is not None:
129            self.endianness = self.endianness_from(pkt)
130
131    def m2i(self, pkt, m):
132        self.set_endianness(pkt)
133        return self.cls(m, endianness=self.endianness)
134
135    def i2m(self, pkt, m):
136        if m:
137            self.set_endianness(pkt)
138            m.endianness = self.endianness
139        return super(_EPacketField, self).i2m(pkt, m)
140
141
142class EPacketField(_EPacketField, PacketField):
143    """
144    A PacketField that manages its endianness and that of its nested packet
145    """
146    __slots__ = ["endianness", "endianness_from"]
147
148
149class EPacketListField(_EPacketField, PacketListField):
150    """
151    A PacketListField that manages its endianness and
152    that of its nested packet
153    """
154    __slots__ = ["endianness", "endianness_from"]
155
156
157class SerializedDataField(StrLenField):
158    pass
159
160
161class DataPacketField(EPacketField):
162    def m2i(self, pkt, m):
163        self.set_endianness(pkt)
164        pl_len = pkt.octetsToNextHeader - 24
165        _pkt = self.cls(
166            m,
167            endianness=self.endianness,
168            writer_entity_id_key=pkt.writerEntityIdKey,
169            writer_entity_id_kind=pkt.writerEntityIdKind,
170            pl_len=pl_len,
171        )
172
173        return _pkt
174
175
176class InlineQoSPacketField(EPacketField):
177    pass
178
179
180class PIDPadField(StrField):
181    def getfield(self, pkt, s):
182        len_pkt = 2  # TODO this is dynamic
183        return s[len_pkt:], self.m2i(pkt, s[:len_pkt])
184
185
186class GUIDPacket(Packet):
187    name = "RTPS GUID"
188    fields_desc = [
189        XIntField("hostId", 0),
190        XIntField("appId", 0),
191        XIntField("instanceId", 0),
192        XIntField("entityId", 0),
193    ]
194
195    def extract_padding(self, p):
196        return b"", p
197
198
199class LocatorPacket(EPacket):
200    name = "RTPS Locator"
201    fields_desc = [
202        EField(
203            XIntField("locatorKind", 0),
204            endianness=FORMAT_LE,
205            endianness_from=None),
206        EField(
207            IntField("port", 0),
208            endianness=FORMAT_LE,
209            endianness_from=None),
210        ConditionalField(
211            ReversePadField(IPField("address", "0.0.0.0"), 20),
212            lambda p: p.locatorKind == 0x1
213        ),
214        ConditionalField(
215            XStrFixedLenField("hostId", 0x0, 16),
216            lambda p: p.locatorKind == 0x01000000
217        )
218    ]
219
220    def extract_padding(self, p):
221        return b"", p
222
223
224class ProductVersionPacket(EPacket):
225    name = "Product Version"
226    fields_desc = [
227        ByteField("major", 0),
228        ByteField("minor", 0),
229        ByteField("release", 0),
230        ByteField("revision", 0),
231    ]
232
233
234class TransportInfoPacket(EPacket):
235    name = "Transport Info"
236    fields_desc = [
237        LEIntField("classID", 0),
238        LEIntField("messageSizeMax", 0)
239    ]
240
241
242class EndpointFlagsPacket(Packet):
243    name = "RTPS Endpoint Builtin Endpoint Flags"
244    fields_desc = [
245        BitField("participantSecureReader", 0, 1),
246        BitField("participantSecureWriter", 0, 1),
247        BitField("secureParticipantVolatileMessageReader", 0, 1),
248        BitField("secureParticipantVolatileMessageWriter", 0, 1),
249        BitField("participantStatelessMessageReader", 0, 1),
250        BitField("participantStatelessMessageWriter", 0, 1),
251        BitField("secureParticipantMessageReader", 0, 1),
252        BitField("secureParticipantMessageWriter", 0, 1),
253        BitField("secureSubscriptionReader", 0, 1),
254        BitField("secureSubscriptionWriter", 0, 1),
255        BitField("securePublicationReader", 0, 1),
256        BitField("securePublicationWriter", 0, 1),
257        BitField("reserved", 0, 4),
258        BitField("participantMessageDataReader", 0, 1),
259        BitField("participantMessageDataWriter", 0, 1),
260        BitField("participantStateDetector", 0, 1),
261        BitField("participantStateAnnouncer", 0, 1),
262        BitField("publicationDetector", 0, 1),
263        BitField("publicationAnnouncer", 0, 1),
264        BitField("participantDetector", 0, 1),
265        BitField("participantAnnouncer", 0, 1),
266    ]
267
268    def extract_padding(self, p):
269        return b"", p
270
271
272class ProtocolVersionPacket(Packet):
273    name = "RTPS Protocol Version"
274    fields_desc = [ByteField("major", 0), ByteField("minor", 0)]
275
276    def extract_padding(self, p):
277        return b"", p
278
279
280_rtps_vendor_ids = {
281    0x0000: "VENDOR_ID_UNKNOWN (0x0000)",
282    0x0101: "Real-Time Innovations, Inc. (RTI) - Connext DDS",
283    0x0102: "ADLink Ltd. - OpenSplice DDS",
284    0x0103: "Object Computing Inc. (OCI) - OpenDDS",
285    0x0104: "MilSoft - Mil-DDS",
286    0x0105: "Kongsberg - InterCOM DDS",
287    0x0106: "Twin Oaks Computing, Inc. - CoreDX DDS",
288    0x0107: "Lakota Technical Solutions, Inc.",
289    0x0108: "ICOUP Consulting",
290    0x0109: "Electronics and Telecommunication Research Institute (ETRI) - Diamond DDS",
291    0x010A: "Real-Time Innovations, Inc. (RTI) - Connext DDS Micro",
292    0x010B: "ADLink Ltd. - VortexCafe",
293    0x010C: "PrismTech Ltd",
294    0x010D: "ADLink Ltd. - Vortex Lite",
295    0x010E: "Technicolor - Qeo",
296    0x010F: "eProsima - FastRTPS, FastDDS",
297    0x0110: "Eclipse Foundation - Cyclone DDS",
298    0x0111: "Gurum Networks, Inc. - GurumDDS",
299    0x0112: "Atostek - RustDDS",
300    0x0113: "Nanjing Zhenrong Software Technology Co. \
301        - Zhenrong Data Distribution Service (ZRDDS)",
302    0x0114: "S2E Software Systems B.V. - Dust DDS",
303}
304
305
306class VendorIdPacket(Packet):
307    name = "RTPS Vendor ID"
308    fields_desc = [
309        # ByteField("major", 0),
310        # ByteField("minor", 0),
311        EnumField(
312            name="vendor_id",
313            default=0,
314            enum=_rtps_vendor_ids,
315        ),
316    ]
317
318    def extract_padding(self, p):
319        return b"", p
320
321
322class LeaseDurationPacket(Packet):
323    name = "Lease Duration"
324    fields_desc = [
325        IntField("seconds", 0),
326        IntField("fraction", 0),
327    ]
328
329    def extract_padding(self, p):
330        return b"", p
331