1#!/usr/bin/env python3 2# 3# Copyright (c) 2020, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import struct 31 32from enum import IntEnum 33from typing import List 34 35import common 36import ipaddress 37import mle 38 39 40class TlvType(IntEnum): 41 EXT_ADDRESS = 0 42 ADDRESS16 = 1 43 MODE = 2 44 POLLING_PERIOD = 3 45 CONNECTIVITY = 4 46 ROUTE64 = 5 47 LEADER_DATA = 6 48 NETWORK_DATA = 7 49 IPV6_ADDRESS_LIST = 8 50 MAC_COUNTERS = 9 51 BATTERY_LEVEL = 14 52 SUPPLY_VOLTAGE = 15 53 CHILD_TABLE = 16 54 CHANNEL_PAGES = 17 55 TYPE_LIST = 18 56 MAX_CHILD_TIMEOUT = 19 57 58 59class Ipv6AddressList: 60 61 def __init__(self, addresses: List[ipaddress.IPv6Address]): 62 self._addresses = addresses 63 64 @property 65 def addresses(self): 66 return self._addresses 67 68 def __eq__(self, other): 69 common.expect_the_same_class(self, other) 70 return self.addresses == other.addresses 71 72 def __repr__(self): 73 return f'Ipv6AddressList({self.addresses})' 74 75 76class Ipv6AddressListFactory: 77 78 def parse(self, data, message_info): 79 addresses = [] 80 while data.tell() < message_info.length: 81 addresses.append(ipaddress.IPv6Address(data.read(16))) 82 return Ipv6AddressList(addresses) 83 84 85class MacCounters: 86 87 def __init__(self, counters: List[int]): 88 self._counters = counters 89 90 @property 91 def if_in_unknown_protos(self): 92 return self._counters[0] 93 94 @property 95 def if_in_errors(self): 96 return self._counters[1] 97 98 @property 99 def if_out_errors(self): 100 return self._counters[2] 101 102 @property 103 def if_in_ucast_pkts(self): 104 return self._counters[3] 105 106 @property 107 def if_in_broadcast_pkts(self): 108 return self._counters[4] 109 110 @property 111 def if_in_discards(self): 112 return self._counters[5] 113 114 @property 115 def if_out_ucast_pkts(self): 116 return self._counters[6] 117 118 @property 119 def if_out_broadcast_pkts(self): 120 return self._counters[7] 121 122 @property 123 def if_out_discards(self): 124 return self._counters[8] 125 126 @property 127 def counters(self): 128 return self._counters 129 130 def __eq__(self, other): 131 common.expect_the_same_class(self, other) 132 133 return self.counters == other.counters 134 135 def __repr__(self): 136 return ('MacCounters(' + f'if_in_unknown_protos={self.if_in_unknown_protos}, ' + 137 f'if_in_errors={self.if_in_errors}, ' + f'if_out_errors={self.if_out_errors}, ' + 138 f'if_in_ucast_pkts={self.if_in_ucast_pkts}, ' + f'if_in_broadcast_pkts={self.if_in_broadcast_pkts}, ' + 139 f'if_in_discards={self.if_in_discards}, ' + f'if_out_ucast_pkts={self.if_out_ucast_pkts}, ' + 140 f'if_out_broadcast_pkts={self.if_out_broadcast_pkts}, ' + f'if_out_discards={self.if_out_discards})') 141 142 143class MacCountersFactory: 144 145 def parse(self, data, message_info): 146 return MacCounters(struct.unpack('>9I', data.read(4 * 9))) 147 148 149class BatteryLevel: 150 151 def __init__(self, battery_level: int): 152 self._battery_level = battery_level 153 154 @property 155 def battery_level(self): 156 return self._battery_level 157 158 def __eq__(self, other): 159 common.expect_the_same_class(self, other) 160 161 return self.battery_level == other.battery_level 162 163 def __repr__(self): 164 return f'BatteryLevel(battery_level={self.battery_level})' 165 166 167class BatteryLevelFactory: 168 169 def parse(self, data, message_info): 170 return BatteryLevel(struct.unpack('>B', data.read(1))[0]) 171 172 173class SupplyVoltage: 174 175 def __init__(self, supply_voltage: int): 176 self._supply_voltage = supply_voltage 177 178 @property 179 def supply_voltage(self): 180 return self._supply_voltage 181 182 def __eq__(self, other): 183 common.expect_the_same_class(self, other) 184 185 return self.supply_voltage == other.supply_voltage 186 187 def __repr__(self): 188 return f'SupplyVoltage(supply_voltage={self.supply_voltage})' 189 190 191class SupplyVoltageFactory: 192 193 def parse(self, data, message_info): 194 return SupplyVoltage(struct.unpack('>H', data.read(2))[0]) 195 196 197class ChildTableEntry: 198 199 def __init__(self, timeout: int, child_id: int, mode: mle.Mode): 200 self._timeout = timeout 201 self._child_id = child_id 202 self._mode = mode 203 204 @property 205 def timeout(self): 206 return self._timeout 207 208 @property 209 def child_id(self): 210 return self._child_id 211 212 @property 213 def mode(self): 214 return self._mode 215 216 def __eq__(self, other): 217 common.expect_the_same_class(self, other) 218 219 return (self.timeout == other.timeout and self.child_id == other.child_id and self.mode == other.mode) 220 221 def __repr__(self): 222 return f'ChildTableEntry(timeout={self.timeout}, child_id={self.child_id}, mode={self.mode})' 223 224 225class ChildTable: 226 227 def __init__(self, children: List[ChildTableEntry]): 228 self._children = sorted(children, key=lambda child: child.child_id) 229 230 @property 231 def children(self): 232 return self._children 233 234 def __eq__(self, other): 235 common.expect_the_same_class(self, other) 236 237 return self.children == other.children 238 239 def __repr__(self): 240 return f'ChildTable({self.children})' 241 242 243class ChildTableFactory: 244 245 def parse(self, data, message_info): 246 children = [] 247 while message_info.length > 0: 248 timeout_and_id = struct.unpack('>H', data.read(2))[0] 249 message_info.length -= 2 250 251 timeout = (timeout_and_id & 0xf800) >> 11 252 child_id = timeout_and_id & 0x1fff 253 254 mode = mle.ModeFactory().parse(data, message_info) 255 message_info.length -= 1 256 257 children.append(ChildTableEntry(timeout, child_id, mode)) 258 return ChildTable(children) 259 260 261class ChannelPages: 262 263 def __init__(self, channel_pages: bytes): 264 self._channel_pages = channel_pages 265 266 @property 267 def channel_pages(self): 268 return self._channel_pages 269 270 def __eq__(self, other): 271 common.expect_the_same_class(self, other) 272 273 return self.channel_pages == other.channel_pages 274 275 def __repr__(self): 276 return f'ChannelPages(channel_pages={self.channel_pages})' 277 278 279class ChannelPagesFactory: 280 281 def parse(self, data, message_info): 282 return ChannelPages(data.getvalue()) 283 284 285class TypeList: 286 287 def __init__(self, tlv_types: List[int]): 288 self._tlv_types = tlv_types 289 290 @property 291 def tlv_types(self): 292 return self._tlv_types 293 294 def __eq__(self, other): 295 common.expect_the_same_class(self, other) 296 297 return self.tlv_types == other.tlv_types 298 299 def __repr__(self): 300 return f'TypeList(tlv_types={self.tlv_types})' 301 302 303class TypeListFactory: 304 305 def parse(self, data, message_info): 306 return TypeList([ord(t) for t in data.getvalue()]) 307 308 309class MaxChildTimeout: 310 311 def __init__(self, max_child_timeout: int): 312 self._max_child_timeout = max_child_timeout 313 314 @property 315 def max_child_timeout(self): 316 return self._max_child_timeout 317 318 def __eq__(self, other): 319 common.expect_the_same_class(self, other) 320 321 return self.max_child_timeout == other.max_child_timeout 322 323 def __repr__(self): 324 return f'MaxChildTimeout(max_child_timeout={self.max_child_timeout})' 325 326 327class MaxChildTimeoutFactory: 328 329 def parse(self, data, message_info): 330 return MaxChildTimeout(struct.unpack('>I', data.read(4))[0]) 331