• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import ipaddress
31import struct
32from binascii import hexlify
33from enum import IntEnum
34
35import common
36
37
class TlvType(IntEnum):
    """Integer codes identifying the TLV types known to this module.

    Because this is an ``IntEnum``, each member compares equal to its plain
    integer value. The numbering is not contiguous: 5, 12 and 13 have no
    member here.
    """

    # Codes 0 through 4.
    TARGET_EID = 0
    MAC_EXTENDED_ADDRESS = 1
    RLOC16 = 2
    ML_EID = 3
    STATUS = 4

    # Codes 6 through 11 (5 has no member in this enum).
    TIME_SINCE_LAST_TRANSACTION = 6
    ROUTER_MASK = 7
    ND_OPTION = 8
    ND_DATA = 9
    THREAD_NETWORK_DATA = 10
    MLE_ROUTING = 11

    # Remaining, non-adjacent codes.
    IPv6_ADDRESSES = 14
    XTAL_ACCURACY = 254
52
53
class StatusValues(IntEnum):
    """Integer status codes, numbered contiguously from 0 to 4.

    As an ``IntEnum``, every member compares equal to its plain integer
    value, and ``StatusValues(n)`` looks a member up by that value.
    """

    SUCCESS = 0
    NO_ADDRESS_AVAILABLE = 1
    TOO_FEW_ROUTERS = 2
    HAVE_CHILD_ID_REQUEST = 3
    PARENT_PARTITION_CHANGE = 4
60
61
class TargetEid(object):
    """Read-only holder for the raw bytes of a Target EID.

    The value handed to the constructor is stored unchanged and exposed
    through the ``eid`` property. It must be a bytes-like object for
    ``repr()`` to work, because the representation hex-encodes it.
    """

    def __init__(self, eid):
        self._eid = eid

    @property
    def eid(self):
        """The EID exactly as it was passed to the constructor."""
        return self._eid

    def __eq__(self, other):
        # The class check is delegated to the shared helper in ``common``.
        common.expect_the_same_class(self, other)
        return self.eid == other.eid

    def __repr__(self):
        # hexlify() returns bytes on Python 3; decode it so the text shows
        # plain hex digits instead of an embedded b'...' literal.
        hex_text = hexlify(self.eid).decode("ascii")
        return "TargetEid(eid={})".format(hex_text)
77
78
class TargetEidFactory(object):
    """Creates ``TargetEid`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Read 16 bytes from ``data`` and wrap them in a ``TargetEid``.

        ``data`` must provide ``read(n)``; ``message_info`` is accepted but
        not used by this method.
        """
        raw = data.read(16)
        return TargetEid(bytearray(raw))
85
86
class MacExtendedAddress(object):
    """Read-only holder for the raw bytes of a MAC extended address.

    The value handed to the constructor is stored unchanged and exposed
    through the ``mac_address`` property. It must be a bytes-like object for
    ``repr()`` to work, because the representation hex-encodes it.
    """

    def __init__(self, mac_address):
        self._mac_address = mac_address

    @property
    def mac_address(self):
        """The address exactly as it was passed to the constructor."""
        return self._mac_address

    def __eq__(self, other):
        # The class check is delegated to the shared helper in ``common``.
        common.expect_the_same_class(self, other)
        return self.mac_address == other.mac_address

    def __repr__(self):
        # hexlify() returns bytes on Python 3; decode it so the text shows
        # plain hex digits instead of an embedded b'...' literal.
        hex_text = hexlify(self.mac_address).decode("ascii")
        return "MacExtendedAddress(mac_address={})".format(hex_text)
102
103
class MacExtendedAddressFactory(object):
    """Creates ``MacExtendedAddress`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Read 8 bytes from ``data`` and wrap them in a ``MacExtendedAddress``.

        ``data`` must provide ``read(n)``; ``message_info`` is accepted but
        not used by this method.
        """
        raw = data.read(8)
        return MacExtendedAddress(bytearray(raw))
110
111
class Rloc16(object):
    """Read-only holder for a single RLOC16 value.

    The value must be an integer for ``repr()`` to work, since the
    representation renders it with ``hex()``.
    """

    def __init__(self, rloc16):
        self._rloc16 = rloc16

    @property
    def rloc16(self):
        """The value exactly as it was passed to the constructor."""
        return self._rloc16

    def __eq__(self, other):
        # The class check is delegated to the shared helper in ``common``.
        common.expect_the_same_class(self, other)
        mine, theirs = self.rloc16, other.rloc16
        return mine == theirs

    def __repr__(self):
        as_hex = hex(self.rloc16)
        return "Rloc16(rloc16={})".format(as_hex)
127
128
class Rloc16Factory(object):
    """Creates ``Rloc16`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode 2 bytes from ``data`` as a big-endian unsigned 16-bit integer.

        ``message_info`` is accepted but not used by this method.
        """
        # ">H" yields a one-element tuple; unpack it directly.
        (value,) = struct.unpack(">H", data.read(2))
        return Rloc16(value)
135
136
class MlEid(object):
    """Read-only holder for the raw bytes of an ML-EID.

    The value handed to the constructor is stored unchanged and exposed
    through the ``ml_eid`` property. It must be a bytes-like object for
    ``repr()`` to work, because the representation hex-encodes it.
    """

    def __init__(self, ml_eid):
        self._ml_eid = ml_eid

    @property
    def ml_eid(self):
        """The ML-EID exactly as it was passed to the constructor."""
        return self._ml_eid

    def __eq__(self, other):
        # The class check is delegated to the shared helper in ``common``.
        common.expect_the_same_class(self, other)
        return self.ml_eid == other.ml_eid

    def __repr__(self):
        # hexlify() returns bytes on Python 3; decode it so the text shows
        # plain hex digits instead of an embedded b'...' literal.
        hex_text = hexlify(self.ml_eid).decode("ascii")
        return "MlEid(ml_eid={})".format(hex_text)
152
153
class MlEidFactory(object):
    """Creates ``MlEid`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Read 8 bytes from ``data`` and wrap them in an ``MlEid``.

        ``data`` must provide ``read(n)``; ``message_info`` is accepted but
        not used by this method.
        """
        raw = data.read(8)
        return MlEid(bytearray(raw))
160
161
class Status(object):
    """Read-only holder for a status value."""

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        """The status exactly as it was passed to the constructor."""
        return self._status

    def __eq__(self, other):
        # The class check is delegated to the shared helper in ``common``.
        common.expect_the_same_class(self, other)
        mine, theirs = self.status, other.status
        return mine == theirs

    def __repr__(self):
        current = self.status
        return "Status(status={})".format(current)
177
178
class StatusFactory(object):
    """Creates ``Status`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Read one byte from ``data`` and convert it to a ``StatusValues`` member.

        ``message_info`` is accepted but not used by this method.
        """
        code = ord(data.read(1))
        return Status(StatusValues(code))
185
186
class TimeSinceLastTransaction(object):
    """Read-only holder for a ``seconds`` value."""

    def __init__(self, seconds):
        self._seconds = seconds

    @property
    def seconds(self):
        """The value exactly as it was passed to the constructor."""
        return self._seconds

    def __eq__(self, other):
        # The class check is delegated to the shared helper in ``common``.
        common.expect_the_same_class(self, other)
        mine, theirs = self.seconds, other.seconds
        return mine == theirs

    def __repr__(self):
        elapsed = self.seconds
        return "TimeSinceLastTransaction(seconds={})".format(elapsed)
202
203
class TimeSinceLastTransactionFactory(object):
    """Creates ``TimeSinceLastTransaction`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode 4 bytes from ``data`` as a big-endian unsigned 32-bit integer.

        ``message_info`` is accepted but not used by this method.
        """
        # ">L" yields a one-element tuple; unpack it directly.
        (elapsed,) = struct.unpack(">L", data.read(4))
        return TimeSinceLastTransaction(elapsed)
210
211
class RouterMask(object):
    """Read-only pair of an ID sequence and a router ID mask.

    ``router_id_mask`` must be an integer for ``repr()`` to work, since the
    representation renders it with ``hex()``.
    """

    def __init__(self, id_sequence, router_id_mask):
        self._id_sequence = id_sequence
        self._router_id_mask = router_id_mask

    @property
    def id_sequence(self):
        """The ID sequence exactly as it was passed to the constructor."""
        return self._id_sequence

    @property
    def router_id_mask(self):
        """The router ID mask exactly as it was passed to the constructor."""
        return self._router_id_mask

    def __eq__(self, other):
        # The class check is delegated to the shared helper in ``common``.
        common.expect_the_same_class(self, other)
        # The masks are only compared when the sequences already match.
        sequences_match = self.id_sequence == other.id_sequence
        return sequences_match and self.router_id_mask == other.router_id_mask

    def __repr__(self):
        sequence = self.id_sequence
        mask_hex = hex(self.router_id_mask)
        return "RouterMask(id_sequence={}, router_id_mask={})".format(sequence, mask_hex)
232
233
class RouterMaskFactory(object):
    """Creates ``RouterMask`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Read a 1-byte ID sequence, then an 8-byte big-endian unsigned mask.

        ``message_info`` is accepted but not used by this method.
        """
        # The two reads must stay in this order: sequence first, mask second.
        sequence = ord(data.read(1))
        (mask,) = struct.unpack(">Q", data.read(8))
        return RouterMask(sequence, mask)
241
242
class NdOption(object):
    """Read-only holder for an iterable of option values."""

    def __init__(self, options):
        self._options = options

    @property
    def options(self):
        """The options exactly as they were passed to the constructor."""
        return self._options

    def __eq__(self, other):
        # The class check is delegated to the shared helper in ``common``.
        common.expect_the_same_class(self, other)
        mine, theirs = self.options, other.options
        return mine == theirs

    def __repr__(self):
        # Each option is rendered with str() and the pieces are comma-joined.
        rendered = ", ".join(map(str, self.options))
        return "NdOption(options=[{}])".format(rendered)
258
259
class NdOptionFactory(object):
    """Creates ``NdOption`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Read everything left in ``data`` and store it as a list of ints.

        ``message_info`` is accepted but not used by this method.
        """
        # Iterating a bytearray yields one integer per byte.
        byte_values = list(bytearray(data.read()))
        return NdOption(byte_values)
265
266
class NdData(object):
    """Empty placeholder class: it defines no attributes or methods."""
    # TODO: Not implemented yet
270
271
class NdDataFactory(object):
    """Placeholder factory whose ``parse`` always raises."""
    # TODO: Not implemented yet

    def parse(self, data, message_info):
        """Raise ``NotImplementedError`` unconditionally; both arguments are ignored."""
        message = "TODO: Not implemented yet"
        raise NotImplementedError(message)
277
278
class XtalAccuracy:
    """Placeholder class that stores nothing.

    Creating an instance prints a notice to standard output.
    """
    # TODO: Not implemented yet

    def __init__(self):
        notice = "XtalAccuracy is not implemented yet."
        print(notice)
284
285
class XtalAccuracyFactory:
    """Creates ``XtalAccuracy`` placeholder objects."""

    def parse(self, data, message_info):
        """Return a new ``XtalAccuracy``; neither argument is read."""
        placeholder = XtalAccuracy()
        return placeholder
290
291
class ThreadNetworkData(object):
    """Read-only holder for an iterable of TLV objects."""

    def __init__(self, tlvs):
        self._tlvs = tlvs

    @property
    def tlvs(self):
        """The TLVs exactly as they were passed to the constructor."""
        return self._tlvs

    def __eq__(self, other):
        # The class check is delegated to the shared helper in ``common``.
        common.expect_the_same_class(self, other)
        mine, theirs = self.tlvs, other.tlvs
        return mine == theirs

    def __repr__(self):
        # Each TLV is rendered with str() and the pieces are comma-joined.
        rendered = ", ".join(map(str, self.tlvs))
        return "ThreadNetworkData(tlvs=[{}])".format(rendered)
307
308
class ThreadNetworkDataFactory(object):
    """Creates ``ThreadNetworkData`` objects using an injected TLV parser."""

    def __init__(self, network_data_tlvs_factory):
        # Collaborator whose parse(data, message_info) supplies the TLVs.
        self._network_data_tlvs_factory = network_data_tlvs_factory

    def parse(self, data, message_info):
        """Pass both arguments to the injected factory and wrap what it returns."""
        inner_factory = self._network_data_tlvs_factory
        parsed = inner_factory.parse(data, message_info)
        return ThreadNetworkData(parsed)
317
318
class IPv6Addresses(list):
    """A plain ``list`` subclass that adds no behaviour of its own."""
321
322
class IPv6AddressesFactory(object):
    """Creates ``IPv6Addresses`` lists from a readable byte stream."""

    def parse(self, data, message_info):
        """Split the rest of ``data`` into 16-byte chunks, one address per chunk.

        Each chunk becomes an ``ipaddress.IPv6Address``. The total length is
        asserted to be a multiple of 16; the raw bytes are the assertion
        message. ``message_info`` is accepted but not used by this method.
        """
        raw = bytes(data.read())
        assert len(raw) % 16 == 0, raw
        addresses = IPv6Addresses()
        addresses.extend(ipaddress.IPv6Address(raw[start:start + 16]) for start in range(0, len(raw), 16))
        return addresses
334