#!/usr/bin/env python3
#
#  Copyright (c) 2016, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#

import abc
import io
import struct

from binascii import hexlify
from ipaddress import ip_address

import common

try:
    from itertools import izip_longest as zip_longest
except ImportError:
    from itertools import zip_longest

# Next headers for IPv6 protocols
IPV6_NEXT_HEADER_HOP_BY_HOP = 0
IPV6_NEXT_HEADER_TCP = 6
IPV6_NEXT_HEADER_UDP = 17
IPV6_NEXT_HEADER_FRAGMENT = 44
IPV6_NEXT_HEADER_ICMP = 58

UPPER_LAYER_PROTOCOLS = [
    IPV6_NEXT_HEADER_TCP,
    IPV6_NEXT_HEADER_UDP,
    IPV6_NEXT_HEADER_ICMP,
]

# ICMP Protocol codes
ICMP_DESTINATION_UNREACHABLE = 1
ICMP_TIME_EXCEEDED = 3
ICMP_ECHO_REQUEST = 128
ICMP_ECHO_RESPONSE = 129

# Default hop limit for IPv6
HOP_LIMIT_DEFAULT = 64


def calculate_checksum(data):
    """ Calculate checksum from data bytes.

    How to calculate checksum (RFC 2460):
        https://tools.ietf.org/html/rfc2460#page-27

    Args:
        data (bytes): input data from which checksum will be calculated

    Returns:
        int: calculated checksum (16-bit value, never zero)
    """
    # Create halfwords from data bytes. Example: data[0] = 0x01, data[1] =
    # 0xb2 => 0x01b2. An odd trailing byte is paired with a zero byte.
    halfwords = (((byte0 << 8) | byte1) for byte0, byte1 in zip_longest(data[::2], data[1::2], fillvalue=0x00))

    checksum = sum(halfwords)

    # One's complement addition: keep folding the carry back into the low
    # 16 bits until nothing is left above bit 15. A single fold is not enough
    # in general because the fold itself can produce a new carry.
    while checksum >> 16:
        checksum = (checksum & 0xffff) + (checksum >> 16)

    checksum ^= 0xffff

    # Zero is reserved to mean "checksum not set" (see
    # UpperLayerProtocol.is_valid_checksum), so it is transmitted as 0xffff.
    if checksum == 0:
        return 0xffff

    return checksum


class PacketFactory(object):
    """ Interface for classes that produce objects from data. """

    def parse(self, data, message_info):
        """ Convert data to object.

        Args:
            data (BytesIO)
            message_info (MessageInfo)

        """
        raise NotImplementedError


class BuildableFromBytes(object):
    """ Interface for classes which can be built from bytes. """

    @classmethod
    def from_bytes(cls, data):
        """ Convert data to object.

        Args:
            data (bytes)

        """
        raise NotImplementedError


class ConvertibleToBytes(object):
    """ Interface for classes which can be converted to bytes. """

    def to_bytes(self):
        """ Convert object to data.

        Returns:
            bytes
        """
        raise NotImplementedError

    def __len__(self):
        """ Length of data (in bytes).

        Returns:
            int
        """
        raise NotImplementedError


class Header(object):
    """ Interface for header classes. """

    # NOTE: `__metaclass__` is only honoured by Python 2. Under Python 3 this
    # attribute is inert, so the abstract `type` property below is not
    # enforced at instantiation time.
    __metaclass__ = abc.ABCMeta

    @abc.abstractproperty
    def type(self):
        """ Number which can be used in the next header field in IPv6 header or next headers.

        Returns:
            int
        """


class ExtensionHeader(object):
    """ Base for classes representing Extension Headers in IPv6 packets. """

    def __init__(self, next_header, hdr_ext_len=0):
        self.next_header = next_header
        self.hdr_ext_len = hdr_ext_len


class UpperLayerProtocol(Header, ConvertibleToBytes):
    """ Base for classes representing upper layer protocol payload in IPv6 packets. """

    def __init__(self, header):
        self.header = header

    @property
    def checksum(self):
        """ Return checksum from upper layer protocol header. """
        return self.header.checksum

    @checksum.setter
    def checksum(self, value):
        """ Set checksum value in upper layer protocol header. """
        self.header.checksum = value

    def is_valid_checksum(self):
        """ Return information if set checksum is valid.

        It is not possible to get zero from checksum calculation.
        Zero indicates invalid checksum value.

        Returns:
            bool
        """
        return self.checksum != 0


class IPv6PseudoHeader(ConvertibleToBytes):
    """ Class representing IPv6 pseudo header which is required to calculate
    upper layer protocol (like e.g. UDP or ICMPv6) checksum.

    This class is used only during upper layer protocol checksum calculation. Do not use it outside of this module.

    """

    def __init__(self, source_address, destination_address, payload_length, next_header):
        self._source_address = self._convert_to_ipaddress(source_address)
        self._destination_address = self._convert_to_ipaddress(destination_address)
        self.payload_length = payload_length
        self.next_header = next_header

    def _convert_to_ipaddress(self, value):
        """ Build an address object from `value` via `ipaddress.ip_address`. """
        # ip_address() is given bytes rather than bytearray, so convert first.
        if isinstance(value, bytearray):
            value = bytes(value)

        return ip_address(value)

    @property
    def source_address(self):
        return self._source_address

    @source_address.setter
    def source_address(self, value):
        self._source_address = self._convert_to_ipaddress(value)

    @property
    def destination_address(self):
        return self._destination_address

    @destination_address.setter
    def destination_address(self, value):
        # Fixed: this setter used to overwrite the *source* address.
        self._destination_address = self._convert_to_ipaddress(value)

    def to_bytes(self):
        """ Serialize the pseudo header.

        Layout: source address, destination address, 32-bit upper layer
        length, then the next header value packed as a 32-bit big-endian
        integer (i.e. three zero bytes followed by the next header byte).

        Returns:
            bytearray
        """
        data = bytearray()
        data += self.source_address.packed
        data += self.destination_address.packed
        data += struct.pack(">I", self.payload_length)
        data += struct.pack(">I", self.next_header)

        return data


class IPv6Header(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing IPv6 packet header. """

    _version = 6

    _header_length = 40

    def __init__(
        self,
        source_address,
        destination_address,
        traffic_class=0,
        flow_label=0,
        hop_limit=HOP_LIMIT_DEFAULT,
        payload_length=0,
        next_header=0,
    ):
        self.version = self._version
        self._source_address = self._convert_to_ipaddress(source_address)
        self._destination_address = self._convert_to_ipaddress(destination_address)
        self.traffic_class = traffic_class
        self.flow_label = flow_label
        self.hop_limit = hop_limit
        self.payload_length = payload_length
        self.next_header = next_header

    def _convert_to_ipaddress(self, value):
        """ Build an address object from `value` via `ipaddress.ip_address`. """
        # ip_address() is given bytes rather than bytearray, so convert first.
        if isinstance(value, bytearray):
            value = bytes(value)

        return ip_address(value)

    @property
    def source_address(self):
        return self._source_address

    @source_address.setter
    def source_address(self, value):
        self._source_address = self._convert_to_ipaddress(value)

    @property
    def destination_address(self):
        return self._destination_address

    @destination_address.setter
    def destination_address(self, value):
        # Added for symmetry with source_address; previously read-only.
        self._destination_address = self._convert_to_ipaddress(value)

    def to_bytes(self):
        """ Serialize the 40-byte IPv6 header.

        Returns:
            bytearray
        """
        # First 32 bits: version (4 bits), traffic class (8 bits) and
        # flow label (20 bits), packed across four bytes.
        data = bytearray([
            ((self.version & 0x0F) << 4) | ((self.traffic_class >> 4) & 0x0F),
            ((self.traffic_class & 0x0F) << 4) | ((self.flow_label >> 16) & 0x0F),
            ((self.flow_label >> 8) & 0xff),
            (self.flow_label & 0xff),
        ])
        data += struct.pack(">H", self.payload_length)
        data += bytearray([self.next_header, self.hop_limit])
        data += self.source_address.packed
        data += self.destination_address.packed

        return data

    @classmethod
    def from_bytes(cls, data):
        """ Parse an IPv6 header from a stream.

        Args:
            data (BytesIO): stream positioned at the first header byte.

        Returns:
            IPv6Header
        """
        b = bytearray(data.read(4))

        # The version nibble (b[0] >> 4) is not validated or stored; the
        # constructor always sets version 6.
        traffic_class = ((b[0] & 0x0F) << 4) | ((b[1] >> 4) & 0x0F)
        flow_label = ((b[1] & 0x0F) << 16) | (b[2] << 8) | b[3]

        payload_length = struct.unpack(">H", data.read(2))[0]
        next_header = ord(data.read(1))
        hop_limit = ord(data.read(1))
        src_addr = bytearray(data.read(16))
        dst_addr = bytearray(data.read(16))

        return cls(
            src_addr,
            dst_addr,
            traffic_class,
            flow_label,
            hop_limit,
            payload_length,
            next_header,
        )

    def __repr__(self):
        # Implicit string concatenation instead of a backslash continuation
        # inside the literal, which leaked source indentation into the output.
        return ("IPv6Header(source_address={}, destination_address={}, next_header={}, payload_length={}, "
                "hop_limit={}, traffic_class={}, flow_label={})").format(
                    self.source_address.compressed,
                    self.destination_address.compressed,
                    self.next_header,
                    self.payload_length,
                    self.hop_limit,
                    self.traffic_class,
                    self.flow_label,
                )

    def __len__(self):
        return self._header_length


class IPv6Packet(ConvertibleToBytes):
    """ Class representing IPv6 packet.

    IPv6 packet consists of IPv6 header, optional extension header, and upper layer protocol.

                                            IPv6 packet

    +-------------+----------------------------------+----------------------------------------------+
    |             |                                  |                                              |
    | IPv6 header | extension headers (zero or more) | upper layer protocol (e.g. UDP, TCP, ICMPv6) |
    |             |                                  |                                              |
    +-------------+----------------------------------+----------------------------------------------+

    Extension headers:
        - HopByHop
        - Routing header (not implemented in this module)
        - Fragment Header

    Upper layer protocols:
        - ICMPv6
        - UDP
        - TCP (not implemented in this module)

    Example:
        IPv6 packet construction without extension headers:

        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
                                 ICMPv6(ICMPv6Header(128, 0),
                                        ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                                    0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                                    0x41, 0x41]))))

        IPv6 packet construction with extension headers:

        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
                                 ICMPv6(ICMPv6Header(128, 0),
                                        ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                                    0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                                    0x41, 0x41]))),
                                 [HopByHop(options=[
                                     HopByHopOption(HopByHopOptionHeader(_type=0x6d),
                                                    MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))
                                 ])])

    """

    def __init__(self, ipv6_header, upper_layer_protocol, extension_headers=None):
        self.ipv6_header = ipv6_header

        self.upper_layer_protocol = upper_layer_protocol

        self.extension_headers = (extension_headers if extension_headers is not None else [])

        self._update_next_header_values_in_headers()

        # A zero checksum means "not set yet": compute it now.
        if not upper_layer_protocol.is_valid_checksum():
            self.upper_layer_protocol.checksum = self.calculate_checksum()

    def _validate_checksum(self):
        """ Raise RuntimeError when the stored checksum differs from the calculated one. """
        checksum = self.calculate_checksum()

        if self.upper_layer_protocol.checksum != checksum:
            raise RuntimeError("Could not create IPv6 packet. "
                               "Invalid checksum: {}!={}".format(self.upper_layer_protocol.checksum, checksum))

        self.upper_layer_protocol.checksum = checksum

    def _update_payload_length_value_in_ipv6_header(self):
        """ Set IPv6 payload length to upper layer length plus all extension header lengths. """
        self.ipv6_header.payload_length = len(self.upper_layer_protocol) + sum(
            len(extension_header) for extension_header in self.extension_headers)

    def _update_next_header_values_in_headers(self):
        """ Chain the next_header fields: IPv6 header -> extension headers -> upper layer protocol. """
        last_header = self.ipv6_header

        for extension_header in self.extension_headers:
            last_header.next_header = extension_header.type
            last_header = extension_header

        last_header.next_header = self.upper_layer_protocol.type

    def calculate_checksum(self):
        """ Calculate the upper layer protocol checksum over pseudo header and payload.

        The stored checksum is temporarily zeroed for serialization and
        restored afterwards, so this method does not change it.

        Returns:
            int
        """
        saved_checksum = self.upper_layer_protocol.checksum

        self.upper_layer_protocol.checksum = 0

        upper_layer_protocol_bytes = self.upper_layer_protocol.to_bytes()

        self.upper_layer_protocol.checksum = saved_checksum

        pseudo_header = IPv6PseudoHeader(
            self.ipv6_header.source_address,
            self.ipv6_header.destination_address,
            len(upper_layer_protocol_bytes),
            self.upper_layer_protocol.type,
        )

        return calculate_checksum(pseudo_header.to_bytes() + upper_layer_protocol_bytes)

    def to_bytes(self):
        """ Serialize the packet, refreshing payload length, next header chain and checksum first.

        Returns:
            bytearray
        """
        self._update_payload_length_value_in_ipv6_header()
        self._update_next_header_values_in_headers()
        self.upper_layer_protocol.checksum = self.calculate_checksum()

        ipv6_packet = self.ipv6_header.to_bytes()

        for extension_header in self.extension_headers:
            ipv6_packet += extension_header.to_bytes()

        ipv6_packet += self.upper_layer_protocol.to_bytes()

        return ipv6_packet

    def __repr__(self):
        return "IPv6Packet(header={}, upper_layer_protocol={})".format(self.ipv6_header, self.upper_layer_protocol)


class UDPHeader(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing UDP datagram header.

    This header is required to construct UDP datagram.

    """

    _header_length = 8

    def __init__(self, src_port, dst_port, payload_length=0, checksum=0):
        self.src_port = src_port
        self.dst_port = dst_port

        # Stored as given (this is the raw length field when parsing); the
        # property setter below is the one that adds the header length.
        self._payload_length = payload_length
        self.checksum = checksum

    @property
    def type(self):
        return IPV6_NEXT_HEADER_UDP

    @property
    def payload_length(self):
        return self._payload_length

    @payload_length.setter
    def payload_length(self, value):
        # `value` is the payload size only; the stored length field also
        # covers the 8-byte UDP header.
        self._payload_length = self._header_length + value

    def to_bytes(self):
        """ Serialize the header as four big-endian 16-bit fields.

        Returns:
            bytes
        """
        data = struct.pack(">H", self.src_port)
        data += struct.pack(">H", self.dst_port)
        data += struct.pack(">H", self.payload_length)
        data += struct.pack(">H", self.checksum)

        return data

    @classmethod
    def from_bytes(cls, data):
        """ Parse a UDP header from a stream.

        Args:
            data (BytesIO)

        Returns:
            UDPHeader
        """
        src_port = struct.unpack(">H", data.read(2))[0]
        dst_port = struct.unpack(">H", data.read(2))[0]
        payload_length = struct.unpack(">H", data.read(2))[0]
        checksum = struct.unpack(">H", data.read(2))[0]

        return cls(src_port, dst_port, payload_length, checksum)

    def __len__(self):
        return self._header_length


class UDPDatagram(UpperLayerProtocol):
    """ Class representing UDP datagram.

    UDP is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.

    This class consists of a UDP header and payload. The payload must provide `to_bytes()` and `__len__()`
    (e.g. BytesPayload). The example below shows how a UDP datagram can be constructed.

    Example:
        udp_dgram = UDPDatagram(UDPHeader(src_port=19788, dst_port=19788),
                                BytesPayload(bytes([0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                                    0x00, 0x00, 0x01, 0x09, 0x01, 0x01, 0x0b, 0x03,
                                                    0x04, 0xc6, 0x69, 0x73, 0x51, 0x0e, 0x01, 0x80,
                                                    0x12, 0x02, 0x00, 0x01, 0xde, 0xad, 0xbe, 0xef])))

    """

    @property
    def type(self):
        return IPV6_NEXT_HEADER_UDP

    def __init__(self, header, payload):
        super(UDPDatagram, self).__init__(header)
        self.payload = payload

    def to_bytes(self):
        """ Serialize header and payload, updating the header length field first.

        Returns:
            bytearray
        """
        self.header.payload_length = len(self.payload)

        data = bytearray()
        data += self.header.to_bytes()
        data += self.payload.to_bytes()
        return data

    def __len__(self):
        return len(self.header) + len(self.payload)


class ICMPv6Header(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing ICMPv6 message header.

    This header is required to construct ICMPv6 message.

    """

    _header_length = 4

    def __init__(self, _type, code, checksum=0):
        self.type = _type
        self.code = code

        self.checksum = checksum

    def to_bytes(self):
        """ Serialize as type (1 byte), code (1 byte), checksum (2 bytes, big-endian).

        Returns:
            bytearray
        """
        return bytearray([self.type, self.code]) + struct.pack(">H", self.checksum)

    @classmethod
    def from_bytes(cls, data):
        """ Parse an ICMPv6 header from a stream.

        Args:
            data (BytesIO)

        Returns:
            ICMPv6Header
        """
        _type = ord(data.read(1))
        code = ord(data.read(1))
        checksum = struct.unpack(">H", data.read(2))[0]

        return cls(_type, code, checksum)

    def __len__(self):
        return self._header_length


class ICMPv6(UpperLayerProtocol):
    """ Class representing ICMPv6 message.

    ICMPv6 is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.

    This class consists of an ICMPv6 header and body. The example below shows how an ICMPv6 message can be constructed.

    Example:
        icmpv6_msg = ICMPv6(ICMPv6Header(128, 0),
                            ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
                                                        0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
                                                        0x41, 0x41])))

    """

    @property
    def type(self):
        return IPV6_NEXT_HEADER_ICMP

    def __init__(self, header, body):
        super(ICMPv6, self).__init__(header)
        self.body = body

    def to_bytes(self):
        return bytearray(self.header.to_bytes() + self.body.to_bytes())

    def __len__(self):
        return len(self.header) + len(self.body)


class FragmentHeader(ExtensionHeader):
    """ Class representing Fragment extension header.

    +-------------+----------+-----------------+-----+---+----------------+
    | Next Header | Reserved | Fragment Offset | Res | M | Identification |
    +-------------+----------+-----------------+-----+---+----------------+

    Fragment extension header consists of:
        - next_header type (8 bit)
        - fragment offset which is multiple of 8 (13 bit)
        - more_flag to indicate further data (1 bit)
        - identification for all associated fragments (32 bit)
    """

    # Serialized size in bytes: 1 (next header) + 1 (reserved) +
    # 2 (offset/flags) + 4 (identification), matching to_bytes().
    _header_length = 8

    @property
    def type(self):
        return IPV6_NEXT_HEADER_FRAGMENT

    @property
    def identification(self):
        return self._identification

    @property
    def more_flag(self):
        return self._more_flag

    @property
    def offset(self):
        return self._fragm_offset

    def __init__(self, next_header=None, fragm_offset=0, more_flag=False, identification=0):
        super(FragmentHeader, self).__init__(next_header, 0)
        self._fragm_offset = fragm_offset
        self._more_flag = more_flag
        self._identification = identification

    def callculate_offset(self, position):
        """ Convert a byte position to a fragment offset (units of 8 bytes).

        The misspelled name is kept for existing callers; see calculate_offset.
        """
        return position >> 3

    def calculate_offset(self, position):
        """ Correctly spelled alias of callculate_offset. """
        return self.callculate_offset(position)

    def to_bytes(self):
        """ Serialize the 8-byte fragment header.

        Returns:
            bytearray
        """
        data = bytearray([self.next_header, 0x00])
        # 13-bit offset in the upper bits, M flag in bit 0 of the 16-bit field.
        data += bytearray([self._fragm_offset >> 5, ((self._fragm_offset << 3) | self._more_flag) & 0xff])
        data += struct.pack(">I", self._identification)

        return data

    @classmethod
    def from_bytes(cls, data):
        """ Parse a fragment header from a stream.

        Args:
            data (BytesIO)

        Returns:
            FragmentHeader
        """
        next_header = struct.unpack(">B", data.read(1))[0]
        data.read(1)  # reserved
        offset_and_flags = struct.unpack(">H", data.read(2))[0]
        identification = struct.unpack(">I", data.read(4))[0]

        more_flag = offset_and_flags & 0x1
        fragment_offset = offset_and_flags >> 3

        return cls(next_header, fragment_offset, more_flag, identification)

    def __len__(self):
        # Fixed: used to return 64 (the size in bits), which inflated the
        # IPv6 payload length computed from len() of extension headers.
        return self._header_length


class HopByHop(ExtensionHeader):
    """ Class representing HopByHop extension header.

    HopByHop extension header consists of:
        - next_header type
        - extension header length which is multiple of 8
        - options

    """

    _one_byte_padding = 0x00
    _many_bytes_padding = 0x01

    @property
    def type(self):
        return IPV6_NEXT_HEADER_HOP_BY_HOP

    def __init__(self, next_header=None, options=None, hdr_ext_len=None):
        super(HopByHop, self).__init__(next_header, hdr_ext_len)
        self.options = options if options is not None else []

        if hdr_ext_len is not None:
            self.hdr_ext_len = hdr_ext_len
        else:
            payload_length = self._calculate_payload_length()
            self.hdr_ext_len = self._calculate_hdr_ext_len(payload_length)

    def _calculate_payload_length(self):
        """ Return the unpadded header size: 2 fixed bytes plus all options. """
        payload_length = 2

        for option in self.options:
            payload_length += len(option)

        return payload_length

    def _calculate_hdr_ext_len(self, payload_length):
        """ Return the Hdr Ext Len field: number of 8-byte units, not counting the first. """
        count = payload_length >> 3

        # An exact multiple of 8 needs no extra (padded) unit.
        if (payload_length & 0x7) == 0 and count > 0:
            return count - 1

        return count

    def to_bytes(self):
        """ Serialize the header and pad it to a multiple of 8 bytes.

        Returns:
            bytearray
        """
        data = bytearray([self.next_header, self.hdr_ext_len])

        for option in self.options:
            data += option.to_bytes()

        # Padding
        #
        # More details:
        #   https://tools.ietf.org/html/rfc2460#section-4.2
        #
        excess_bytes = len(data) & 0x7

        if excess_bytes > 0:
            padding_length = 8 - excess_bytes

            if padding_length == 1:
                # Pad1: a single zero byte.
                data += bytearray([self._one_byte_padding])

            else:
                # PadN: type, length, then `length` zero bytes.
                padding_length -= 2
                data += bytearray([self._many_bytes_padding, padding_length])
                data += bytearray([0x00 for _ in range(padding_length)])

        return data

    def __len__(self):
        """ HopByHop extension header length

        More details:
            https://tools.ietf.org/html/rfc2460#section-4.3

        """
        return (self.hdr_ext_len + 1) * 8


class HopByHopOptionHeader(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing HopByHop option header. """

    _header_length = 2

    def __init__(self, _type, length=None):
        self.type = _type
        self.length = length if length is not None else 0

    def to_bytes(self):
        return bytearray([self.type, self.length])

    @classmethod
    def from_bytes(cls, data):
        """ Parse option type and length (one byte each) from a stream. """
        _type = ord(data.read(1))
        length = ord(data.read(1))
        return cls(_type, length)

    def __len__(self):
        return self._header_length

    def __repr__(self):
        return "HopByHopOptionHeader(type={}, length={})".format(self.type, self.length)


class HopByHopOption(ConvertibleToBytes):
    """ Class representing HopByHop option.

    Class consists of two elements: HopByHopOptionHeader and value (e.g. for MPLOption).

    The following example shows how any HopByHop option can be constructed.

    Example:
        HopByHop(next_header=0x3a,
                 options=[HopByHopOption(HopByHopOptionHeader(_type=0x6d),
                                         MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))])

    """

    def __init__(self, header, value):
        self.value = value

        self.header = header
        # The header length field always reflects the actual value size.
        self.header.length = len(self.value)

    def to_bytes(self):
        return self.header.to_bytes() + self.value.to_bytes()

    def __len__(self):
        return len(self.header) + len(self.value)

    def __repr__(self):
        return "HopByHopOption(header={}, value={})".format(self.header, self.value)


class MPLOption(ConvertibleToBytes):
    """ Class representing MPL option. """

    _header_length = 2

    # Seed id size in bytes, keyed by the S field.
    _seed_id_length = {0: 0, 1: 2, 2: 8, 3: 16}

    def __init__(self, S, M, V, sequence, seed_id):
        self.S = S
        self.M = M
        self.V = V
        self.sequence = sequence
        self.seed_id = seed_id

    def to_bytes(self):
        """ Serialize as flags byte (S, M, V), sequence byte, then seed id.

        Returns:
            bytearray
        """
        smv = (((self.S & 0x03) << 6) | ((self.M & 0x01) << 5) | ((self.V & 0x01) << 4))

        return bytearray([smv, self.sequence]) + self.seed_id

    @classmethod
    def from_bytes(cls, data):
        """ Parse an MPL option value from a stream.

        Args:
            data (BytesIO)

        Returns:
            MPLOption
        """
        b = ord(data.read(1))

        s = (b >> 6) & 0x03
        m = (b >> 5) & 0x01
        v = (b >> 4) & 0x01

        sequence = ord(data.read(1))
        seed_id = data.read(cls._seed_id_length[s])

        return cls(s, m, v, sequence, seed_id)

    def __len__(self):
        return self._header_length + self._seed_id_length[self.S]

    def __repr__(self):
        return "MPLOption(S={}, M={}, V={}, sequence={}, seed_id={})".format(self.S, self.M, self.V, self.sequence,
                                                                             hexlify(self.seed_id))


class IPv6PacketFactory(PacketFactory):
    """ Factory that produces IPv6 packets from data.

    This factory must be initialized with factories which allow to parse extension headers and upper layer protocols.

    The following example shows preferable setup of IPv6PacketFactory.

    Header types:
        0: HopByHop
        17: UDP
        58: ICMPv6

    Option types:
        109: MPL

    ICMPv6 body types:
        128: Echo request
        129: Echo response

    Example usage:

        ipv6_factory = IPv6PacketFactory(
            ehf={
                0: HopByHopFactory(
                    hop_by_hop_options_factory=HopByHopOptionsFactory(options_factories={
                        109: MPLOptionFactory()
                    }))
            },
            ulpf={
                17: UDPDatagramFactory(
                    udp_header_factory=UDPHeaderFactory(),
                    udp_payload_factory=UdpBasedOnSrcDstPortsPayloadFactory(
                        src_dst_port_based_payload_factories={
                            19788: MLEMessageFactory(),
                            19789: CoAPMessageFactory()
                        })),
                58: ICMPv6Factory(body_factories={
                    128: ICMPv6EchoBodyFactory(),
                    129: ICMPv6EchoBodyFactory()
                })
            }
        )

    """

    def __init__(self, ehf=None, ulpf=None):
        """
        ehf - Extension Header Factory
        ulpf - Upper Layer Protocol Factory

        Args:
            ehf(dict[int: PacketFactory]): Dictionary mapping extension header types on specialized factories.
            ulpf(dict[int: PacketFactory]): Dictionary mapping upper layer protocol types on specialized factories.
        """
        self._ehf = ehf if ehf is not None else {}
        self._ulpf = ulpf if ulpf is not None else {}

    def _is_extension_header(self, header_type):
        # Anything that is not a known upper layer protocol is treated as an
        # extension header.
        return header_type not in UPPER_LAYER_PROTOCOLS

    def _get_extension_header_factory_for(self, next_header):
        try:
            return self._ehf[next_header]
        except KeyError:
            raise RuntimeError("Could not get Extension Header factory for next_header={}.".format(next_header))

    def _get_upper_layer_protocol_factory_for(self, next_header):
        try:
            return self._ulpf[next_header]
        except KeyError:
            raise RuntimeError("Could not get Upper Layer Protocol factory for next_header={}.".format(next_header))

    def _parse_extension_headers(self, data, next_header, message_info):
        """ Parse extension headers until an upper layer protocol type is reached.

        Returns:
            tuple: (next_header of the last parsed header, list of extension headers)
        """
        extension_headers = []

        while self._is_extension_header(next_header):
            factory = self._get_extension_header_factory_for(next_header)

            extension_header = factory.parse(data, message_info)

            next_header = extension_header.next_header

            extension_headers.append(extension_header)

        return next_header, extension_headers

    def _parse_upper_layer_protocol(self, data, next_header, message_info):
        factory = self._get_upper_layer_protocol_factory_for(next_header)

        return factory.parse(data, message_info)

    def parse(self, data, message_info):
        """ Parse a whole IPv6 packet and record its addresses in message_info.

        Args:
            data (BytesIO)
            message_info (MessageInfo)

        Returns:
            IPv6Packet

        Raises:
            RuntimeError: when no factory is registered for an encountered header type.
        """
        ipv6_header = IPv6Header.from_bytes(data)

        message_info.source_ipv6 = ipv6_header.source_address
        message_info.destination_ipv6 = ipv6_header.destination_address

        next_header, extension_headers = self._parse_extension_headers(data, ipv6_header.next_header, message_info)

        upper_layer_protocol = self._parse_upper_layer_protocol(data, next_header, message_info)

        return IPv6Packet(ipv6_header, upper_layer_protocol, extension_headers)


class HopByHopOptionsFactory(object):
    """ Factory that produces HopByHop options. """

    _one_byte_padding = 0x00
    _many_bytes_padding = 0x01

    def __init__(self, options_factories=None):
        self._options_factories = (options_factories if options_factories is not None else {})

    def _get_HopByHopOption_value_factory(self, _type):
        try:
            return self._options_factories[_type]
        except KeyError:
            raise RuntimeError("Could not find HopByHopOption value factory for type={}.".format(_type))

    def parse(self, data, message_info):
        """ Parse all options from the stream, skipping Pad1 and PadN padding.

        Args:
            data (BytesIO): stream holding only the options area of the header.
            message_info (MessageInfo)

        Returns:
            list: parsed HopByHopOption objects (padding is not included).

        Raises:
            RuntimeError: when no value factory is registered for an option type.
        """
        options = []
        end = len(data.getvalue())

        while data.tell() < end:
            option_type = ord(data.read(1))

            if option_type == self._one_byte_padding:
                # Pad1 is a lone type byte with no length or value fields
                # (RFC 2460, section 4.2). Fixed: it used to be parsed as a
                # two-byte header followed by one more skipped byte.
                continue

            option_header = HopByHopOptionHeader(option_type, ord(data.read(1)))

            if option_header.type == self._many_bytes_padding:
                # skip n bytes padding
                data.read(option_header.length)
                continue

            factory = self._get_HopByHopOption_value_factory(option_header.type)

            option_data = data.read(option_header.length)

            option = HopByHopOption(
                option_header,
                factory.parse(io.BytesIO(option_data), message_info),
            )

            options.append(option)

        return options


class HopByHopFactory(PacketFactory):
    """ Factory that produces HopByHop extension headers from data. """

    def __init__(self, hop_by_hop_options_factory):
        self._hop_by_hop_options_factory = hop_by_hop_options_factory

    def _calculate_extension_header_length(self, hdr_ext_len):
        return (hdr_ext_len + 1) * 8

    def parse(self, data, message_info):
        """ Parse a HopByHop header and add its length to message_info.payload_length.

        Args:
            data (BytesIO)
            message_info (MessageInfo)

        Returns:
            HopByHop
        """
        next_header = ord(data.read(1))

        hdr_ext_len = ord(data.read(1))

        # Note! Two bytes were read (next_header and hdr_ext_len) so they must
        # be subtracted from header length
        hop_by_hop_length = (self._calculate_extension_header_length(hdr_ext_len) - 2)

        hop_by_hop_data = data.read(hop_by_hop_length)

        options = self._hop_by_hop_options_factory.parse(io.BytesIO(hop_by_hop_data), message_info)

        hop_by_hop = HopByHop(next_header, options, hdr_ext_len)

        message_info.payload_length += len(hop_by_hop)

        return hop_by_hop


class MPLOptionFactory(PacketFactory):
    """ Factory that produces MPL options for HopByHop extension header. """

    def parse(self, data, message_info):
        return MPLOption.from_bytes(data)


class UDPHeaderFactory:
    """ Factory that produces UDP header. """

    def parse(self, data, message_info):
        return UDPHeader.from_bytes(data)


class UdpBasedOnSrcDstPortsPayloadFactory:

    # TODO: Unittests
    """ Factory that produces UDP payload. """

    def __init__(self, src_dst_port_based_payload_factories):
        """
        Args:
            src_dst_port_based_payload_factories (PacketFactory):
                Factories parse UDP payload based on source or
                destination port.
        """
        self._factories = src_dst_port_based_payload_factories

    def parse(self, data, message_info):
        """ Parse the UDP payload with the factory registered for its port.

        A factory registered for the source port takes precedence over one
        registered for the destination port.

        Returns:
            Parsed payload, or None for traffic sent to common.UDP_TEST_PORT.

        Raises:
            RuntimeError: when no factory matches either port.
        """
        factory = None

        if message_info.dst_port in self._factories:
            factory = self._factories[message_info.dst_port]

        if message_info.src_port in self._factories:
            factory = self._factories[message_info.src_port]

        if message_info.dst_port == common.UDP_TEST_PORT:
            # Ignore traffic targeted to test port
            return None
        elif factory is None:
            raise RuntimeError("Could not find factory to build UDP payload.")

        return factory.parse(data, message_info)


class UDPDatagramFactory(PacketFactory):

    # TODO: Unittests
    """ Factory that produces UDP datagrams. """

    def __init__(self, udp_header_factory, udp_payload_factory):
        self._udp_header_factory = udp_header_factory
        self._udp_payload_factory = udp_payload_factory

    def parse(self, data, message_info):
        """ Parse a UDP datagram and record its ports and length in message_info.

        Args:
            data (BytesIO)
            message_info (MessageInfo)

        Returns:
            UDPDatagram
        """
        header = self._udp_header_factory.parse(data, message_info)

        # Update message payload length: UDP header (8B) + payload length
        message_info.payload_length += len(header) + (len(data.getvalue()) - data.tell())

        message_info.src_port = header.src_port
        message_info.dst_port = header.dst_port

        payload = self._udp_payload_factory.parse(data, message_info)

        return UDPDatagram(header, payload)


class ICMPv6Factory(PacketFactory):
    """ Factory that produces ICMPv6 messages from data. """

    def __init__(self, body_factories=None):
        self._body_factories = (body_factories if body_factories is not None else {})

    def _get_icmpv6_body_factory(self, _type):
        """ Return the body factory for `_type`, falling back to the "default" entry if present.

        Raises:
            RuntimeError: when neither a specialized nor a default factory exists.
        """
        try:
            return self._body_factories[_type]

        except KeyError:
            if "default" not in self._body_factories:
                raise RuntimeError("Could not find specialized factory to parse ICMP body. "
                                   "Unsupported ICMP type: {}".format(_type))

            default_factory = self._body_factories["default"]

            print("Could not find specialized factory to parse ICMP body. "
                  "Take the default one: {}".format(type(default_factory)))

            return default_factory

    def parse(self, data, message_info):
        """ Parse an ICMPv6 message and add its length to message_info.payload_length.

        Args:
            data (BytesIO)
            message_info (MessageInfo)

        Returns:
            ICMPv6
        """
        header = ICMPv6Header.from_bytes(data)

        factory = self._get_icmpv6_body_factory(header.type)

        message_info.payload_length += len(header) + (len(data.getvalue()) - data.tell())

        return ICMPv6(header, factory.parse(data, message_info))


class ICMPv6EchoBodyFactory(PacketFactory):
    """ Factory that produces ICMPv6 echo message body. """

    def parse(self, data, message_info):
        return ICMPv6EchoBody.from_bytes(data)


class BytesPayload(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing bytes payload. """

    def __init__(self, data):
        self.data = data

    def to_bytes(self):
        return bytearray(self.data)

    @classmethod
    def from_bytes(cls, data):
        return cls(data)

    def __len__(self):
        return len(self.data)


class BytesPayloadFactory(PacketFactory):
    """ Factory that produces bytes payload. """

    def parse(self, data, message_info):
        return BytesPayload(data.read())


class ICMPv6EchoBody(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing body of ICMPv6 echo messages. """

    _header_length = 4

    def __init__(self, identifier, sequence_number, data):
        self.identifier = identifier
        self.sequence_number = sequence_number
        self.data = data

    def to_bytes(self):
        """ Serialize as identifier and sequence number (16 bits each) followed by data. """
        data = struct.pack(">H", self.identifier)
        data += struct.pack(">H", self.sequence_number)
        data += self.data
        return data

    @classmethod
    def from_bytes(cls, data):
        """ Parse an echo body from a stream; all remaining bytes become the data field. """
        identifier = struct.unpack(">H", data.read(2))[0]
        sequence_number = struct.unpack(">H", data.read(2))[0]

        return cls(identifier, sequence_number, data.read())

    def __len__(self):
        return self._header_length + len(self.data)


class ICMPv6DestinationUnreachableFactory(PacketFactory):
    """ Factory that produces ICMPv6 Destination Unreachable message body. """

    def parse(self, data, message_info):
        return ICMPv6DestinationUnreachable.from_bytes(data)


class ICMPv6DestinationUnreachable(ConvertibleToBytes, BuildableFromBytes):
    """ Class representing body of ICMPv6 Destination Unreachable messages. """

    _header_length = 4
    _unused = 0

    def __init__(self, data):
        self.data = data

    def to_bytes(self):
        """ Serialize as a 32-bit zero "unused" field followed by data. """
        data = bytearray(struct.pack(">I", self._unused))
        data += self.data
        return data

    @classmethod
    def from_bytes(cls, data):
        """ Parse a Destination Unreachable body from a stream.

        Raises:
            RuntimeError: when the 32-bit unused field is not zero.
        """
        unused = struct.unpack(">I", data.read(4))[0]
        if unused != 0:
            raise RuntimeError(
                "Invalid value of unused field in the ICMPv6 Destination Unreachable data. Expected value: 0.")

        return cls(bytearray(data.read()))

    def __len__(self):
        return self._header_length + len(self.data)