# SPDX-License-Identifier: GPL-2.0-or-later
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) 2021 Trend Micro Incorporated
# Copyright (C) 2021 Alias Robotics S.L.

"""
Real-Time Publish-Subscribe Protocol (RTPS) dissection
"""

# scapy.contrib.description = RTPS common types
# scapy.contrib.status = library

import struct

from scapy.fields import (
    _FieldContainer,
    BitField,
    ConditionalField,
    EnumField,
    ByteField,
    IntField,
    IPField,
    LEIntField,
    PacketField,
    PacketListField,
    ReversePadField,
    StrField,
    StrLenField,
    UUIDField,
    XIntField,
    XStrFixedLenField,
)
from scapy.packet import Packet

FORMAT_LE = "<"
FORMAT_BE = ">"
STR_MAX_LEN = 8192
DEFAULT_ENDIANNESS = FORMAT_LE

# Characters the ``struct`` module accepts as a leading byte-order / size /
# alignment marker in a format string.
_STRUCT_ORDER_CHARS = "@=<>!"


def is_le(pkt):
    """
    Tell whether ``pkt`` flags its content as little-endian.

    :param pkt: any object; only its ``submessageFlags`` attribute is read
    :returns: True when ``pkt`` has a ``submessageFlags`` attribute whose
        lowest bit is set, False otherwise (including when the attribute
        does not exist)
    """
    if hasattr(pkt, "submessageFlags"):
        return (pkt.submessageFlags & 0x01) == 0x01

    return False


def e_flags(pkt):
    """
    Map the endianness flag of ``pkt`` to a ``struct`` byte-order prefix.

    :param pkt: object inspected through :func:`is_le`
    :returns: ``FORMAT_LE`` when :func:`is_le` is true, else ``FORMAT_BE``
    """
    return FORMAT_LE if is_le(pkt) else FORMAT_BE


class EField(_FieldContainer):
    """
    A field that manages endianness of a nested field passed to the constructor

    The byte order is resolved on every ``getfield``/``addfield`` call, in
    this order of precedence:

    1. ``pkt.endianness`` when it is not None,
    2. ``endianness_from(pkt)`` when a callable was supplied,
    3. whatever ``self.endianness`` currently holds.

    NOTE(review): the resolved value is stored back on ``self.endianness``
    and written into the nested field (``fmt``/``struct`` or ``uuid_fmt``).
    The field objects in this file live in class-level ``fields_desc``
    lists, so that state is carried over from one call to the next.
    """

    __slots__ = ["fld", "endianness", "endianness_from"]

    def __init__(self, fld, endianness=None, endianness_from=None):
        """
        :param fld: the nested field whose byte order is managed
        :param endianness: initial byte-order prefix (``"<"`` or ``">"``),
            or None
        :param endianness_from: optional callable taking the packet and
            returning a byte-order prefix
        """
        self.fld = fld
        self.endianness = endianness
        self.endianness_from = endianness_from

    def set_endianness(self, pkt):
        """
        Resolve the byte order for ``pkt`` and apply it to the nested field.

        Nothing is applied when the resolved endianness is not a non-empty
        string.

        :param pkt: the packet being dissected or built
        """
        if getattr(pkt, "endianness", None) is not None:
            self.endianness = pkt.endianness
        elif self.endianness_from is not None:
            self.endianness = self.endianness_from(pkt)

        if not (isinstance(self.endianness, str) and self.endianness):
            return

        if isinstance(self.fld, UUIDField):
            # UUID fields select their layout through ``uuid_fmt`` instead
            # of a struct format string.
            if self.endianness == '<':
                self.fld.uuid_fmt = UUIDField.FORMAT_LE
            else:
                self.fld.uuid_fmt = UUIDField.FORMAT_BE
        elif hasattr(self.fld, "fmt"):
            # Drop only a real byte-order marker, so that an unprefixed
            # multi-character format (e.g. "4s") keeps all its characters.
            type_codes = self.fld.fmt.lstrip(_STRUCT_ORDER_CHARS)
            self.fld.fmt = self.endianness + type_codes
            # Keep the precompiled struct in sync with the new format.
            self.fld.struct = struct.Struct(self.fld.fmt)

    def getfield(self, pkt, buf):
        """
        Apply the byte order for ``pkt``, then dissect with the nested field.

        :returns: whatever the nested field's ``getfield`` returns
        """
        self.set_endianness(pkt)
        return self.fld.getfield(pkt, buf)

    def addfield(self, pkt, buf, val):
        """
        Apply the byte order for ``pkt``, then build with the nested field.

        :returns: whatever the nested field's ``addfield`` returns
        """
        self.set_endianness(pkt)
        return self.fld.addfield(pkt, buf, val)


class EPacket(Packet):
    """A packet that manages its endianness"""

    __slots__ = ["endianness"]

    def __init__(self, *args, **kwargs):
        """
        Accept an optional ``endianness`` keyword (default None); every
        other argument is forwarded to ``Packet.__init__``.
        """
        self.endianness = kwargs.pop("endianness", None)
        super(EPacket, self).__init__(*args, **kwargs)

    def clone_with(self, *args, **kwargs):
        """Clone through the parent class and copy ``endianness`` over."""
        pkt = super(EPacket, self).clone_with(*args, **kwargs)
        pkt.endianness = self.endianness
        return pkt

    def extract_padding(self, p):
        """
        Return ``(b"", p)``: following Scapy's ``extract_padding``
        convention, nothing is kept as payload and ``p`` is handed back.
        """
        return b"", p


class _EPacketField(object):
    """
    A packet field that manages its endianness and that of its nested packet

    Mixin meant to be combined with a Scapy packet field class (see
    ``EPacketField`` and ``EPacketListField``), which provides ``cls`` and
    the parent ``i2m``.

    NOTE(review): as with ``EField``, the resolved byte order is stored on
    the field instance itself.
    """

    def __init__(self, *args, **kwargs):
        """
        Accept optional ``endianness`` (default None) and
        ``endianness_from`` (default :func:`e_flags`) keywords; every other
        argument is forwarded to the next class in the MRO.
        """
        self.endianness = kwargs.pop("endianness", None)
        self.endianness_from = kwargs.pop("endianness_from", e_flags)
        super(_EPacketField, self).__init__(*args, **kwargs)

    def set_endianness(self, pkt):
        """
        Resolve the byte order for ``pkt``: ``pkt.endianness`` wins when it
        is not None, otherwise ``endianness_from(pkt)`` when a callable is
        set; otherwise the stored value is left untouched.
        """
        if getattr(pkt, "endianness", None) is not None:
            self.endianness = pkt.endianness
        elif self.endianness_from is not None:
            self.endianness = self.endianness_from(pkt)

    def m2i(self, pkt, m):
        """Instantiate ``self.cls`` from ``m`` with the resolved byte order."""
        self.set_endianness(pkt)
        return self.cls(m, endianness=self.endianness)

    def i2m(self, pkt, m):
        """
        Stamp the resolved byte order on ``m`` (when ``m`` is truthy), then
        delegate to the parent ``i2m``.
        """
        if m:
            self.set_endianness(pkt)
            m.endianness = self.endianness
        return super(_EPacketField, self).i2m(pkt, m)


class EPacketField(_EPacketField, PacketField):
    """
    A PacketField that manages its endianness and that of its nested packet
    """
    __slots__ = ["endianness", "endianness_from"]


class EPacketListField(_EPacketField, PacketListField):
    """
    A PacketListField that manages its endianness and
    that of its nested packet
    """
    __slots__ = ["endianness", "endianness_from"]


class SerializedDataField(StrLenField):
    """A ``StrLenField`` under a dedicated name; adds no behavior."""


class DataPacketField(EPacketField):
    """
    An ``EPacketField`` that passes writer identity and payload length from
    the enclosing packet to the nested packet's constructor.
    """

    def m2i(self, pkt, m):
        """
        Build the nested packet from ``m``.

        Reads ``octetsToNextHeader``, ``writerEntityIdKey`` and
        ``writerEntityIdKind`` from ``pkt``.
        """
        self.set_endianness(pkt)
        # NOTE(review): 24 is presumably the number of octets counted by
        # octetsToNextHeader that precede the payload - confirm.
        pl_len = pkt.octetsToNextHeader - 24
        _pkt = self.cls(
            m,
            endianness=self.endianness,
            writer_entity_id_key=pkt.writerEntityIdKey,
            writer_entity_id_kind=pkt.writerEntityIdKind,
            pl_len=pl_len,
        )

        return _pkt


class InlineQoSPacketField(EPacketField):
    """An ``EPacketField`` under a dedicated name; adds no behavior."""


class PIDPadField(StrField):
    """A string field that always consumes exactly two bytes on dissection."""

    def getfield(self, pkt, s):
        """
        Split ``s`` after its first two bytes.

        :returns: ``(remaining bytes, internal value of the first two)``
        """
        len_pkt = 2  # TODO this is dynamic
        return s[len_pkt:], self.m2i(pkt, s[:len_pkt])


class GUIDPacket(Packet):
    """Four hex-displayed integers: hostId, appId, instanceId, entityId."""

    name = "RTPS GUID"
    fields_desc = [
        XIntField("hostId", 0),
        XIntField("appId", 0),
        XIntField("instanceId", 0),
        XIntField("entityId", 0),
    ]

    def extract_padding(self, p):
        """Return ``(b"", p)``: no payload is kept, ``p`` is handed back."""
        return b"", p


class LocatorPacket(EPacket):
    """
    Locator kind and port followed by a kind-dependent body: an IP address
    when ``locatorKind == 0x1``, a 16-byte ``hostId`` when
    ``locatorKind == 0x01000000``.
    """

    name = "RTPS Locator"
    fields_desc = [
        EField(
            XIntField("locatorKind", 0),
            endianness=FORMAT_LE,
            endianness_from=None),
        EField(
            IntField("port", 0),
            endianness=FORMAT_LE,
            endianness_from=None),
        ConditionalField(
            ReversePadField(IPField("address", "0.0.0.0"), 20),
            lambda p: p.locatorKind == 0x1
        ),
        ConditionalField(
            XStrFixedLenField("hostId", 0x0, 16),
            lambda p: p.locatorKind == 0x01000000
        )
    ]

    def extract_padding(self, p):
        """Return ``(b"", p)``: no payload is kept, ``p`` is handed back."""
        return b"", p


class ProductVersionPacket(EPacket):
    """Four single-byte version components."""

    name = "Product Version"
    fields_desc = [
        ByteField("major", 0),
        ByteField("minor", 0),
        ByteField("release", 0),
        ByteField("revision", 0),
    ]


class TransportInfoPacket(EPacket):
    """Two little-endian integers: classID and messageSizeMax."""

    name = "Transport Info"
    fields_desc = [
        LEIntField("classID", 0),
        LEIntField("messageSizeMax", 0)
    ]


class EndpointFlagsPacket(Packet):
    """
    Builtin endpoint flags, one bit per flag plus a 4-bit reserved field.

    NOTE(review): the declared widths add up to 24 bits - confirm this is
    the intended on-wire size.
    """

    name = "RTPS Endpoint Builtin Endpoint Flags"
    fields_desc = [
        BitField("participantSecureReader", 0, 1),
        BitField("participantSecureWriter", 0, 1),
        BitField("secureParticipantVolatileMessageReader", 0, 1),
        BitField("secureParticipantVolatileMessageWriter", 0, 1),
        BitField("participantStatelessMessageReader", 0, 1),
        BitField("participantStatelessMessageWriter", 0, 1),
        BitField("secureParticipantMessageReader", 0, 1),
        BitField("secureParticipantMessageWriter", 0, 1),
        BitField("secureSubscriptionReader", 0, 1),
        BitField("secureSubscriptionWriter", 0, 1),
        BitField("securePublicationReader", 0, 1),
        BitField("securePublicationWriter", 0, 1),
        BitField("reserved", 0, 4),
        BitField("participantMessageDataReader", 0, 1),
        BitField("participantMessageDataWriter", 0, 1),
        BitField("participantStateDetector", 0, 1),
        BitField("participantStateAnnouncer", 0, 1),
        BitField("publicationDetector", 0, 1),
        BitField("publicationAnnouncer", 0, 1),
        BitField("participantDetector", 0, 1),
        BitField("participantAnnouncer", 0, 1),
    ]

    def extract_padding(self, p):
        """Return ``(b"", p)``: no payload is kept, ``p`` is handed back."""
        return b"", p


class ProtocolVersionPacket(Packet):
    """Two single-byte version components: major and minor."""

    name = "RTPS Protocol Version"
    fields_desc = [ByteField("major", 0), ByteField("minor", 0)]

    def extract_padding(self, p):
        """Return ``(b"", p)``: no payload is kept, ``p`` is handed back."""
        return b"", p


# Display names for the values of ``VendorIdPacket.vendor_id``.
_rtps_vendor_ids = {
    0x0000: "VENDOR_ID_UNKNOWN (0x0000)",
    0x0101: "Real-Time Innovations, Inc. (RTI) - Connext DDS",
    0x0102: "ADLink Ltd. - OpenSplice DDS",
    0x0103: "Object Computing Inc. (OCI) - OpenDDS",
    0x0104: "MilSoft - Mil-DDS",
    0x0105: "Kongsberg - InterCOM DDS",
    0x0106: "Twin Oaks Computing, Inc. - CoreDX DDS",
    0x0107: "Lakota Technical Solutions, Inc.",
    0x0108: "ICOUP Consulting",
    0x0109: "Electronics and Telecommunication Research Institute (ETRI) "
            "- Diamond DDS",
    0x010A: "Real-Time Innovations, Inc. (RTI) - Connext DDS Micro",
    0x010B: "ADLink Ltd. - VortexCafe",
    0x010C: "PrismTech Ltd",
    0x010D: "ADLink Ltd. - Vortex Lite",
    0x010E: "Technicolor - Qeo",
    0x010F: "eProsima - FastRTPS, FastDDS",
    0x0110: "Eclipse Foundation - Cyclone DDS",
    0x0111: "Gurum Networks, Inc. - GurumDDS",
    0x0112: "Atostek - RustDDS",
    # Adjacent literals instead of a backslash continuation inside the
    # string, so source indentation does not end up in the name.
    0x0113: "Nanjing Zhenrong Software Technology Co. "
            "- Zhenrong Data Distribution Service (ZRDDS)",
    0x0114: "S2E Software Systems B.V. - Dust DDS",
}


class VendorIdPacket(Packet):
    """A single enumerated ``vendor_id`` named through ``_rtps_vendor_ids``."""

    name = "RTPS Vendor ID"
    fields_desc = [
        # ByteField("major", 0),
        # ByteField("minor", 0),
        EnumField(
            name="vendor_id",
            default=0,
            enum=_rtps_vendor_ids,
        ),
    ]

    def extract_padding(self, p):
        """Return ``(b"", p)``: no payload is kept, ``p`` is handed back."""
        return b"", p


class LeaseDurationPacket(Packet):
    """Two integers: seconds and fraction."""

    name = "Lease Duration"
    fields_desc = [
        IntField("seconds", 0),
        IntField("fraction", 0),
    ]

    def extract_padding(self, p):
        """Return ``(b"", p)``: no payload is kept, ``p`` is handed back."""
        return b"", p