1#!/usr/bin/env python3 2# 3# Copyright (c) 2017-2018, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import binascii 31 32import ipaddress 33import ipv6 34import network_data 35import network_layer 36import common 37import config 38import mesh_cop 39import mle 40 41from enum import IntEnum 42 43 44class CheckType(IntEnum): 45 CONTAIN = 0 46 NOT_CONTAIN = 1 47 OPTIONAL = 2 48 49 50class NetworkDataCheckType: 51 PREFIX_CNT = 1 52 PREFIX_CONTENT = 2 53 54 55def check_address_query(command_msg, source_node, destination_address): 56 """Verify source_node sent a properly formatted Address Query Request message to the destination_address. 57 """ 58 command_msg.assertCoapMessageContainsTlv(network_layer.TargetEid) 59 60 source_rloc = source_node.get_ip6_address(config.ADDRESS_TYPE.RLOC) 61 assert (ipv6.ip_address(source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address), ( 62 "Error: The IPv6 source address is not the RLOC of the originator. The source node's rloc is: " + 63 str(ipv6.ip_address(source_rloc)) + ", but the source_address in command msg is: " + 64 str(command_msg.ipv6_packet.ipv6_header.source_address)) 65 66 if isinstance(destination_address, bytearray): 67 destination_address = bytes(destination_address) 68 69 assert (ipv6.ip_address(destination_address) == command_msg.ipv6_packet.ipv6_header.destination_address 70 ), "Error: The IPv6 destination address is not expected." 71 72 73def check_address_notification(command_msg, source_node, destination_node): 74 """Verify source_node sent a properly formatted Address Notification command message to destination_node. 75 """ 76 command_msg.assertCoapMessageRequestUriPath('/a/an') 77 command_msg.assertCoapMessageContainsTlv(network_layer.TargetEid) 78 command_msg.assertCoapMessageContainsTlv(network_layer.Rloc16) 79 command_msg.assertCoapMessageContainsTlv(network_layer.MlEid) 80 81 source_rloc = source_node.get_ip6_address(config.ADDRESS_TYPE.RLOC) 82 assert (ipv6.ip_address(source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address 83 ), "Error: The IPv6 source address is not the RLOC of the originator." 84 85 destination_rloc = destination_node.get_ip6_address(config.ADDRESS_TYPE.RLOC) 86 assert (ipv6.ip_address(destination_rloc) == command_msg.ipv6_packet.ipv6_header.destination_address 87 ), "Error: The IPv6 destination address is not the RLOC of the destination." 88 89 90def check_address_error_notification(command_msg, source_node, destination_address): 91 """Verify source_node sent a properly formatted Address Error Notification command message to destination_address. 92 """ 93 command_msg.assertCoapMessageRequestUriPath('/a/ae') 94 command_msg.assertCoapMessageContainsTlv(network_layer.TargetEid) 95 command_msg.assertCoapMessageContainsTlv(network_layer.MlEid) 96 97 source_rloc = source_node.get_ip6_address(config.ADDRESS_TYPE.RLOC) 98 assert (ipv6.ip_address(source_rloc) == command_msg.ipv6_packet.ipv6_header.source_address), ( 99 "Error: The IPv6 source address is not the RLOC of the originator. The source node's rloc is: " + 100 str(ipv6.ip_address(source_rloc)) + ", but the source_address in command msg is: " + 101 str(command_msg.ipv6_packet.ipv6_header.source_address)) 102 103 if isinstance(destination_address, bytearray): 104 destination_address = bytes(destination_address) 105 106 assert (ipv6.ip_address(destination_address) == command_msg.ipv6_packet.ipv6_header.destination_address), ( 107 "Error: The IPv6 destination address is not expected. The destination node's rloc is: " + 108 str(ipv6.ip_address(destination_address)) + ", but the destination_address in command msg is: " + 109 str(command_msg.ipv6_packet.ipv6_header.destination_address)) 110 111 112def check_address_solicit(command_msg, was_router): 113 command_msg.assertCoapMessageRequestUriPath('/a/as') 114 command_msg.assertCoapMessageContainsTlv(network_layer.MacExtendedAddress) 115 command_msg.assertCoapMessageContainsTlv(network_layer.Status) 116 if was_router: 117 command_msg.assertCoapMessageContainsTlv(network_layer.Rloc16) 118 else: 119 command_msg.assertMleMessageDoesNotContainTlv(network_layer.Rloc16) 120 121 122def check_address_release(command_msg, destination_node): 123 """Verify the message is a properly formatted address release destined to the given node. 124 """ 125 command_msg.assertCoapMessageRequestUriPath('/a/ar') 126 command_msg.assertCoapMessageContainsTlv(network_layer.Rloc16) 127 command_msg.assertCoapMessageContainsTlv(network_layer.MacExtendedAddress) 128 129 destination_rloc = destination_node.get_ip6_address(config.ADDRESS_TYPE.RLOC) 130 assert (ipv6.ip_address(destination_rloc) == command_msg.ipv6_packet.ipv6_header.destination_address 131 ), "Error: The destination is not RLOC address" 132 133 134def check_tlv_request_tlv(command_msg, check_type, tlv_id): 135 """Verify if TLV Request TLV contains specified TLV ID 136 """ 137 tlv_request_tlv = command_msg.get_mle_message_tlv(mle.TlvRequest) 138 139 if check_type == CheckType.CONTAIN: 140 assert (tlv_request_tlv is not None), "Error: The msg doesn't contain TLV Request TLV" 141 assert any( 142 tlv_id == tlv 143 for tlv in tlv_request_tlv.tlvs), "Error: The msg doesn't contain TLV Request TLV ID: {}".format(tlv_id) 144 145 elif check_type == CheckType.NOT_CONTAIN: 146 if tlv_request_tlv is not None: 147 assert (any(tlv_id == tlv for tlv in tlv_request_tlv.tlvs) is 148 False), "Error: The msg contains TLV Request TLV ID: {}".format(tlv_id) 149 150 elif check_type == CheckType.OPTIONAL: 151 if tlv_request_tlv is not None: 152 if any(tlv_id == tlv for tlv in tlv_request_tlv.tlvs): 153 print("TLV Request TLV contains TLV ID: {}".format(tlv_id)) 154 else: 155 print("TLV Request TLV doesn't contain TLV ID: {}".format(tlv_id)) 156 else: 157 print("The msg doesn't contain TLV Request TLV") 158 159 else: 160 raise ValueError("Invalid check type") 161 162 163def check_link_request( 164 command_msg, 165 source_address=CheckType.OPTIONAL, 166 leader_data=CheckType.OPTIONAL, 167 tlv_request_address16=CheckType.OPTIONAL, 168 tlv_request_route64=CheckType.OPTIONAL, 169 tlv_request_link_margin=CheckType.OPTIONAL, 170): 171 """Verify a properly formatted Link Request command message. 172 """ 173 command_msg.assertMleMessageContainsTlv(mle.Challenge) 174 command_msg.assertMleMessageContainsTlv(mle.Version) 175 176 check_mle_optional_tlv(command_msg, source_address, mle.SourceAddress) 177 check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData) 178 179 check_tlv_request_tlv(command_msg, tlv_request_address16, mle.TlvType.ADDRESS16) 180 check_tlv_request_tlv(command_msg, tlv_request_route64, mle.TlvType.ROUTE64) 181 check_tlv_request_tlv(command_msg, tlv_request_link_margin, mle.TlvType.LINK_MARGIN) 182 183 184def check_link_accept( 185 command_msg, 186 destination_node, 187 leader_data=CheckType.OPTIONAL, 188 link_margin=CheckType.OPTIONAL, 189 mle_frame_counter=CheckType.OPTIONAL, 190 challenge=CheckType.OPTIONAL, 191 address16=CheckType.OPTIONAL, 192 route64=CheckType.OPTIONAL, 193 tlv_request_link_margin=CheckType.OPTIONAL, 194): 195 """verify a properly formatted link accept command message. 196 """ 197 command_msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter) 198 command_msg.assertMleMessageContainsTlv(mle.SourceAddress) 199 command_msg.assertMleMessageContainsTlv(mle.Response) 200 command_msg.assertMleMessageContainsTlv(mle.Version) 201 202 check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData) 203 check_mle_optional_tlv(command_msg, link_margin, mle.LinkMargin) 204 check_mle_optional_tlv(command_msg, mle_frame_counter, mle.MleFrameCounter) 205 check_mle_optional_tlv(command_msg, challenge, mle.Challenge) 206 check_mle_optional_tlv(command_msg, address16, mle.Address16) 207 check_mle_optional_tlv(command_msg, route64, mle.Route64) 208 209 check_tlv_request_tlv(command_msg, tlv_request_link_margin, mle.TlvType.LINK_MARGIN) 210 211 destination_link_local = destination_node.get_ip6_address(config.ADDRESS_TYPE.LINK_LOCAL) 212 assert (ipv6.ip_address(destination_link_local) == command_msg.ipv6_packet.ipv6_header.destination_address 213 ), "Error: The destination is unexpected" 214 215 216def check_icmp_path(sniffer, path, nodes, icmp_type=ipv6.ICMP_ECHO_REQUEST): 217 """Verify icmp message is forwarded along the path. 218 """ 219 len_path = len(path) 220 221 # Verify icmp message is forwarded to the next node of the path. 222 for i in range(0, len_path): 223 node_msg = sniffer.get_messages_sent_by(path[i]) 224 node_icmp_msg = node_msg.get_icmp_message(icmp_type) 225 226 if i < len_path - 1: 227 next_node = nodes[path[i + 1]] 228 next_node_rloc16 = next_node.get_addr16() 229 assert (next_node_rloc16 == node_icmp_msg.mac_header.dest_address.rloc), "Error: The path is unexpected." 230 else: 231 return True 232 233 return False 234 235 236def check_id_set(command_msg, router_id): 237 """Check the command_msg's Route64 tlv to verify router_id is an active router. 238 """ 239 tlv = command_msg.assertMleMessageContainsTlv(mle.Route64) 240 return (tlv.router_id_mask >> (63 - router_id)) & 1 241 242 243def get_routing_cost(command_msg, router_id): 244 """Check the command_msg's Route64 tlv to get the routing cost to router. 245 """ 246 tlv = command_msg.assertMleMessageContainsTlv(mle.Route64) 247 248 # Get router's mask pos 249 # Turn the number into binary string. Need to consider the preceding 0 250 # omitted during conversion. 251 router_id_mask_str = bin(tlv.router_id_mask).replace('0b', '') 252 prefix_len = 64 - len(router_id_mask_str) 253 routing_entry_pos = 0 254 255 for i in range(0, router_id - prefix_len): 256 if router_id_mask_str[i] == '1': 257 routing_entry_pos += 1 258 259 assert router_id_mask_str[router_id - prefix_len] == '1', \ 260 (("Error: The router isn't in the topology. \n", 261 "route64 tlv is: %s. \nrouter_id is: %s. \nrouting_entry_pos is: %s. \nrouter_id_mask_str is: %s.") % 262 (tlv, router_id, routing_entry_pos, router_id_mask_str)) 263 264 return tlv.link_quality_and_route_data[routing_entry_pos].route 265 266 267def check_mle_optional_tlv(command_msg, type, tlv): 268 if type == CheckType.CONTAIN: 269 command_msg.assertMleMessageContainsTlv(tlv) 270 elif type == CheckType.NOT_CONTAIN: 271 command_msg.assertMleMessageDoesNotContainTlv(tlv) 272 elif type == CheckType.OPTIONAL: 273 command_msg.assertMleMessageContainsOptionalTlv(tlv) 274 else: 275 raise ValueError("Invalid check type") 276 277 278def check_mle_advertisement(command_msg): 279 command_msg.assertSentWithHopLimit(255) 280 command_msg.assertSentToDestinationAddress(config.LINK_LOCAL_ALL_NODES_ADDRESS) 281 command_msg.assertMleMessageContainsTlv(mle.SourceAddress) 282 command_msg.assertMleMessageContainsTlv(mle.LeaderData) 283 command_msg.assertMleMessageContainsTlv(mle.Route64) 284 285 286def check_parent_request(command_msg, is_first_request): 287 """Verify a properly formatted Parent Request command message. 288 """ 289 if command_msg.mle.aux_sec_hdr.key_id_mode != 0x2: 290 raise ValueError("The Key Identifier Mode of the Security Control Field SHALL be set to 0x02") 291 292 command_msg.assertSentWithHopLimit(255) 293 command_msg.assertSentToDestinationAddress(config.LINK_LOCAL_ALL_ROUTERS_ADDRESS) 294 command_msg.assertMleMessageContainsTlv(mle.Mode) 295 command_msg.assertMleMessageContainsTlv(mle.Challenge) 296 command_msg.assertMleMessageContainsTlv(mle.Version) 297 scan_mask = command_msg.assertMleMessageContainsTlv(mle.ScanMask) 298 if not scan_mask.router: 299 raise ValueError("Parent request without R bit set") 300 if is_first_request: 301 if scan_mask.end_device: 302 raise ValueError("First parent request with E bit set") 303 elif not scan_mask.end_device: 304 raise ValueError("Second parent request without E bit set") 305 306 307def check_parent_response(command_msg, mle_frame_counter=CheckType.OPTIONAL): 308 """Verify a properly formatted Parent Response command message. 309 """ 310 command_msg.assertMleMessageContainsTlv(mle.Challenge) 311 command_msg.assertMleMessageContainsTlv(mle.Connectivity) 312 command_msg.assertMleMessageContainsTlv(mle.LeaderData) 313 command_msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter) 314 command_msg.assertMleMessageContainsTlv(mle.LinkMargin) 315 command_msg.assertMleMessageContainsTlv(mle.Response) 316 command_msg.assertMleMessageContainsTlv(mle.SourceAddress) 317 command_msg.assertMleMessageContainsTlv(mle.Version) 318 319 check_mle_optional_tlv(command_msg, mle_frame_counter, mle.MleFrameCounter) 320 321 322def check_child_id_request( 323 command_msg, 324 tlv_request=CheckType.OPTIONAL, 325 mle_frame_counter=CheckType.OPTIONAL, 326 address_registration=CheckType.OPTIONAL, 327 active_timestamp=CheckType.OPTIONAL, 328 pending_timestamp=CheckType.OPTIONAL, 329 route64=CheckType.OPTIONAL, 330): 331 """Verify a properly formatted Child Id Request command message. 332 """ 333 if command_msg.mle.aux_sec_hdr.key_id_mode != 0x2: 334 raise ValueError("The Key Identifier Mode of the Security Control Field SHALL be set to 0x02") 335 336 command_msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter) 337 command_msg.assertMleMessageContainsTlv(mle.Mode) 338 command_msg.assertMleMessageContainsTlv(mle.Response) 339 command_msg.assertMleMessageContainsTlv(mle.Timeout) 340 command_msg.assertMleMessageContainsTlv(mle.Version) 341 342 check_mle_optional_tlv(command_msg, tlv_request, mle.TlvRequest) 343 check_mle_optional_tlv(command_msg, mle_frame_counter, mle.MleFrameCounter) 344 check_mle_optional_tlv(command_msg, address_registration, mle.AddressRegistration) 345 check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp) 346 check_mle_optional_tlv(command_msg, pending_timestamp, mle.PendingTimestamp) 347 check_mle_optional_tlv(command_msg, route64, mle.Route64) 348 349 check_tlv_request_tlv(command_msg, CheckType.CONTAIN, mle.TlvType.ADDRESS16) 350 check_tlv_request_tlv(command_msg, CheckType.CONTAIN, mle.TlvType.NETWORK_DATA) 351 352 353def check_child_id_response( 354 command_msg, 355 route64=CheckType.OPTIONAL, 356 network_data=CheckType.OPTIONAL, 357 address_registration=CheckType.OPTIONAL, 358 active_timestamp=CheckType.OPTIONAL, 359 pending_timestamp=CheckType.OPTIONAL, 360 active_operational_dataset=CheckType.OPTIONAL, 361 pending_operational_dataset=CheckType.OPTIONAL, 362 network_data_check=None, 363): 364 """Verify a properly formatted Child Id Response command message. 365 """ 366 command_msg.assertMleMessageContainsTlv(mle.SourceAddress) 367 command_msg.assertMleMessageContainsTlv(mle.LeaderData) 368 command_msg.assertMleMessageContainsTlv(mle.Address16) 369 370 check_mle_optional_tlv(command_msg, route64, mle.Route64) 371 check_mle_optional_tlv(command_msg, network_data, mle.NetworkData) 372 check_mle_optional_tlv(command_msg, address_registration, mle.AddressRegistration) 373 check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp) 374 check_mle_optional_tlv(command_msg, pending_timestamp, mle.PendingTimestamp) 375 check_mle_optional_tlv(command_msg, active_operational_dataset, mle.ActiveOperationalDataset) 376 check_mle_optional_tlv(command_msg, pending_operational_dataset, mle.PendingOperationalDataset) 377 378 if network_data_check is not None: 379 network_data_tlv = command_msg.assertMleMessageContainsTlv(mle.NetworkData) 380 network_data_check.check(network_data_tlv) 381 382 383def check_prefix(prefix): 384 """Verify if a prefix contains 6loWPAN sub-TLV and border router sub-TLV 385 """ 386 assert contains_tlv(prefix.sub_tlvs, network_data.BorderRouter), 'Prefix doesn\'t contain a border router sub-TLV!' 387 assert contains_tlv(prefix.sub_tlvs, network_data.LowpanId), 'Prefix doesn\'t contain a LowpanId sub-TLV!' 388 389 390def check_child_update_request_from_child( 391 command_msg, 392 source_address=CheckType.OPTIONAL, 393 leader_data=CheckType.OPTIONAL, 394 challenge=CheckType.OPTIONAL, 395 time_out=CheckType.OPTIONAL, 396 address_registration=CheckType.OPTIONAL, 397 tlv_request_tlv=CheckType.OPTIONAL, 398 active_timestamp=CheckType.OPTIONAL, 399 CIDs=(), 400): 401 402 command_msg.assertMleMessageContainsTlv(mle.Mode) 403 check_mle_optional_tlv(command_msg, source_address, mle.SourceAddress) 404 check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData) 405 check_mle_optional_tlv(command_msg, challenge, mle.Challenge) 406 check_mle_optional_tlv(command_msg, time_out, mle.Timeout) 407 check_mle_optional_tlv(command_msg, address_registration, mle.AddressRegistration) 408 check_mle_optional_tlv(command_msg, tlv_request_tlv, mle.TlvRequest) 409 check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp) 410 411 if (address_registration == CheckType.CONTAIN) and len(CIDs) > 0: 412 _check_address_registration(command_msg, CIDs) 413 414 415def check_coap_optional_tlv(coap_msg, type, tlv): 416 if type == CheckType.CONTAIN: 417 coap_msg.assertCoapMessageContainsTlv(tlv) 418 elif type == CheckType.NOT_CONTAIN: 419 coap_msg.assertCoapMessageDoesNotContainTlv(tlv) 420 elif type == CheckType.OPTIONAL: 421 coap_msg.assertCoapMessageContainsOptionalTlv(tlv) 422 else: 423 raise ValueError("Invalid check type") 424 425 426def check_router_id_cached(node, router_id, cached=True): 427 """Verify if the node has cached any entries based on the router ID 428 """ 429 eidcaches = node.get_eidcaches() 430 if cached: 431 assert any(router_id == (int(rloc, 16) >> 10) for (_, rloc) in eidcaches) 432 else: 433 assert (any(router_id == (int(rloc, 16) >> 10) for (_, rloc) in eidcaches) is False) 434 435 436def contains_tlv(sub_tlvs, tlv_type): 437 """Verify if a specific type of tlv is included in a sub-tlv list. 438 """ 439 return any(isinstance(sub_tlv, tlv_type) for sub_tlv in sub_tlvs) 440 441 442def contains_tlvs(sub_tlvs, tlv_types): 443 """Verify if all types of tlv in a list are included in a sub-tlv list. 444 """ 445 return all((any(isinstance(sub_tlv, tlv_type) for sub_tlv in sub_tlvs)) for tlv_type in tlv_types) 446 447 448def check_secure_mle_key_id_mode(command_msg, key_id_mode): 449 """Verify if the mle command message sets the right key id mode. 450 """ 451 assert isinstance(command_msg.mle, mle.MleMessageSecured) 452 assert command_msg.mle.aux_sec_hdr.key_id_mode == key_id_mode 453 454 455def check_data_response(command_msg, network_data_check=None, active_timestamp=CheckType.OPTIONAL): 456 """Verify a properly formatted Data Response command message. 457 """ 458 check_secure_mle_key_id_mode(command_msg, 0x02) 459 command_msg.assertMleMessageContainsTlv(mle.SourceAddress) 460 command_msg.assertMleMessageContainsTlv(mle.LeaderData) 461 check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp) 462 if network_data_check is not None: 463 network_data_tlv = command_msg.assertMleMessageContainsTlv(mle.NetworkData) 464 network_data_check.check(network_data_tlv) 465 466 467def check_child_update_request_from_parent( 468 command_msg, 469 leader_data=CheckType.OPTIONAL, 470 network_data=CheckType.OPTIONAL, 471 challenge=CheckType.OPTIONAL, 472 tlv_request=CheckType.OPTIONAL, 473 active_timestamp=CheckType.OPTIONAL, 474): 475 """Verify a properly formatted Child Update Request(from parent) command message. 476 """ 477 check_secure_mle_key_id_mode(command_msg, 0x02) 478 479 command_msg.assertMleMessageContainsTlv(mle.SourceAddress) 480 check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData) 481 check_mle_optional_tlv(command_msg, network_data, mle.NetworkData) 482 check_mle_optional_tlv(command_msg, challenge, mle.Challenge) 483 check_mle_optional_tlv(command_msg, tlv_request, mle.TlvRequest) 484 check_mle_optional_tlv(command_msg, active_timestamp, mle.ActiveTimestamp) 485 486 487def check_child_update_response( 488 command_msg, 489 timeout=CheckType.OPTIONAL, 490 address_registration=CheckType.OPTIONAL, 491 address16=CheckType.OPTIONAL, 492 leader_data=CheckType.OPTIONAL, 493 network_data=CheckType.OPTIONAL, 494 response=CheckType.OPTIONAL, 495 link_layer_frame_counter=CheckType.OPTIONAL, 496 mle_frame_counter=CheckType.OPTIONAL, 497 CIDs=(), 498): 499 """Verify a properly formatted Child Update Response from parent 500 """ 501 check_secure_mle_key_id_mode(command_msg, 0x02) 502 503 command_msg.assertMleMessageContainsTlv(mle.SourceAddress) 504 command_msg.assertMleMessageContainsTlv(mle.Mode) 505 check_mle_optional_tlv(command_msg, timeout, mle.Timeout) 506 check_mle_optional_tlv(command_msg, address_registration, mle.AddressRegistration) 507 check_mle_optional_tlv(command_msg, address16, mle.Address16) 508 check_mle_optional_tlv(command_msg, leader_data, mle.LeaderData) 509 check_mle_optional_tlv(command_msg, network_data, mle.NetworkData) 510 check_mle_optional_tlv(command_msg, response, mle.Response) 511 check_mle_optional_tlv(command_msg, link_layer_frame_counter, mle.LinkLayerFrameCounter) 512 check_mle_optional_tlv(command_msg, mle_frame_counter, mle.MleFrameCounter) 513 514 if (address_registration == CheckType.CONTAIN) and len(CIDs) > 0: 515 _check_address_registration(command_msg, CIDs) 516 517 518def _check_address_registration(command_msg, CIDs=()): 519 addresses = command_msg.assertMleMessageContainsTlv(mle.AddressRegistration).addresses 520 for cid in CIDs: 521 found = False 522 for address in addresses: 523 if isinstance(address, mle.AddressCompressed): 524 if cid == address.cid: 525 found = True 526 break 527 assert found, "AddressRegistration TLV doesn't have CID {} ".format(cid) 528 529 530def get_sub_tlv(tlvs, tlv_type): 531 for sub_tlv in tlvs: 532 if isinstance(sub_tlv, tlv_type): 533 return sub_tlv 534 535 536def check_address_registration_tlv( 537 command_msg, 538 full_address, 539): 540 """Check whether or not a full IPv6 address in AddressRegistrationTlv. 541 """ 542 found = False 543 addr = ipaddress.ip_address(full_address) 544 addresses = command_msg.assertMleMessageContainsTlv(mle.AddressRegistration).addresses 545 546 for item in addresses: 547 if isinstance(item, mle.AddressFull) and ipaddress.ip_address(item.ipv6_address) == addr: 548 found = True 549 break 550 551 return found 552 553 554def check_compressed_address_registration_tlv(command_msg, cid, iid, cid_present_once=False): 555 '''Check whether or not a compressed IPv6 address in AddressRegistrationTlv. 556 note: only compare the iid part of the address. 557 558 Args: 559 command_msg (MleMessage) : The Mle message to check. 560 cid (int): The context id of the domain prefix. 561 iid (string): The Interface Identifier. 562 cid_present_once(boolean): True if cid entry should apprear only once in AR Tlv. 563 False otherwise. 564 ''' 565 found = False 566 cid_cnt = 0 567 568 addresses = command_msg.assertMleMessageContainsTlv(mle.AddressRegistration).addresses 569 570 for item in addresses: 571 if isinstance(item, mle.AddressCompressed): 572 if cid == item.cid: 573 cid_cnt = cid_cnt + 1 574 if iid == item.iid.hex(): 575 found = True 576 break 577 assert found, 'Error: Expected (cid, iid):({},{}) Not Found'.format(cid, iid) 578 579 assert cid_present_once == (cid_cnt == 1), 'Error: Expected cid present {} but present {}'.format( 580 'once' if cid_present_once else '', cid_cnt) 581 582 583def assert_contains_tlv(tlvs, check_type, tlv_type): 584 """Assert a tlv list contains specific tlv and return the first qualified. 585 """ 586 tlvs = [tlv for tlv in tlvs if isinstance(tlv, tlv_type)] 587 if check_type is CheckType.CONTAIN: 588 assert tlvs 589 return tlvs[0] 590 elif check_type is CheckType.NOT_CONTAIN: 591 assert not tlvs 592 return None 593 elif check_type is CheckType.OPTIONAL: 594 return None 595 else: 596 raise ValueError("Invalid check type: {}".format(check_type)) 597 598 599def check_discovery_request(command_msg, thread_version: str = None): 600 """Verify a properly formatted Thread Discovery Request command message. 601 """ 602 assert not isinstance(command_msg.mle, mle.MleMessageSecured) 603 tlvs = command_msg.assertMleMessageContainsTlv(mle.ThreadDiscovery).tlvs 604 request = assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.DiscoveryRequest) 605 assert not thread_version or thread_version in ['1.1', '1.2'] 606 if thread_version == '1.1': 607 assert request.version == config.THREAD_VERSION_1_1 608 elif thread_version == '1.2': 609 assert request.version == config.THREAD_VERSION_1_2 610 611 612def check_discovery_response(command_msg, 613 request_src_addr, 614 steering_data=CheckType.OPTIONAL, 615 thread_version: str = None): 616 """Verify a properly formatted Thread Discovery Response command message. 617 """ 618 assert not isinstance(command_msg.mle, mle.MleMessageSecured) 619 assert (command_msg.mac_header.src_address.type == common.MacAddressType.LONG) 620 assert command_msg.mac_header.dest_address == request_src_addr 621 622 tlvs = command_msg.assertMleMessageContainsTlv(mle.ThreadDiscovery).tlvs 623 response = assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.DiscoveryResponse) 624 assert not thread_version or thread_version in ['1.1', '1.2'] 625 if thread_version == '1.1': 626 assert response.version == config.THREAD_VERSION_1_1 627 elif thread_version == '1.2': 628 assert response.version == config.THREAD_VERSION_1_2 629 assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.ExtendedPanid) 630 assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.NetworkName) 631 assert_contains_tlv(tlvs, steering_data, mesh_cop.SteeringData) 632 assert_contains_tlv(tlvs, steering_data, mesh_cop.JoinerUdpPort) 633 634 check_type = (CheckType.CONTAIN if response.native_flag else CheckType.OPTIONAL) 635 assert_contains_tlv(tlvs, check_type, mesh_cop.CommissionerUdpPort) 636 637 638def get_joiner_udp_port_in_discovery_response(command_msg): 639 """Get the udp port specified in a DISCOVERY RESPONSE message 640 """ 641 tlvs = command_msg.assertMleMessageContainsTlv(mle.ThreadDiscovery).tlvs 642 udp_port_tlv = assert_contains_tlv(tlvs, CheckType.CONTAIN, mesh_cop.JoinerUdpPort) 643 return udp_port_tlv.udp_port 644 645 646def check_joiner_commissioning_messages(commissioning_messages, url=''): 647 """Verify COAP messages sent by joiner while commissioning process. 648 """ 649 print(commissioning_messages) 650 assert len(commissioning_messages) >= 4 651 join_fin_req = commissioning_messages[0] 652 assert join_fin_req.type == mesh_cop.MeshCopMessageType.JOIN_FIN_REQ 653 if url: 654 provisioning_url = assert_contains_tlv(join_fin_req.tlvs, CheckType.CONTAIN, mesh_cop.ProvisioningUrl) 655 assert url == provisioning_url.url 656 else: 657 assert_contains_tlv(join_fin_req.tlvs, CheckType.NOT_CONTAIN, mesh_cop.ProvisioningUrl) 658 659 join_ent_rsp = commissioning_messages[3] 660 assert join_ent_rsp.type == mesh_cop.MeshCopMessageType.JOIN_ENT_RSP 661 662 663def check_commissioner_commissioning_messages(commissioning_messages, state=mesh_cop.MeshCopState.ACCEPT): 664 """Verify COAP messages sent by commissioner while commissioning process. 665 """ 666 assert len(commissioning_messages) >= 2 667 join_fin_rsq = commissioning_messages[1] 668 assert join_fin_rsq.type == mesh_cop.MeshCopMessageType.JOIN_FIN_RSP 669 rsq_state = assert_contains_tlv(join_fin_rsq.tlvs, CheckType.CONTAIN, mesh_cop.State) 670 assert rsq_state.state == state 671 672 673def check_joiner_router_commissioning_messages(commissioning_messages): 674 """Verify COAP messages sent by joiner router while commissioning process. 675 """ 676 if len(commissioning_messages) >= 4: 677 join_ent_ntf = commissioning_messages[2] 678 else: 679 join_ent_ntf = commissioning_messages[0] 680 assert join_ent_ntf.type == mesh_cop.MeshCopMessageType.JOIN_ENT_NTF 681 return None 682 683 684def check_payload_same(tp1, tp2): 685 """Verfiy two payloads are totally the same. 686 A payload is a tuple of tlvs. 687 """ 688 assert len(tp1) == len(tp2) 689 for tlv in tp2: 690 peer_tlv = get_sub_tlv(tp1, type(tlv)) 691 assert (peer_tlv is not None and 692 peer_tlv == tlv), 'peer_tlv:{}, tlv:{} type:{}'.format(peer_tlv, tlv, type(tlv)) 693 694 695def check_coap_message(msg, payloads, dest_addrs=None): 696 if dest_addrs is not None: 697 found = False 698 for dest in dest_addrs: 699 if msg.ipv6_packet.ipv6_header.destination_address == dest: 700 found = True 701 break 702 assert found, 'Destination address incorrect' 703 check_payload_same(msg.coap.payload, payloads) 704 705 706class SinglePrefixCheck: 707 708 def __init__(self, prefix=None, border_router_16=None): 709 self._prefix = prefix 710 self._border_router_16 = border_router_16 711 712 def check(self, prefix_tlv): 713 border_router_tlv = assert_contains_tlv(prefix_tlv.sub_tlvs, CheckType.CONTAIN, network_data.BorderRouter) 714 assert_contains_tlv(prefix_tlv.sub_tlvs, CheckType.CONTAIN, network_data.LowpanId) 715 result = True 716 if self._prefix is not None: 717 result &= self._prefix == binascii.hexlify(prefix_tlv.prefix) 718 if self._border_router_16 is not None: 719 result &= (self._border_router_16 == border_router_tlv.border_router_16) 720 return result 721 722 723class PrefixesCheck: 724 725 def __init__(self, prefix_cnt=0, prefix_check_list=()): 726 self._prefix_cnt = prefix_cnt 727 self._prefix_check_list = prefix_check_list 728 729 def check(self, prefix_tlvs): 730 # if prefix_cnt is given, then check count only 731 if self._prefix_cnt > 0: 732 assert (len(prefix_tlvs) >= self._prefix_cnt), 'prefix count is less than expected' 733 else: 734 for prefix_check in self._prefix_check_list: 735 found = False 736 for prefix_tlv in prefix_tlvs: 737 if prefix_check.check(prefix_tlv): 738 found = True 739 break 740 assert found, 'Some prefix is absent: {}'.format(prefix_check) 741 742 743class CommissioningDataCheck: 744 745 def __init__(self, stable=None, sub_tlv_type_list=()): 746 self._stable = stable 747 self._sub_tlv_type_list = sub_tlv_type_list 748 749 def check(self, commissioning_data_tlv): 750 if self._stable is not None: 751 assert (self._stable == commissioning_data_tlv.stable), 'Commissioning Data stable flag is not correct' 752 assert contains_tlvs(commissioning_data_tlv.sub_tlvs, 753 self._sub_tlv_type_list), 'Some sub tlvs are missing in Commissioning Data' 754 755 756class NetworkDataCheck: 757 758 def __init__(self, prefixes_check=None, commissioning_data_check=None): 759 self._prefixes_check = prefixes_check 760 self._commissioning_data_check = commissioning_data_check 761 762 def check(self, network_data_tlv): 763 if self._prefixes_check is not None: 764 prefix_tlvs = [tlv for tlv in network_data_tlv.tlvs if isinstance(tlv, network_data.Prefix)] 765 self._prefixes_check.check(prefix_tlvs) 766 if self._commissioning_data_check is not None: 767 commissioning_data_tlv = assert_contains_tlv( 768 network_data_tlv.tlvs, 769 CheckType.CONTAIN, 770 network_data.CommissioningData, 771 ) 772 self._commissioning_data_check.check(commissioning_data_tlv) 773