#!/usr/bin/env python3
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
#    names of its contributors may be used to endorse or promote products
#    derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#

import hmac
import hashlib
import struct

from binascii import hexlify

from Crypto.Cipher import AES


class CryptoEngine:
    """ Class responsible for encryption and decryption of data.

    All key material is obtained from the crypto material creator passed to
    the constructor; this class only drives the AES-CCM cipher.
    """

    def __init__(self, crypto_material_creator):
        """
        Args:
            crypto_material_creator: Object providing
                create_key_and_nonce_and_authenticated_data(message_info)
                and the mic_length property (see CryptoMaterialCreator).

        """
        self._crypto_material_creator = crypto_material_creator

    @property
    def mic_length(self):
        """ MIC length reported by the crypto material creator (used as the CCM mac_len). """
        return self._crypto_material_creator.mic_length

    def _create_cipher(self, message_info):
        """ Build an AES-CCM cipher already fed with the authenticated data.

        Args:
            message_info (MessageInfo)

        Returns:
            AES-CCM cipher object ready to encrypt or decrypt one message.

        """
        key, nonce, auth_data = self._crypto_material_creator.create_key_and_nonce_and_authenticated_data(message_info)

        cipher = AES.new(key, AES.MODE_CCM, nonce, mac_len=self.mic_length)
        # Authenticated-only data must be supplied before any payload is processed.
        cipher.update(auth_data)
        return cipher

    def encrypt(self, data, message_info):
        """ Encrypt message.

        Args:
            data (bytearray)
            message_info (MessageInfo)

        Returns:
            tuple: Encrypted message (bytes), MIC (bytes)

        """
        cipher = self._create_cipher(message_info)
        return cipher.encrypt_and_digest(bytes(data))

    def decrypt(self, enc_data, mic, message_info):
        """ Decrypt message and verify its MIC.

        Args:
            enc_data (bytearray)
            mic (bytearray)
            message_info (MessageInfo)

        Returns:
            bytearray: Decrypted message.

        Raises:
            ValueError: Propagated from the cipher's decrypt_and_verify when
                the MIC does not match.

        """
        cipher = self._create_cipher(message_info)
        plaintext = cipher.decrypt_and_verify(bytes(enc_data), bytes(mic))
        return bytearray(plaintext)


class CryptoMaterialCreator:
    """ Base class deriving keys from the network key.

    Subclasses implement create_key_and_nonce_and_authenticated_data() and
    mic_length for a specific layer.
    """

    _salt = b'Thread'

    def __init__(self, network_key):
        """
        Args:
            network_key (bytearray)

        """
        self.network_key = network_key

    def _generate_keys(self, sequence_counter):
        """ Generate MLE and MAC keys.

        Read more: 7.1.4. Key Generation - Thread v1.1 Specification Final

        Args:
            sequence_counter (int)

        Returns:
            tuple: MLE and MAC as bytes

        """
        # HMAC-SHA256 over the big-endian 32-bit sequence counter followed by the salt.
        message = struct.pack(">L", sequence_counter) + self._salt
        digest = hmac.new(self.network_key, message, digestmod=hashlib.sha256).digest()

        # First 16 bytes of the digest are the MLE key, the remainder the MAC key.
        return digest[:16], digest[16:]

    def create_key_and_nonce_and_authenticated_data(self, message_info):
        """ Return (key, nonce, authenticated data) for a message; implemented by subclasses. """
        raise NotImplementedError

    @property
    def mic_length(self):
        """ MIC length used by the cipher; implemented by subclasses. """
        raise NotImplementedError


class MacCryptoMaterialCreator(CryptoMaterialCreator):
    """ Crypto material creator using the MAC key. """

    def __init__(self, network_key):
        """
        Args:
            network_key (bytearray)

        """
        super().__init__(network_key)

    def _create_nonce(self, eui64, frame_counter, security_level):
        """ Create CCM Nonce required by AES-128 CCM for encryption and decryption.

        Read more: 7.6.3.2 CCM Nonce - Std 802.15.4-2006

        Args:
            eui64 (bytes)
            frame_counter (int)
            security_level (int)

        Returns:
            bytes: created Nonce

        """
        # Nonce = EUI-64 || frame counter (32-bit big-endian) || security level (1 byte).
        return bytes(eui64 + struct.pack(">LB", frame_counter, security_level))

    def _create_authenticated_data(self, mhr, auxiliary_security_header, extra_open_fields):
        """ Create Authenticated Data

        Read more: 7.6.3.3 CCM prerequisites - Std 802.15.4-2006

        Args:
            mhr (bytes)
            auxiliary_security_header (bytes)
            extra_open_fields (bytes)

        Returns:
            bytes: Authenticated Data

        """
        return bytes(mhr + auxiliary_security_header + extra_open_fields)

    def create_key_and_nonce_and_authenticated_data(self, message_info):
        """ Build the MAC key, nonce and authenticated data for a message.

        Args:
            message_info (MessageInfo)

        Returns:
            tuple: MAC key (bytes), nonce (bytes), authenticated data (bytes)

        """
        aux_sec_hdr = message_info.aux_sec_hdr
        _, mac_key = self._generate_keys(aux_sec_hdr.sequence_counter)

        nonce = self._create_nonce(
            message_info.source_mac_address,
            aux_sec_hdr.frame_counter,
            aux_sec_hdr.security_level,
        )

        auth_data = self._create_authenticated_data(
            message_info.mhr_bytes,
            message_info.aux_sec_hdr_bytes,
            message_info.extra_open_fields,
        )

        return mac_key, nonce, auth_data

    @property
    def mic_length(self):
        return 4


class MleCryptoMaterialCreator(CryptoMaterialCreator):
    """ Crypto material creator using the MLE key. """

    def __init__(self, network_key):
        """
        Args:
            network_key (bytearray)

        """
        super().__init__(network_key)

    def _create_nonce(self, source_eui64, frame_counter, security_level):
        """ Create CCM Nonce required by AES-128 CCM for encryption and decryption.

        Read more: 7.6.3.2 CCM Nonce - Std 802.15.4-2006

        Args:
            source_eui64 (bytearray): Only the first 8 bytes are used.
            frame_counter (int)
            security_level (int)

        Returns:
            bytes: created Nonce

        """
        # Nonce = first 8 bytes of EUI-64 || frame counter (32-bit big-endian) || security level.
        return bytes(source_eui64[:8] + struct.pack(">LB", frame_counter, security_level))

    def _create_authenticated_data(self, source_address, destination_address, auxiliary_security_header):
        """ Create Authenticated Data

        Read more: 4.8 - Thread v1.0 Specification

        Args:
            source_address (ip_address)
            destination_address (ip_address)
            auxiliary_security_header (bytearray)

        Returns:
            bytes: Authenticated Data

        """
        return bytes(source_address.packed + destination_address.packed + auxiliary_security_header)

    def create_key_and_nonce_and_authenticated_data(self, message_info):
        """ Build the MLE key, nonce and authenticated data for a message.

        Args:
            message_info (MessageInfo)

        Returns:
            tuple: MLE key (bytes), nonce (bytes), authenticated data (bytes)

        """
        aux_sec_hdr = message_info.aux_sec_hdr
        mle_key, _ = self._generate_keys(aux_sec_hdr.sequence_counter)

        nonce = self._create_nonce(
            message_info.source_mac_address.mac_address,
            aux_sec_hdr.frame_counter,
            aux_sec_hdr.security_level,
        )

        auth_data = self._create_authenticated_data(
            message_info.source_ipv6,
            message_info.destination_ipv6,
            message_info.aux_sec_hdr_bytes,
        )

        return mle_key, nonce, auth_data

    @property
    def mic_length(self):
        return 4


class AuxiliarySecurityHeader:
    """ Parsed fields of an Auxiliary Security Header. """

    def __init__(
        self,
        key_id_mode,
        security_level,
        frame_counter,
        key_id,
        big_endian=True,
    ):
        """
        Args:
            key_id_mode (int)
            security_level (int)
            frame_counter (int)
            key_id (bytearray)
            big_endian (bool): Byte order used when the sequence counter is
                unpacked from key_id (modes 0 and 2).
        """
        self._key_id_mode = key_id_mode
        self._security_level = security_level
        self._frame_counter = frame_counter
        self._key_id = key_id
        self._big_endian = big_endian

    @property
    def sequence_counter(self):
        """ Compute or extract sequence counter based on currently set Key Index Mode.

        Raises:
            ValueError: If the Key Index Mode is not 0, 1 or 2.
            struct.error: If key_id holds fewer bytes than the mode requires.
        """
        mode = self.key_id_mode

        if mode == 0:
            # NOTE(review): AuxiliarySecurityHeaderFactory reads zero Key ID
            # bytes for mode 0 and nine for mode 3, so this 8-byte unpack
            # fails for headers it parsed. Confirm whether this branch was
            # meant for mode 3.
            key_source = self.key_id[:8]
            size_code = "Q"
        elif mode == 1:
            # Try to guess valid Key Sequence Counter based on Key Index. This
            # one should work for now.
            return self.key_index - 1
        elif mode == 2:
            # In this mode sequence counter is stored on the first four bytes
            # of Key ID.
            key_source = self.key_id[:4]
            size_code = "I"
        else:
            raise ValueError("Unsupported Key Index Mode: {}".format(mode))

        byte_order = ">" if self._big_endian else "<"
        return struct.unpack(byte_order + size_code, key_source)[0]

    @property
    def key_index(self):
        """ Key Index: the last byte of key_id as an unsigned integer. """
        return struct.unpack(">B", self.key_id[-1:])[0]

    @property
    def key_id_mode(self):
        return self._key_id_mode

    @property
    def security_level(self):
        return self._security_level

    @property
    def frame_counter(self):
        return self._frame_counter

    @property
    def key_id(self):
        return self._key_id

    def __repr__(self):
        return "AuxiliarySecurityHeader(key_id_mode={}, security_level={}, frame_counter={}, key_id={})".format(
            self.key_id_mode,
            self.security_level,
            self.frame_counter,
            hexlify(self.key_id),
        )


class AuxiliarySecurityHeaderFactory:
    """ Parses an Auxiliary Security Header from a byte stream. """

    _SECURITY_CONTROL_LENGTH = 1
    _FRAME_COUNTER_LENGTH = 4

    _KEY_ID_LENGTH_KEY_ID_0 = 0
    _KEY_ID_LENGTH_KEY_ID_1 = 1
    _KEY_ID_LENGTH_KEY_ID_2 = 5
    _KEY_ID_LENGTH_KEY_ID_3 = 9

    # Key ID field length in bytes, indexed by Key ID Mode.
    _key_id_lengths = {
        0: _KEY_ID_LENGTH_KEY_ID_0,
        1: _KEY_ID_LENGTH_KEY_ID_1,
        2: _KEY_ID_LENGTH_KEY_ID_2,
        3: _KEY_ID_LENGTH_KEY_ID_3,
    }

    def _parse_security_control(self, security_control_byte):
        """ Split the Security Control byte.

        Args:
            security_control_byte (int)

        Returns:
            tuple: security level (bits 0-2), key id mode (bits 3-4)

        """
        security_level = security_control_byte & 0x07
        key_id_mode = (security_control_byte >> 3) & 0x03

        return security_level, key_id_mode

    def _parse_frame_counter(self, frame_counter_bytes):
        """ Decode the 4-byte little-endian frame counter. """
        return struct.unpack("<I", frame_counter_bytes)[0]

    def _key_id_length(self, key_id_mode):
        """ Return the Key ID field length for the given Key ID Mode. """
        return self._key_id_lengths[key_id_mode]

    def parse(self, data, message_info):
        """ Read an Auxiliary Security Header from data and record it in message_info.

        Sets message_info.aux_sec_hdr_bytes (the raw header bytes) and
        message_info.aux_sec_hdr (the parsed header).

        Args:
            data: Stream-like object providing read(n).
            message_info (MessageInfo)

        Returns:
            AuxiliarySecurityHeader: The parsed header.

        Raises:
            IndexError: If no Security Control byte could be read.
            struct.error: If fewer than four frame counter bytes were read.

        """
        security_control_bytes = bytearray(data.read(self._SECURITY_CONTROL_LENGTH))
        frame_counter_bytes = bytearray(data.read(self._FRAME_COUNTER_LENGTH))

        security_level, key_id_mode = self._parse_security_control(security_control_bytes[0])
        frame_counter = self._parse_frame_counter(frame_counter_bytes)

        # The Key ID field length depends on the Key ID Mode just decoded.
        key_id_bytes = bytearray(data.read(self._key_id_length(key_id_mode)))

        aux_sec_hdr = AuxiliarySecurityHeader(key_id_mode, security_level, frame_counter, key_id_bytes)

        message_info.aux_sec_hdr_bytes = (security_control_bytes + frame_counter_bytes + key_id_bytes)
        message_info.aux_sec_hdr = aux_sec_hdr

        return aux_sec_hdr