1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import io 31import ipaddress 32import struct 33 34import coap 35import common 36import dtls 37import ipv6 38import mac802154 39import mle 40 41from enum import IntEnum 42 43 44class DropPacketException(Exception): 45 pass 46 47 48class MessageType(IntEnum): 49 MLE = 0 50 COAP = 1 51 ICMP = 2 52 ACK = 3 53 BEACON = 4 54 DATA = 5 55 COMMAND = 6 56 DTLS = 7 57 58 59class Message(object): 60 61 def __init__(self): 62 self._type = None 63 self._channel = None 64 self._mac_header = None 65 self._ipv6_packet = None 66 self._coap = None 67 self._mle = None 68 self._icmp = None 69 self._dtls = None 70 71 def _extract_udp_datagram(self, udp_datagram): 72 if isinstance(udp_datagram.payload, mle.MleMessage): 73 self._type = MessageType.MLE 74 self._mle = udp_datagram.payload 75 76 elif isinstance(udp_datagram.payload, (coap.CoapMessage, coap.CoapMessageProxy)): 77 self._type = MessageType.COAP 78 self._coap = udp_datagram.payload 79 80 # DTLS message factory returns a list of messages 81 elif isinstance(udp_datagram.payload, list): 82 self._type = MessageType.DTLS 83 self._dtls = udp_datagram.payload 84 85 def _extract_upper_layer_protocol(self, upper_layer_protocol): 86 if isinstance(upper_layer_protocol, ipv6.ICMPv6): 87 self._type = MessageType.ICMP 88 self._icmp = upper_layer_protocol 89 90 elif isinstance(upper_layer_protocol, ipv6.UDPDatagram): 91 self._extract_udp_datagram(upper_layer_protocol) 92 93 def try_extract_dtls_messages(self): 94 """Extract multiple dtls messages that are sent in a single UDP datagram 95 """ 96 if self.type != MessageType.DTLS: 97 return [self.clone()] 98 99 assert isinstance(self.dtls, list) 100 ret = [] 101 for dtls in self.dtls: 102 msg = self.clone() 103 msg._dtls = dtls 104 ret.append(msg) 105 return ret 106 107 def clone(self): 108 msg = Message() 109 msg._type = self.type 110 msg._channel = self.channel 111 msg._mac_header = self.mac_header 112 msg._ipv6_packet = self.ipv6_packet 113 msg._coap = self.coap 114 msg._mle = self.mle 115 msg._icmp = self.icmp 116 msg._dtls = self.dtls 117 return msg 118 119 @property 120 def type(self): 121 return self._type 122 123 @type.setter 124 def type(self, value): 125 self._type = value 126 127 @property 128 def channel(self): 129 return self._channel 130 131 @channel.setter 132 def channel(self, value): 133 self._channel = value 134 135 @property 136 def mac_header(self): 137 return self._mac_header 138 139 @mac_header.setter 140 def mac_header(self, value): 141 self._mac_header = value 142 143 if self._mac_header.frame_type == mac802154.MacHeader.FrameType.BEACON: 144 self._type = MessageType.BEACON 145 146 elif self._mac_header.frame_type == mac802154.MacHeader.FrameType.ACK: 147 self._type = MessageType.ACK 148 149 elif self._mac_header.frame_type == mac802154.MacHeader.FrameType.DATA: 150 self._type = MessageType.DATA 151 elif (self._mac_header.frame_type == mac802154.MacHeader.FrameType.COMMAND): 152 self._type = MessageType.COMMAND 153 else: 154 raise ValueError('Invalid mac frame type %d' % self._mac_header.frame_type) 155 156 @property 157 def ipv6_packet(self): 158 return self._ipv6_packet 159 160 @ipv6_packet.setter 161 def ipv6_packet(self, value): 162 self._ipv6_packet = value 163 self._extract_upper_layer_protocol(value.upper_layer_protocol) 164 165 @property 166 def coap(self): 167 return self._coap 168 169 @property 170 def mle(self): 171 return self._mle 172 173 @property 174 def icmp(self): 175 return self._icmp 176 177 @icmp.setter 178 def icmp(self, value): 179 self._icmp = value 180 181 @property 182 def dtls(self): 183 return self._dtls 184 185 def get_mle_message_tlv(self, tlv_class_type): 186 if self.type != MessageType.MLE: 187 raise ValueError("Invalid message type. Expected MLE message.") 188 189 for tlv in self.mle.command.tlvs: 190 if isinstance(tlv, tlv_class_type): 191 return tlv 192 193 def assertMleMessageIsType(self, command_type): 194 if self.type != MessageType.MLE: 195 raise ValueError("Invalid message type. Expected MLE message.") 196 197 assert self.mle.command.type == command_type 198 199 def assertMleMessageContainsTlv(self, tlv_class_type): 200 """To confirm if Mle message contains the TLV type. 201 202 Args: 203 tlv_class_type: tlv's type. 204 205 Returns: 206 mle.Route64: If contains the TLV, return it. 207 """ 208 if self.type != MessageType.MLE: 209 raise ValueError("Invalid message type. Expected MLE message.") 210 211 contains_tlv = False 212 for tlv in self.mle.command.tlvs: 213 if isinstance(tlv, tlv_class_type): 214 contains_tlv = True 215 break 216 217 assert contains_tlv 218 return tlv 219 220 def assertAssignedRouterQuantity(self, router_quantity): 221 """Confirm if Leader contains the Route64 TLV with router_quantity assigned Router IDs. 222 223 Args: 224 router_quantity: the quantity of router. 225 """ 226 tlv = self.assertMleMessageContainsTlv(mle.Route64) 227 router_id_mask = tlv.router_id_mask 228 229 count = 0 230 for i in range(1, 65): 231 count += router_id_mask & 1 232 router_id_mask = router_id_mask >> 1 233 assert count == router_quantity 234 235 def assertMleMessageDoesNotContainTlv(self, tlv_class_type): 236 if self.type != MessageType.MLE: 237 raise ValueError("Invalid message type. Expected MLE message.") 238 239 contains_tlv = False 240 for tlv in self.mle.command.tlvs: 241 if isinstance(tlv, tlv_class_type): 242 contains_tlv = True 243 break 244 245 assert contains_tlv is False 246 247 def assertMleMessageContainsOptionalTlv(self, tlv_class_type): 248 if self.type != MessageType.MLE: 249 raise ValueError("Invalid message type. Expected MLE message.") 250 251 contains_tlv = False 252 for tlv in self.mle.command.tlvs: 253 if isinstance(tlv, tlv_class_type): 254 contains_tlv = True 255 break 256 257 if contains_tlv: 258 print("MleMessage contains optional TLV: {}".format(tlv_class_type)) 259 else: 260 print("MleMessage doesn't contain optional TLV: {}".format(tlv_class_type)) 261 262 def get_coap_message_tlv(self, tlv_class_type): 263 if self.type != MessageType.COAP: 264 raise ValueError("Invalid message type. Expected CoAP message.") 265 266 for tlv in self.coap.payload: 267 if isinstance(tlv, tlv_class_type): 268 return tlv 269 270 def assertCoapMessageContainsTlv(self, tlv_class_type): 271 if self.type != MessageType.COAP: 272 raise ValueError("Invalid message type. Expected CoAP message.") 273 274 contains_tlv = False 275 for tlv in self.coap.payload: 276 if isinstance(tlv, tlv_class_type): 277 contains_tlv = True 278 break 279 280 assert contains_tlv 281 282 def assertCoapMessageDoesNotContainTlv(self, tlv_class_type): 283 if self.type != MessageType.COAP: 284 raise ValueError("Invalid message type. Expected COAP message.") 285 286 contains_tlv = False 287 for tlv in self.coap.payload: 288 if isinstance(tlv, tlv_class_type): 289 contains_tlv = True 290 break 291 292 assert contains_tlv is False 293 294 def assertCoapMessageContainsOptionalTlv(self, tlv_class_type): 295 if self.type != MessageType.COAP: 296 raise ValueError("Invalid message type. Expected CoAP message.") 297 298 for tlv in self.coap.payload: 299 if isinstance(tlv, tlv_class_type): 300 break 301 302 print("CoapMessage doesn't contain optional TLV: {}".format(tlv_class_type)) 303 304 def assertCoapMessageRequestUriPath(self, uri_path): 305 if self.type != MessageType.COAP: 306 raise ValueError("Invalid message type. Expected CoAP message.") 307 308 assert uri_path == self.coap.uri_path 309 310 def assertCoapMessageCode(self, code): 311 if self.type != MessageType.COAP: 312 raise ValueError("Invalid message type. Expected CoAP message.") 313 314 assert code == self.coap.code 315 316 def assertSentToNode(self, node): 317 sent_to_node = False 318 dst_addr = self.ipv6_packet.ipv6_header.destination_address 319 320 for addr in node.get_addrs(): 321 if dst_addr == ipaddress.ip_address(addr): 322 sent_to_node = True 323 324 if self.mac_header.dest_address.type == common.MacAddressType.SHORT: 325 mac_address = common.MacAddress.from_rloc16(node.get_addr16()) 326 if self.mac_header.dest_address == mac_address: 327 sent_to_node = True 328 329 elif self.mac_header.dest_address.type == common.MacAddressType.LONG: 330 mac_address = common.MacAddress.from_eui64(bytearray(node.get_addr64(), encoding="utf-8")) 331 if self.mac_header.dest_address == mac_address: 332 sent_to_node = True 333 334 assert sent_to_node 335 336 def assertSentToDestinationAddress(self, ipv6_address): 337 assert (self.ipv6_packet.ipv6_header.destination_address == ipaddress.ip_address(ipv6_address)) 338 339 def assertSentFromSourceAddress(self, ipv6_address): 340 assert (self.ipv6_packet.ipv6_header.source_address == ipaddress.ip_address(ipv6_address)) 341 342 def assertSentWithHopLimit(self, hop_limit): 343 assert self.ipv6_packet.ipv6_header.hop_limit == hop_limit 344 345 def isMacAddressTypeLong(self): 346 return self.mac_header.dest_address.type == common.MacAddressType.LONG 347 348 def get_dst_udp_port(self): 349 assert isinstance(self.ipv6_packet.upper_layer_protocol, ipv6.UDPDatagram) 350 return self.ipv6_packet.upper_layer_protocol.header.dst_port 351 352 def is_data_poll(self): 353 return self._type == MessageType.COMMAND and \ 354 self._mac_header.command_type == mac802154.MacHeader.CommandIdentifier.DATA_REQUEST 355 356 def __repr__(self): 357 if (self.type == MessageType.DTLS and self.dtls.content_type == dtls.ContentType.HANDSHAKE): 358 return "Message(type={})".format(str(self.dtls.handshake_type)) 359 return "Message(type={})".format(MessageType(self.type).name) 360 361 362class MessagesSet(object): 363 364 def __init__(self, messages, commissioning_messages=()): 365 self._messages = messages 366 self._commissioning_messages = commissioning_messages 367 368 @property 369 def messages(self): 370 return self._messages 371 372 @property 373 def commissioning_messages(self): 374 return self._commissioning_messages 375 376 def next_data_poll(self): 377 while True: 378 message = self.next_message_of(MessageType.COMMAND, False) 379 if not message: 380 break 381 elif message.is_data_poll(): 382 return message 383 384 def next_coap_message(self, code, uri_path=None, assert_enabled=True): 385 message = None 386 387 while self.messages: 388 m = self.messages.pop(0) 389 390 if m.type != MessageType.COAP: 391 continue 392 393 if uri_path is not None and m.coap.uri_path != uri_path: 394 continue 395 396 else: 397 if not m.coap.code.is_equal_dotted(code): 398 continue 399 400 message = m 401 break 402 403 if assert_enabled: 404 assert (message is not None), "Could not find CoapMessage with code: {}".format(code) 405 406 return message 407 408 def last_mle_message(self, command_type, assert_enabled=True): 409 """Get the last Mle Message with specified type from existing capture. 410 411 Args: 412 command_type: the specified mle type. 413 assert_enabled: interrupt or not when get the mle. 414 415 Returns: 416 message.Message: the last Mle Message with specified type. 417 """ 418 message = None 419 size = len(self.messages) 420 421 for i in range(size - 1, -1, -1): 422 m = self.messages[i] 423 424 if m.type != MessageType.MLE: 425 continue 426 427 # for command_type in command_types: 428 if m.mle.command.type == command_type: 429 message = m 430 break 431 432 if assert_enabled: 433 assert (message is not None), "Could not find MleMessage with type: {}".format(command_type) 434 435 return message 436 437 def next_mle_message(self, command_type, assert_enabled=True, sent_to_node=None): 438 message = self.next_mle_message_of_one_of_command_types(command_type) 439 440 if assert_enabled: 441 assert (message is not None), "Could not find MleMessage of the type: {}".format(command_type) 442 443 if sent_to_node is not None: 444 message.assertSentToNode(sent_to_node) 445 446 return message 447 448 def next_mle_message_of_one_of_command_types(self, *command_types): 449 message = None 450 451 while self.messages: 452 m = self.messages.pop(0) 453 454 if m.type != MessageType.MLE: 455 continue 456 457 command_found = False 458 459 for command_type in command_types: 460 if m.mle.command.type == command_type: 461 command_found = True 462 break 463 464 if command_found: 465 message = m 466 break 467 468 return message 469 470 def next_message(self, assert_enabled=True): 471 message = self.messages.pop(0) 472 if assert_enabled: 473 assert message is not None, "Could not find next Message" 474 return message 475 476 def next_message_of(self, message_type, assert_enabled=True): 477 message = None 478 479 while self.messages: 480 m = self.messages.pop(0) 481 if m.type != message_type: 482 continue 483 484 message = m 485 break 486 487 if assert_enabled: 488 assert (message is not None), "Could not find Message of the type: {}".format(message_type) 489 490 return message 491 492 def next_data_message(self): 493 return self.next_message_of(MessageType.DATA) 494 495 def next_command_message(self): 496 return self.next_message_of(MessageType.COMMAND) 497 498 def next_dtls_message(self, content_type, handshake_type=None): 499 while self.messages: 500 msg = self.messages.pop(0) 501 if msg.type != MessageType.DTLS: 502 continue 503 if msg.dtls.content_type != content_type: 504 continue 505 if (content_type == dtls.ContentType.HANDSHAKE and msg.dtls.handshake_type != handshake_type): 506 continue 507 return msg 508 509 t = (handshake_type if content_type == dtls.ContentType.HANDSHAKE else content_type) 510 raise ValueError("Could not find DTLS message of type: {}".format(str(t))) 511 512 def contains_icmp_message(self): 513 for m in self.messages: 514 if m.type == MessageType.ICMP: 515 return True 516 517 return False 518 519 def get_icmp_message(self, icmp_type): 520 for m in self.messages: 521 if m.type != MessageType.ICMP: 522 continue 523 524 if m.icmp.header.type == icmp_type: 525 return m 526 527 return None 528 529 def contains_mle_message(self, command_type): 530 for m in self.messages: 531 if m.type != MessageType.MLE: 532 continue 533 534 if m.mle.command.type == command_type: 535 return True 536 537 return False 538 539 def does_not_contain_coap_message(self): 540 for m in self.messages: 541 if m.type != MessageType.COAP: 542 continue 543 544 return False 545 546 return True 547 548 def clone(self): 549 """Make a copy of current MessageSet. 550 """ 551 return MessagesSet(self.messages[:], self.commissioning_messages[:]) 552 553 def __repr__(self): 554 return str(self.messages) 555 556 557class MessageFactory: 558 559 def __init__(self, lowpan_parser): 560 self._lowpan_parser = lowpan_parser 561 562 def _add_device_descriptors(self, message): 563 for tlv in message.mle.command.tlvs: 564 565 if isinstance(tlv, mle.SourceAddress): 566 mac802154.DeviceDescriptors.add(tlv.address, message.mac_header.src_address) 567 568 if isinstance(tlv, mle.Address16): 569 mac802154.DeviceDescriptors.add(tlv.address, message.mac_header.dest_address) 570 571 def _parse_mac_frame(self, data): 572 mac_frame = mac802154.MacFrame() 573 mac_frame.parse(data) 574 return mac_frame 575 576 def set_lowpan_context(self, cid, prefix): 577 self._lowpan_parser.set_lowpan_context(cid, prefix) 578 579 def create(self, data): 580 try: 581 message = Message() 582 message.channel = struct.unpack(">B", data.read(1)) 583 584 # Parse MAC header 585 mac_frame = self._parse_mac_frame(data) 586 message.mac_header = mac_frame.header 587 588 if message.mac_header.frame_type != mac802154.MacHeader.FrameType.DATA: 589 return [message] 590 591 message_info = common.MessageInfo() 592 message_info.source_mac_address = message.mac_header.src_address 593 message_info.destination_mac_address = message.mac_header.dest_address 594 595 # Create stream with 6LoWPAN datagram 596 lowpan_payload = io.BytesIO(mac_frame.payload.data) 597 598 ipv6_packet = self._lowpan_parser.parse(lowpan_payload, message_info) 599 if ipv6_packet is None: 600 return [message] 601 602 message.ipv6_packet = ipv6_packet 603 604 if message.type == MessageType.MLE: 605 self._add_device_descriptors(message) 606 607 return message.try_extract_dtls_messages() 608 609 except mac802154.KeyIdMode0Exception: 610 print('Received packet with key_id_mode = 0, cannot be handled in test scripts') 611 raise DropPacketException 612