#!/usr/bin/env python3
#
#  Copyright (c) 2020, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import ipaddress
import struct

from enum import IntEnum
from typing import List

import common
import mle


class TlvType(IntEnum):
    """Numeric type codes for the TLVs modelled in this module.

    Codes 10 through 13 are not assigned by this enumeration.
    """

    EXT_ADDRESS = 0
    ADDRESS16 = 1
    MODE = 2
    POLLING_PERIOD = 3
    CONNECTIVITY = 4
    ROUTE64 = 5
    LEADER_DATA = 6
    NETWORK_DATA = 7
    IPV6_ADDRESS_LIST = 8
    MAC_COUNTERS = 9
    # 10-13 intentionally left without a member here.
    BATTERY_LEVEL = 14
    SUPPLY_VOLTAGE = 15
    CHILD_TABLE = 16
    CHANNEL_PAGES = 17
    TYPE_LIST = 18
    MAX_CHILD_TIMEOUT = 19


class Ipv6AddressList:
    """Holds the addresses carried by an IPv6 address list TLV."""

    def __init__(self, addresses: List[ipaddress.IPv6Address]):
        """Store ``addresses`` as given (no copy, no reordering)."""
        self._addresses = addresses

    @property
    def addresses(self):
        """The stored address list, in the order it was supplied."""
        return self._addresses

    def __eq__(self, other):
        # Helper from the project's `common` module; presumably it rejects a
        # comparison against an object of another class -- confirm there.
        common.expect_the_same_class(self, other)
        return self.addresses == other.addresses

    def __repr__(self):
        return f'Ipv6AddressList({self.addresses})'


class Ipv6AddressListFactory:
    """Builds an Ipv6AddressList from a binary stream."""

    def parse(self, data, message_info):
        """Read packed 16-byte IPv6 addresses from ``data``.

        Reading continues while the stream position reported by
        ``data.tell()`` is below ``message_info.length``.

        Args:
            data: binary stream supporting ``tell()`` and ``read()``.
            message_info: object whose ``length`` attribute bounds the
                stream position up to which addresses are read.

        Returns:
            An Ipv6AddressList with the addresses in stream order.

        Raises:
            ipaddress.AddressValueError: if a read yields anything other
                than exactly 16 bytes (e.g. a truncated trailing address).
        """
        addresses = []
        while data.tell() < message_info.length:
            packed = data.read(16)
            addresses.append(ipaddress.IPv6Address(packed))
        return Ipv6AddressList(addresses)


class MacCounters:
    """Holds the nine counters carried by a MAC counters TLV.

    The counters are kept as one sequence; each named property reads a fixed
    index of that sequence.
    """

    # Property names in index order (index 0 first). Used to build __repr__.
    _FIELD_NAMES = (
        'if_in_unknown_protos',
        'if_in_errors',
        'if_out_errors',
        'if_in_ucast_pkts',
        'if_in_broadcast_pkts',
        'if_in_discards',
        'if_out_ucast_pkts',
        'if_out_broadcast_pkts',
        'if_out_discards',
    )

    def __init__(self, counters: List[int]):
        """Store ``counters`` as given; index i maps to ``_FIELD_NAMES[i]``."""
        self._counters = counters

    @property
    def if_in_unknown_protos(self):
        return self._counters[0]

    @property
    def if_in_errors(self):
        return self._counters[1]

    @property
    def if_out_errors(self):
        return self._counters[2]

    @property
    def if_in_ucast_pkts(self):
        return self._counters[3]

    @property
    def if_in_broadcast_pkts(self):
        return self._counters[4]

    @property
    def if_in_discards(self):
        return self._counters[5]

    @property
    def if_out_ucast_pkts(self):
        return self._counters[6]

    @property
    def if_out_broadcast_pkts(self):
        return self._counters[7]

    @property
    def if_out_discards(self):
        return self._counters[8]

    @property
    def counters(self):
        """The underlying counter sequence, exactly as passed to __init__."""
        return self._counters

    def __eq__(self, other):
        # Helper from the project's `common` module; presumably it rejects a
        # comparison against an object of another class -- confirm there.
        common.expect_the_same_class(self, other)

        return self.counters == other.counters

    def __repr__(self):
        fields = ', '.join(f'{name}={getattr(self, name)}' for name in self._FIELD_NAMES)
        return f'MacCounters({fields})'


class MacCountersFactory:
    """Builds a MacCounters from a binary stream."""

    def parse(self, data, message_info):
        """Decode nine big-endian unsigned 32-bit counters from ``data``.

        Args:
            data: binary stream; 36 bytes are read from it.
            message_info: unused by this parser.

        Returns:
            A MacCounters holding the nine values as a list.

        Raises:
            struct.error: if fewer than 36 bytes could be read.
        """
        # struct.unpack returns a tuple, but MacCounters is declared to hold a
        # List[int] and compares `counters` with ==; a tuple never equals a
        # list, so convert here to keep parsed and hand-built instances
        # comparable.
        counters = list(struct.unpack('>9I', data.read(4 * 9)))
        return MacCounters(counters)


class BatteryLevel:
    """Holds the single integer carried by a battery level TLV."""

    def __init__(self, battery_level: int):
        self._battery_level = battery_level

    @property
    def battery_level(self):
        """The stored battery level value."""
        return self._battery_level

    def __eq__(self, other):
        # Helper from the project's `common` module; presumably it rejects a
        # comparison against an object of another class -- confirm there.
        common.expect_the_same_class(self, other)

        return self.battery_level == other.battery_level

    def __repr__(self):
        return f'BatteryLevel(battery_level={self.battery_level})'


class BatteryLevelFactory:
    """Builds a BatteryLevel from a binary stream."""

    def parse(self, data, message_info):
        """Decode one unsigned byte from ``data`` as the battery level.

        Args:
            data: binary stream; one byte is read from it.
            message_info: unused by this parser.

        Returns:
            A BatteryLevel wrapping the decoded value (0-255).

        Raises:
            struct.error: if no byte could be read.
        """
        (level,) = struct.unpack('>B', data.read(1))
        return BatteryLevel(level)


class SupplyVoltage:
    """Holds the single integer carried by a supply voltage TLV."""

    def __init__(self, supply_voltage: int):
        self._supply_voltage = supply_voltage

    @property
    def supply_voltage(self):
        """The stored supply voltage value."""
        return self._supply_voltage

    def __eq__(self, other):
        # Helper from the project's `common` module; presumably it rejects a
        # comparison against an object of another class -- confirm there.
        common.expect_the_same_class(self, other)

        return self.supply_voltage == other.supply_voltage

    def __repr__(self):
        return f'SupplyVoltage(supply_voltage={self.supply_voltage})'


class SupplyVoltageFactory:
    """Builds a SupplyVoltage from a binary stream."""

    def parse(self, data, message_info):
        """Decode one big-endian unsigned 16-bit value from ``data``.

        Args:
            data: binary stream; two bytes are read from it.
            message_info: unused by this parser.

        Returns:
            A SupplyVoltage wrapping the decoded value.

        Raises:
            struct.error: if fewer than two bytes could be read.
        """
        (voltage,) = struct.unpack('>H', data.read(2))
        return SupplyVoltage(voltage)


class ChildTableEntry:
    """One child record of a child table: timeout, child ID and mode."""

    def __init__(self, timeout: int, child_id: int, mode: 'mle.Mode'):
        self._timeout = timeout
        self._child_id = child_id
        self._mode = mode

    @property
    def timeout(self):
        """The timeout value stored for this child."""
        return self._timeout

    @property
    def child_id(self):
        """The child's identifier."""
        return self._child_id

    @property
    def mode(self):
        """The mode object stored for this child."""
        return self._mode

    def __eq__(self, other):
        # Helper from the project's `common` module; presumably it rejects a
        # comparison against an object of another class -- confirm there.
        common.expect_the_same_class(self, other)

        return (self.timeout == other.timeout and self.child_id == other.child_id and self.mode == other.mode)

    def __repr__(self):
        return f'ChildTableEntry(timeout={self.timeout}, child_id={self.child_id}, mode={self.mode})'


class ChildTable:
    """A collection of child entries kept in ascending ``child_id`` order."""

    def __init__(self, children: List['ChildTableEntry']):
        """Store a sorted copy of ``children``; the input list is not modified."""
        ordered = list(children)
        # list.sort is stable, so entries with equal child_id keep input order.
        ordered.sort(key=lambda entry: entry.child_id)
        self._children = ordered

    @property
    def children(self):
        """The entries, sorted by ``child_id``."""
        return self._children

    def __eq__(self, other):
        # Helper from the project's `common` module; presumably it rejects a
        # comparison against an object of another class -- confirm there.
        common.expect_the_same_class(self, other)

        return self.children == other.children

    def __repr__(self):
        return f'ChildTable({self.children})'


class ChildTableFactory:
    """Builds a ChildTable from a binary stream."""

    # Layout of the leading 16-bit big-endian word of each entry: the top
    # five bits hold the timeout, the low nine bits hold the child ID and the
    # two bits in between are reserved.
    _TIMEOUT_MASK = 0xf800
    _TIMEOUT_SHIFT = 11
    _CHILD_ID_MASK = 0x01ff

    def parse(self, data, message_info):
        """Decode 3-byte child entries until ``message_info.length`` is used up.

        Each entry is a 16-bit word (timeout + child ID) followed by one mode
        byte that is decoded by ``mle.ModeFactory``.

        NOTE: ``message_info.length`` is decremented in place as entries are
        consumed, so the caller's object is modified by this call.

        Args:
            data: binary stream positioned at the first entry.
            message_info: object whose ``length`` attribute gives the number
                of bytes still to be parsed.

        Returns:
            A ChildTable containing one ChildTableEntry per decoded entry.

        Raises:
            struct.error: if the stream ends before a 16-bit word is complete.
        """
        children = []
        while message_info.length > 0:
            timeout_and_id = struct.unpack('>H', data.read(2))[0]
            message_info.length -= 2

            timeout = (timeout_and_id & self._TIMEOUT_MASK) >> self._TIMEOUT_SHIFT
            # The previous mask (0x1fff) overlapped the timeout field in bits
            # 11-12, so timeout bits leaked into the child ID.
            child_id = timeout_and_id & self._CHILD_ID_MASK

            mode = mle.ModeFactory().parse(data, message_info)
            message_info.length -= 1

            children.append(ChildTableEntry(timeout, child_id, mode))
        return ChildTable(children)


class ChannelPages:
    """Holds the raw bytes carried by a channel pages TLV."""

    def __init__(self, channel_pages: bytes):
        self._channel_pages = channel_pages

    @property
    def channel_pages(self):
        """The stored channel pages bytes, uninterpreted."""
        return self._channel_pages

    def __eq__(self, other):
        # Helper from the project's `common` module; presumably it rejects a
        # comparison against an object of another class -- confirm there.
        common.expect_the_same_class(self, other)

        return self.channel_pages == other.channel_pages

    def __repr__(self):
        return f'ChannelPages(channel_pages={self.channel_pages})'


class ChannelPagesFactory:
    """Builds a ChannelPages from a binary stream."""

    def parse(self, data, message_info):
        """Wrap the stream's contents in a ChannelPages.

        Args:
            data: stream exposing ``getvalue()``. For an ``io.BytesIO`` that
                is the whole buffer, independent of the current read
                position, and the position is left untouched.
            message_info: unused by this parser.

        Returns:
            A ChannelPages holding the bytes returned by ``data.getvalue()``.
        """
        raw_pages = data.getvalue()
        return ChannelPages(raw_pages)


class TypeList:
    """Holds the list of TLV type codes carried by a type list TLV."""

    def __init__(self, tlv_types: List[int]):
        self._tlv_types = tlv_types

    @property
    def tlv_types(self):
        """The stored TLV type codes, in the order they were supplied."""
        return self._tlv_types

    def __eq__(self, other):
        # Helper from the project's `common` module; presumably it rejects a
        # comparison against an object of another class -- confirm there.
        common.expect_the_same_class(self, other)

        return self.tlv_types == other.tlv_types

    def __repr__(self):
        return f'TypeList(tlv_types={self.tlv_types})'


class TypeListFactory:
    """Builds a TypeList from a binary stream."""

    def parse(self, data, message_info):
        """Turn every byte of the stream's contents into one TLV type code.

        Args:
            data: stream exposing ``getvalue()`` that returns ``bytes``. For
                an ``io.BytesIO`` that is the whole buffer, independent of
                the current read position.
            message_info: unused by this parser.

        Returns:
            A TypeList whose ``tlv_types`` is a list of ints, one per byte.
        """
        # Iterating over bytes already yields ints on Python 3; wrapping each
        # item in ord() (a Python 2 idiom) raises TypeError there.
        tlv_types = list(data.getvalue())
        return TypeList(tlv_types)


class MaxChildTimeout:
    """Holds the single integer carried by a max child timeout TLV."""

    def __init__(self, max_child_timeout: int):
        self._max_child_timeout = max_child_timeout

    @property
    def max_child_timeout(self):
        """The stored maximum child timeout value."""
        return self._max_child_timeout

    def __eq__(self, other):
        # Helper from the project's `common` module; presumably it rejects a
        # comparison against an object of another class -- confirm there.
        common.expect_the_same_class(self, other)

        return self.max_child_timeout == other.max_child_timeout

    def __repr__(self):
        return f'MaxChildTimeout(max_child_timeout={self.max_child_timeout})'


class MaxChildTimeoutFactory:
    """Builds a MaxChildTimeout from a binary stream."""

    def parse(self, data, message_info):
        """Decode one big-endian unsigned 32-bit value from ``data``.

        Args:
            data: binary stream; four bytes are read from it.
            message_info: unused by this parser.

        Returns:
            A MaxChildTimeout wrapping the decoded value.

        Raises:
            struct.error: if fewer than four bytes could be read.
        """
        (timeout,) = struct.unpack('>I', data.read(4))
        return MaxChildTimeout(timeout)