1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import struct 31 32from binascii import hexlify 33from enum import IntEnum 34 35import ipaddress 36 37# Map of 2 bits of parent priority. 38pp_map = {1: 1, 0: 0, 3: -1, 2: -2} 39 40UDP_TEST_PORT = 12345 41 42 43# Get the signed parent priority from the byte that parent priority is in. 44def map_pp(pp_byte): 45 return pp_map[((pp_byte & 0xC0) >> 6)] 46 47 48def expect_the_same_class(self, other): 49 if not isinstance(other, self.__class__): 50 raise TypeError("Expected the same class. Got {} and {}".format(type(self), type(other))) 51 52 53class MessageInfo(object): 54 55 def __init__(self): 56 self.aux_sec_hdr = None 57 self.aux_sec_hdr_bytes = None 58 59 self.mhr_bytes = None 60 self.extra_open_fields = None 61 62 self.source_mac_address = None 63 self.destination_mac_address = None 64 65 self._source_ipv6 = None 66 self._destination_ipv6 = None 67 68 self._src_port = None 69 self._dst_port = None 70 71 self.stable = None 72 self.payload_length = 0 73 74 def _convert_value_to_ip_address(self, value): 75 if isinstance(value, bytearray): 76 value = bytes(value) 77 78 return ipaddress.ip_address(value) 79 80 @property 81 def source_ipv6(self): 82 return self._source_ipv6 83 84 @source_ipv6.setter 85 def source_ipv6(self, value): 86 self._source_ipv6 = self._convert_value_to_ip_address(value) 87 88 @property 89 def destination_ipv6(self): 90 return self._destination_ipv6 91 92 @destination_ipv6.setter 93 def destination_ipv6(self, value): 94 self._destination_ipv6 = self._convert_value_to_ip_address(value) 95 96 @property 97 def src_port(self): 98 return self._src_port 99 100 @src_port.setter 101 def src_port(self, value): 102 self._src_port = value 103 104 @property 105 def dst_port(self): 106 return self._dst_port 107 108 @dst_port.setter 109 def dst_port(self, value): 110 self._dst_port = value 111 112 113class MacAddressType(IntEnum): 114 SHORT = 0 115 LONG = 1 116 117 118class MacAddress(object): 119 120 def __init__(self, mac_address, _type, big_endian=True): 121 if _type == MacAddressType.SHORT: 122 length = 2 123 elif _type == MacAddressType.LONG: 124 length = 8 125 126 if not big_endian: 127 mac_address = mac_address[::-1] 128 129 self._mac_address = bytearray(mac_address[:length]) 130 self._type = _type 131 132 @property 133 def type(self): 134 return self._type 135 136 @property 137 def mac_address(self): 138 return self._mac_address 139 140 @property 141 def rloc(self): 142 return struct.unpack(">H", self._mac_address)[0] 143 144 def convert_to_iid(self): 145 if self._type == MacAddressType.SHORT: 146 return (bytearray([0x00, 0x00, 0x00, 0xff, 0xfe, 0x00]) + self._mac_address[:2]) 147 elif self._type == MacAddressType.LONG: 148 return (bytearray([self._mac_address[0] ^ 0x02]) + self._mac_address[1:]) 149 else: 150 raise RuntimeError("Could not convert to IID. Invalid MAC address type: {}".format(self._type)) 151 152 @classmethod 153 def from_eui64(cls, eui64, big_endian=True): 154 if not isinstance(eui64, bytearray): 155 raise RuntimeError("Could not create MAC address from EUI64. Invalid data type: {}".format(type(eui64))) 156 157 return cls(eui64, MacAddressType.LONG) 158 159 @classmethod 160 def from_rloc16(cls, rloc16, big_endian=True): 161 if isinstance(rloc16, int): 162 mac_address = struct.pack(">H", rloc16) 163 elif isinstance(rloc16, bytearray): 164 mac_address = rloc16[:2] 165 else: 166 raise RuntimeError("Could not create MAC address from RLOC16. Invalid data type: {}".format(type(rloc16))) 167 168 return cls(mac_address, MacAddressType.SHORT) 169 170 def __eq__(self, other): 171 return (self.type == other.type) and (self.mac_address == other.mac_address) 172 173 def __repr__(self): 174 return "MacAddress(mac_address=b'{}', type={})".format(hexlify(self.mac_address), MacAddressType(self._type)) 175