• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import struct
31
32from binascii import hexlify
33from enum import IntEnum
34
35import ipaddress
36
37# Map of 2 bits of parent priority.
38pp_map = {1: 1, 0: 0, 3: -1, 2: -2}
39
40UDP_TEST_PORT = 12345
41
42
43# Get the signed parent priority from the byte that parent priority is in.
44def map_pp(pp_byte):
45    return pp_map[((pp_byte & 0xC0) >> 6)]
46
47
48def expect_the_same_class(self, other):
49    if not isinstance(other, self.__class__):
50        raise TypeError("Expected the same class. Got {} and {}".format(type(self), type(other)))
51
52
53class MessageInfo(object):
54
55    def __init__(self):
56        self.aux_sec_hdr = None
57        self.aux_sec_hdr_bytes = None
58
59        self.mhr_bytes = None
60        self.extra_open_fields = None
61
62        self.source_mac_address = None
63        self.destination_mac_address = None
64
65        self._source_ipv6 = None
66        self._destination_ipv6 = None
67
68        self._src_port = None
69        self._dst_port = None
70
71        self.stable = None
72        self.payload_length = 0
73
74    def _convert_value_to_ip_address(self, value):
75        if isinstance(value, bytearray):
76            value = bytes(value)
77
78        return ipaddress.ip_address(value)
79
80    @property
81    def source_ipv6(self):
82        return self._source_ipv6
83
84    @source_ipv6.setter
85    def source_ipv6(self, value):
86        self._source_ipv6 = self._convert_value_to_ip_address(value)
87
88    @property
89    def destination_ipv6(self):
90        return self._destination_ipv6
91
92    @destination_ipv6.setter
93    def destination_ipv6(self, value):
94        self._destination_ipv6 = self._convert_value_to_ip_address(value)
95
96    @property
97    def src_port(self):
98        return self._src_port
99
100    @src_port.setter
101    def src_port(self, value):
102        self._src_port = value
103
104    @property
105    def dst_port(self):
106        return self._dst_port
107
108    @dst_port.setter
109    def dst_port(self, value):
110        self._dst_port = value
111
112
113class MacAddressType(IntEnum):
114    SHORT = 0
115    LONG = 1
116
117
118class MacAddress(object):
119
120    def __init__(self, mac_address, _type, big_endian=True):
121        if _type == MacAddressType.SHORT:
122            length = 2
123        elif _type == MacAddressType.LONG:
124            length = 8
125
126        if not big_endian:
127            mac_address = mac_address[::-1]
128
129        self._mac_address = bytearray(mac_address[:length])
130        self._type = _type
131
132    @property
133    def type(self):
134        return self._type
135
136    @property
137    def mac_address(self):
138        return self._mac_address
139
140    @property
141    def rloc(self):
142        return struct.unpack(">H", self._mac_address)[0]
143
144    def convert_to_iid(self):
145        if self._type == MacAddressType.SHORT:
146            return (bytearray([0x00, 0x00, 0x00, 0xff, 0xfe, 0x00]) + self._mac_address[:2])
147        elif self._type == MacAddressType.LONG:
148            return (bytearray([self._mac_address[0] ^ 0x02]) + self._mac_address[1:])
149        else:
150            raise RuntimeError("Could not convert to IID. Invalid MAC address type: {}".format(self._type))
151
152    @classmethod
153    def from_eui64(cls, eui64, big_endian=True):
154        if not isinstance(eui64, bytearray):
155            raise RuntimeError("Could not create MAC address from EUI64. Invalid data type: {}".format(type(eui64)))
156
157        return cls(eui64, MacAddressType.LONG)
158
159    @classmethod
160    def from_rloc16(cls, rloc16, big_endian=True):
161        if isinstance(rloc16, int):
162            mac_address = struct.pack(">H", rloc16)
163        elif isinstance(rloc16, bytearray):
164            mac_address = rloc16[:2]
165        else:
166            raise RuntimeError("Could not create MAC address from RLOC16. Invalid data type: {}".format(type(rloc16)))
167
168        return cls(mac_address, MacAddressType.SHORT)
169
170    def __eq__(self, other):
171        return (self.type == other.type) and (self.mac_address == other.mac_address)
172
173    def __repr__(self):
174        return "MacAddress(mac_address=b'{}', type={})".format(hexlify(self.mac_address), MacAddressType(self._type))
175