#!/usr/bin/env python3
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

import ipaddress
import struct
from binascii import hexlify
from enum import IntEnum

import common


class TlvType(IntEnum):
    """Numeric type codes of the TLVs modelled in this module."""

    TARGET_EID = 0
    MAC_EXTENDED_ADDRESS = 1
    RLOC16 = 2
    ML_EID = 3
    STATUS = 4
    TIME_SINCE_LAST_TRANSACTION = 6
    ROUTER_MASK = 7
    ND_OPTION = 8
    ND_DATA = 9
    THREAD_NETWORK_DATA = 10
    MLE_ROUTING = 11
    IPv6_ADDRESSES = 14
    XTAL_ACCURACY = 254


class StatusValues(IntEnum):
    """Status codes that ``StatusFactory`` accepts when decoding a Status TLV."""

    SUCCESS = 0
    NO_ADDRESS_AVAILABLE = 1
    TOO_FEW_ROUTERS = 2
    HAVE_CHILD_ID_REQUEST = 3
    PARENT_PARTITION_CHANGE = 4


class TargetEid(object):
    """Value of a Target EID TLV: a read-only wrapper around the raw EID."""

    def __init__(self, eid):
        self._eid = eid

    @property
    def eid(self):
        """The EID exactly as passed to the constructor."""
        return self._eid

    def __eq__(self, other):
        # The class check is delegated to the project's `common` module before
        # the payloads are compared (its failure behaviour is defined there).
        common.expect_the_same_class(self, other)
        return self.eid == other.eid

    def __repr__(self):
        return "TargetEid(eid={})".format(hexlify(self.eid))


class TargetEidFactory(object):
    """Builds ``TargetEid`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Read 16 bytes from ``data`` and wrap them in a ``TargetEid``.

        Args:
            data: object exposing ``read(n)`` that returns bytes.
            message_info: unused here; kept for the common factory signature.
        """
        eid = bytearray(data.read(16))

        return TargetEid(eid)


class MacExtendedAddress(object):
    """Value of a MAC Extended Address TLV."""

    def __init__(self, mac_address):
        self._mac_address = mac_address

    @property
    def mac_address(self):
        """The address exactly as passed to the constructor."""
        return self._mac_address

    def __eq__(self, other):
        common.expect_the_same_class(self, other)
        return self.mac_address == other.mac_address

    def __repr__(self):
        return "MacExtendedAddress(mac_address={})".format(hexlify(self.mac_address))


class MacExtendedAddressFactory(object):
    """Builds ``MacExtendedAddress`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Read 8 bytes from ``data`` and wrap them in a ``MacExtendedAddress``.

        ``message_info`` is accepted for signature parity and not used.
        """
        mac_address = bytearray(data.read(8))

        return MacExtendedAddress(mac_address)


class Rloc16(object):
    """Value of an RLOC16 TLV, held as an integer."""

    def __init__(self, rloc16):
        self._rloc16 = rloc16

    @property
    def rloc16(self):
        """The RLOC16 exactly as passed to the constructor."""
        return self._rloc16

    def __eq__(self, other):
        common.expect_the_same_class(self, other)
        return self.rloc16 == other.rloc16

    def __repr__(self):
        return "Rloc16(rloc16={})".format(hex(self.rloc16))


class Rloc16Factory(object):
    """Builds ``Rloc16`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode 2 bytes of ``data`` as a big-endian unsigned short.

        ``message_info`` is accepted for signature parity and not used.
        """
        rloc16 = struct.unpack(">H", data.read(2))[0]

        return Rloc16(rloc16)


class MlEid(object):
    """Value of an ML-EID TLV."""

    def __init__(self, ml_eid):
        self._ml_eid = ml_eid

    @property
    def ml_eid(self):
        """The ML-EID exactly as passed to the constructor."""
        return self._ml_eid

    def __eq__(self, other):
        common.expect_the_same_class(self, other)
        return self.ml_eid == other.ml_eid

    def __repr__(self):
        return "MlEid(ml_eid={})".format(hexlify(self.ml_eid))


class MlEidFactory(object):
    """Builds ``MlEid`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Read 8 bytes from ``data`` and wrap them in an ``MlEid``.

        ``message_info`` is accepted for signature parity and not used.
        """
        ml_eid = bytearray(data.read(8))

        return MlEid(ml_eid)


class Status(object):
    """Value of a Status TLV."""

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        """The status exactly as passed to the constructor."""
        return self._status

    def __eq__(self, other):
        common.expect_the_same_class(self, other)
        return self.status == other.status

    def __repr__(self):
        return "Status(status={})".format(self.status)


class StatusFactory(object):
    """Builds ``Status`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode one byte of ``data`` into a ``StatusValues`` member.

        Raises:
            ValueError: if the byte is not a defined ``StatusValues`` code
                (raised by the enum lookup).
        """
        status = StatusValues(ord(data.read(1)))

        return Status(status)


class TimeSinceLastTransaction(object):
    """Value of a Time Since Last Transaction TLV."""

    def __init__(self, seconds):
        self._seconds = seconds

    @property
    def seconds(self):
        """The value exactly as passed to the constructor."""
        return self._seconds

    def __eq__(self, other):
        common.expect_the_same_class(self, other)
        return self.seconds == other.seconds

    def __repr__(self):
        return "TimeSinceLastTransaction(seconds={})".format(self.seconds)


class TimeSinceLastTransactionFactory(object):
    """Builds ``TimeSinceLastTransaction`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode 4 bytes of ``data`` as a big-endian unsigned 32-bit integer.

        ``message_info`` is accepted for signature parity and not used.
        """
        seconds = struct.unpack(">L", data.read(4))[0]

        return TimeSinceLastTransaction(seconds)


class RouterMask(object):
    """Value of a Router Mask TLV: an ID sequence plus a router ID mask."""

    def __init__(self, id_sequence, router_id_mask):
        self._id_sequence = id_sequence
        self._router_id_mask = router_id_mask

    @property
    def id_sequence(self):
        """The ID sequence exactly as passed to the constructor."""
        return self._id_sequence

    @property
    def router_id_mask(self):
        """The router ID mask exactly as passed to the constructor."""
        return self._router_id_mask

    def __eq__(self, other):
        common.expect_the_same_class(self, other)
        return (self.id_sequence == other.id_sequence and self.router_id_mask == other.router_id_mask)

    def __repr__(self):
        return "RouterMask(id_sequence={}, router_id_mask={})".format(self.id_sequence, hex(self.router_id_mask))


class RouterMaskFactory(object):
    """Builds ``RouterMask`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode one byte of ID sequence followed by an 8-byte big-endian mask.

        ``message_info`` is accepted for signature parity and not used.
        """
        id_sequence = ord(data.read(1))
        router_id_mask = struct.unpack(">Q", data.read(8))[0]

        return RouterMask(id_sequence, router_id_mask)


class NdOption(object):
    """Value of an ND Option TLV, held as a list of option values."""

    def __init__(self, options):
        self._options = options

    @property
    def options(self):
        """The options exactly as passed to the constructor."""
        return self._options

    def __eq__(self, other):
        common.expect_the_same_class(self, other)
        return self.options == other.options

    def __repr__(self):
        return "NdOption(options=[{}])".format(", ".join(str(opt) for opt in self.options))


class NdOptionFactory(object):
    """Builds ``NdOption`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Consume the rest of ``data`` and expose it as a list of byte values.

        ``message_info`` is accepted for signature parity and not used.
        """
        # list() over a bytearray yields the individual bytes as ints.
        options = list(bytearray(data.read()))
        return NdOption(options)


class NdData(object):
    """Placeholder for the ND Data TLV value."""
    # TODO: Not implemented yet
    pass


class NdDataFactory(object):
    """Placeholder factory for the ND Data TLV; parsing always fails."""

    # TODO: Not implemented yet

    def parse(self, data, message_info):
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("TODO: Not implemented yet")


class XtalAccuracy:
    """Placeholder for the XTAL Accuracy TLV value."""

    # TODO: Not implemented yet

    def __init__(self):
        # Construction only emits a notice; no payload is stored.
        print("XtalAccuracy is not implemented yet.")


class XtalAccuracyFactory:
    """Builds placeholder ``XtalAccuracy`` objects."""

    def parse(self, data, message_info):
        """Return a new ``XtalAccuracy`` without reading anything from ``data``."""
        return XtalAccuracy()


class ThreadNetworkData(object):
    """Value of a Thread Network Data TLV: a collection of nested TLVs."""

    def __init__(self, tlvs):
        self._tlvs = tlvs

    @property
    def tlvs(self):
        """The nested TLVs exactly as passed to the constructor."""
        return self._tlvs

    def __eq__(self, other):
        common.expect_the_same_class(self, other)
        return self.tlvs == other.tlvs

    def __repr__(self):
        return "ThreadNetworkData(tlvs=[{}])".format(", ".join(str(tlv) for tlv in self.tlvs))


class ThreadNetworkDataFactory(object):
    """Builds ``ThreadNetworkData`` objects by delegating to a nested factory."""

    def __init__(self, network_data_tlvs_factory):
        """Store the factory used to parse the nested network data TLVs.

        Args:
            network_data_tlvs_factory: object exposing
                ``parse(data, message_info)``; its result becomes the ``tlvs``
                of the produced ``ThreadNetworkData``.
        """
        self._network_data_tlvs_factory = network_data_tlvs_factory

    def parse(self, data, message_info):
        """Parse ``data`` with the nested factory and wrap the result."""
        tlvs = self._network_data_tlvs_factory.parse(data, message_info)
        return ThreadNetworkData(tlvs)


class IPv6Addresses(list):
    """List subclass holding the addresses decoded from an IPv6 Addresses TLV."""
    pass


class IPv6AddressesFactory(object):
    """Builds ``IPv6Addresses`` lists from a readable byte stream."""

    def parse(self, data, message_info):
        """Split the rest of ``data`` into 16-byte IPv6 addresses.

        Args:
            data: object exposing ``read()`` that returns bytes.
            message_info: unused here; kept for the common factory signature.

        Returns:
            An ``IPv6Addresses`` list of ``ipaddress.IPv6Address`` objects, in
            payload order (empty when the payload is empty).

        Raises:
            AssertionError: if the payload length is not a multiple of 16.
        """
        data = bytes(data.read())
        # Raised explicitly instead of via an `assert` statement so the length
        # check is still enforced under `python -O`; the exception type and
        # argument are the same as the assert produced.
        if len(data) % 16 != 0:
            raise AssertionError(data)

        return IPv6Addresses(ipaddress.IPv6Address(data[i:i + 16]) for i in range(0, len(data), 16))