• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import abc
31import io
32import struct
33
34from binascii import hexlify
35from ipaddress import ip_address
36
37import common
38
39try:
40    from itertools import izip_longest as zip_longest
41except ImportError:
42    from itertools import zip_longest
43
44# Next headers for IPv6 protocols
45IPV6_NEXT_HEADER_HOP_BY_HOP = 0
46IPV6_NEXT_HEADER_TCP = 6
47IPV6_NEXT_HEADER_UDP = 17
48IPV6_NEXT_HEADER_FRAGMENT = 44
49IPV6_NEXT_HEADER_ICMP = 58
50
51UPPER_LAYER_PROTOCOLS = [
52    IPV6_NEXT_HEADER_TCP,
53    IPV6_NEXT_HEADER_UDP,
54    IPV6_NEXT_HEADER_ICMP,
55]
56
57# ICMP Protocol codes
58ICMP_DESTINATION_UNREACHABLE = 1
59ICMP_TIME_EXCEEDED = 3
60ICMP_ECHO_REQUEST = 128
61ICMP_ECHO_RESPONSE = 129
62
63# Default hop limit for IPv6
64HOP_LIMIT_DEFAULT = 64
65
66
67def calculate_checksum(data):
68    """ Calculate checksum from data bytes.
69
70    How to calculate checksum (RFC 2460):
71        https://tools.ietf.org/html/rfc2460#page-27
72
73    Args:
74        data (bytes): input data from which checksum will be calculated
75
76    Returns:
77        int: calculated checksum
78    """
79    # Create halfwords from data bytes. Example: data[0] = 0x01, data[1] =
80    # 0xb2 => 0x01b2
81    halfwords = [((byte0 << 8) | byte1) for byte0, byte1 in zip_longest(data[::2], data[1::2], fillvalue=0x00)]
82
83    checksum = 0
84    for halfword in halfwords:
85        checksum += halfword
86        checksum = (checksum & 0xffff) + (checksum >> 16)
87
88    checksum ^= 0xffff
89
90    if checksum == 0:
91        return 0xffff
92    else:
93        return checksum
94
95
96class PacketFactory(object):
97    """ Interface for classes that produce objects from data. """
98
99    def parse(self, data, message_info):
100        """ Convert data to object.
101
102        Args:
103            data (BytesIO)
104            message_info (MessageInfo)
105
106        """
107        raise NotImplementedError
108
109
110class BuildableFromBytes(object):
111    """ Interface for classes which can be built from bytes. """
112
113    @classmethod
114    def from_bytes(cls, data):
115        """ Convert data to object.
116
117        Args:
118            data (bytes)
119
120        """
121        raise NotImplementedError
122
123
124class ConvertibleToBytes(object):
125    """ Interface for classes which can be converted to bytes. """
126
127    def to_bytes(self):
128        """ Convert object to data.
129
130        Returns:
131            bytes
132        """
133        raise NotImplementedError
134
135    def __len__(self):
136        """ Length of data (in bytes).
137
138        Returns:
139            int
140        """
141        raise NotImplementedError
142
143
144class Header(object):
145    """ Interface for header classes. """
146
147    __metaclass__ = abc.ABCMeta
148
149    @abc.abstractproperty
150    def type(self):
151        """ Number which can be used in the next header field in IPv6 header or next headers.
152
153        Returns:
154            int
155        """
156
157
158class ExtensionHeader(object):
159    """ Base for classes representing Extension Headers in IPv6 packets. """
160
161    def __init__(self, next_header, hdr_ext_len=0):
162        self.next_header = next_header
163        self.hdr_ext_len = hdr_ext_len
164
165
166class UpperLayerProtocol(Header, ConvertibleToBytes):
167    """ Base for classes representing upper layer protocol payload in IPv6 packets. """
168
169    def __init__(self, header):
170        self.header = header
171
172    @property
173    def checksum(self):
174        """ Return checksum from upper layer protocol header. """
175        return self.header.checksum
176
177    @checksum.setter
178    def checksum(self, value):
179        """ Set checksum value in upper layer protocol header. """
180        self.header.checksum = value
181
182    def is_valid_checksum(self):
183        """ Return information if set checksum is valid.
184
185        It is not possible to get zero from checksum calculation.
186        Zero indicates invalid checksum value.
187
188        Returns:
189            bool
190        """
191        return self.checksum != 0
192
193
194class IPv6PseudoHeader(ConvertibleToBytes):
195    """ Class representing IPv6 pseudo header which is required to calculate
196    upper layer protocol (like e.g. UDP or ICMPv6) checksum.
197
198    This class is used only during upper layer protocol checksum calculation. Do not use it outside of this module.
199
200    """
201
202    def __init__(self, source_address, destination_address, payload_length, next_header):
203        self._source_address = self._convert_to_ipaddress(source_address)
204        self._destination_address = self._convert_to_ipaddress(destination_address)
205        self.payload_length = payload_length
206        self.next_header = next_header
207
208    def _convert_to_ipaddress(self, value):
209        if isinstance(value, bytearray):
210            value = bytes(value)
211
212        return ip_address(value)
213
214    @property
215    def source_address(self):
216        return self._source_address
217
218    @source_address.setter
219    def source_address(self, value):
220        self._source_address = self._convert_to_ipaddress(value)
221
222    @property
223    def destination_address(self):
224        return self._destination_address
225
226    @destination_address.setter
227    def destination_address(self, value):
228        self._source_address = self._convert_to_ipaddress(value)
229
230    def to_bytes(self):
231        data = bytearray()
232        data += self.source_address.packed
233        data += self.destination_address.packed
234        data += struct.pack(">I", self.payload_length)
235        data += struct.pack(">I", self.next_header)
236
237        return data
238
239
240class IPv6Header(ConvertibleToBytes, BuildableFromBytes):
241    """ Class representing IPv6 packet header. """
242
243    _version = 6
244
245    _header_length = 40
246
247    def __init__(
248        self,
249        source_address,
250        destination_address,
251        traffic_class=0,
252        flow_label=0,
253        hop_limit=64,
254        payload_length=0,
255        next_header=0,
256    ):
257        self.version = self._version
258        self._source_address = self._convert_to_ipaddress(source_address)
259        self._destination_address = self._convert_to_ipaddress(destination_address)
260        self.traffic_class = traffic_class
261        self.flow_label = flow_label
262        self.hop_limit = hop_limit
263        self.payload_length = payload_length
264        self.next_header = next_header
265
266    def _convert_to_ipaddress(self, value):
267        if isinstance(value, bytearray):
268            value = bytes(value)
269
270        return ip_address(value)
271
272    @property
273    def source_address(self):
274        return self._source_address
275
276    @source_address.setter
277    def source_address(self, value):
278        self._source_address = self._convert_to_ipaddress(value)
279
280    @property
281    def destination_address(self):
282        return self._destination_address
283
284    def to_bytes(self):
285        data = bytearray([
286            ((self.version & 0x0F) << 4) | ((self.traffic_class >> 4) & 0x0F),
287            ((self.traffic_class & 0x0F) << 4) | ((self.flow_label >> 16) & 0x0F),
288            ((self.flow_label >> 8) & 0xff),
289            ((self.flow_label & 0xff)),
290        ])
291        data += struct.pack(">H", self.payload_length)
292        data += bytearray([self.next_header, self.hop_limit])
293        data += self.source_address.packed
294        data += self.destination_address.packed
295
296        return data
297
298    @classmethod
299    def from_bytes(cls, data):
300        b = bytearray(data.read(4))
301
302        (b[0] >> 4) & 0x0F
303        traffic_class = ((b[0] & 0x0F) << 4) | ((b[1] >> 4) & 0x0F)
304        flow_label = ((b[1] & 0x0F) << 16) | (b[2] << 8) | b[3]
305
306        payload_length = struct.unpack(">H", data.read(2))[0]
307        next_header = ord(data.read(1))
308        hop_limit = ord(data.read(1))
309        src_addr = bytearray(data.read(16))
310        dst_addr = bytearray(data.read(16))
311
312        return cls(
313            src_addr,
314            dst_addr,
315            traffic_class,
316            flow_label,
317            hop_limit,
318            payload_length,
319            next_header,
320        )
321
322    def __repr__(self):
323        return "IPv6Header(source_address={}, destination_address={}, next_header={}, payload_length={}, \
324            hop_limit={}, traffic_class={}, flow_label={})".format(
325            self.source_address.compressed,
326            self.destination_address.compressed,
327            self.next_header,
328            self.payload_length,
329            self.hop_limit,
330            self.traffic_class,
331            self.flow_label,
332        )
333
334    def __len__(self):
335        return self._header_length
336
337
338class IPv6Packet(ConvertibleToBytes):
339    """ Class representing IPv6 packet.
340
341    IPv6 packet consists of IPv6 header, optional extension header, and upper layer protocol.
342
343                                            IPv6 packet
344
345    +-------------+----------------------------------+----------------------------------------------+
346    |             |                                  |                                              |
347    | IPv6 header | extension headers (zero or more) | upper layer protocol (e.g. UDP, TCP, ICMPv6) |
348    |             |                                  |                                              |
349    +-------------+----------------------------------+----------------------------------------------+
350
351    Extension headers:
352        - HopByHop
353        - Routing header (not implemented in this module)
354        - Fragment Header
355
356    Upper layer protocols:
357        - ICMPv6
358        - UDP
359        - TCP (not implemented in this module)
360
361    Example:
362        IPv6 packet construction without extension headers:
363
364        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
365                                 ICMPv6(ICMPv6Header(128, 0),
366                                        ICMPv6EchoBody(0, 2,  bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
367                                                                     0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
368                                                                     0x41, 0x41]))))
369
370        IPv6 packet construction with extension headers:
371
372        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
373                                 ICMPv6(ICMPv6Header(128, 0),
374                                        ICMPv6EchoBody(0, 2,  bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
375                                                                     0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
376                                                                     0x41, 0x41])),
377                                 [HopByHop(options=[
378                                     HopByHopOption(HopByHopOptionHeader(_type=0x6d),
379                                                    MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))
380                                 ])])
381
382    """
383
384    def __init__(self, ipv6_header, upper_layer_protocol, extension_headers=None):
385        self.ipv6_header = ipv6_header
386
387        self.upper_layer_protocol = upper_layer_protocol
388
389        self.extension_headers = (extension_headers if extension_headers is not None else [])
390
391        self._update_next_header_values_in_headers()
392
393        if not upper_layer_protocol.is_valid_checksum():
394            self.upper_layer_protocol.checksum = self.calculate_checksum()
395
396    def _validate_checksum(self):
397        checksum = self.calculate_checksum()
398
399        if self.upper_layer_protocol.checksum != checksum:
400            raise RuntimeError("Could not create IPv6 packet. "
401                               "Invalid checksum: {}!={}".format(self.upper_layer_protocol.checksum, checksum))
402
403        self.upper_layer_protocol.checksum = checksum
404
405    def _update_payload_length_value_in_ipv6_header(self):
406        self.ipv6_header.payload_length = len(self.upper_layer_protocol) + sum(
407            [len(extension_header) for extension_header in self.extension_headers])
408
409    def _update_next_header_values_in_headers(self):
410        last_header = self.ipv6_header
411
412        for extension_header in self.extension_headers:
413            last_header.next_header = extension_header.type
414            last_header = extension_header
415
416        last_header.next_header = self.upper_layer_protocol.type
417
418    def calculate_checksum(self):
419        saved_checksum = self.upper_layer_protocol.checksum
420
421        self.upper_layer_protocol.checksum = 0
422
423        upper_layer_protocol_bytes = self.upper_layer_protocol.to_bytes()
424
425        self.upper_layer_protocol.checksum = saved_checksum
426
427        pseudo_header = IPv6PseudoHeader(
428            self.ipv6_header.source_address,
429            self.ipv6_header.destination_address,
430            len(upper_layer_protocol_bytes),
431            self.upper_layer_protocol.type,
432        )
433
434        return calculate_checksum(pseudo_header.to_bytes() + upper_layer_protocol_bytes)
435
436    def to_bytes(self):
437        self._update_payload_length_value_in_ipv6_header()
438        self._update_next_header_values_in_headers()
439        self.upper_layer_protocol.checksum = self.calculate_checksum()
440
441        ipv6_packet = self.ipv6_header.to_bytes()
442
443        for extension_header in self.extension_headers:
444            ipv6_packet += extension_header.to_bytes()
445
446        ipv6_packet += self.upper_layer_protocol.to_bytes()
447
448        return ipv6_packet
449
450    def __repr__(self):
451        return "IPv6Packet(header={}, upper_layer_protocol={})".format(self.ipv6_header, self.upper_layer_protocol)
452
453
454class UDPHeader(ConvertibleToBytes, BuildableFromBytes):
455    """ Class representing UDP datagram header.
456
457    This header is required to construct UDP datagram.
458
459    """
460
461    _header_length = 8
462
463    def __init__(self, src_port, dst_port, payload_length=0, checksum=0):
464        self.src_port = src_port
465        self.dst_port = dst_port
466
467        self._payload_length = payload_length
468        self.checksum = checksum
469
470    @property
471    def type(self):
472        return 17
473
474    @property
475    def payload_length(self):
476        return self._payload_length
477
478    @payload_length.setter
479    def payload_length(self, value):
480        self._payload_length = self._header_length + value
481
482    def to_bytes(self):
483        data = struct.pack(">H", self.src_port)
484        data += struct.pack(">H", self.dst_port)
485        data += struct.pack(">H", self.payload_length)
486        data += struct.pack(">H", self.checksum)
487
488        return data
489
490    @classmethod
491    def from_bytes(cls, data):
492        src_port = struct.unpack(">H", data.read(2))[0]
493        dst_port = struct.unpack(">H", data.read(2))[0]
494        payload_length = struct.unpack(">H", data.read(2))[0]
495        checksum = struct.unpack(">H", data.read(2))[0]
496
497        return cls(src_port, dst_port, payload_length, checksum)
498
499    def __len__(self):
500        return self._header_length
501
502
503class UDPDatagram(UpperLayerProtocol):
504    """ Class representing UDP datagram.
505
506    UDP is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.
507
508    This class consists of a UDP header and payload. The example below shows how a UDP datagram can be constructed.
509
510    Example:
511        udp_dgram = UDPDatagram(UDPHeader(src_port=19788, dst_port=19788),
512                                bytes([0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
513                                       0x00, 0x00, 0x01, 0x09, 0x01, 0x01, 0x0b, 0x03,
514                                       0x04, 0xc6, 0x69, 0x73, 0x51, 0x0e, 0x01, 0x80,
515                                       0x12, 0x02, 0x00, 0x01, 0xde, 0xad, 0xbe, 0xef]))
516
517    """
518
519    @property
520    def type(self):
521        return 17
522
523    def __init__(self, header, payload):
524        super(UDPDatagram, self).__init__(header)
525        self.payload = payload
526
527    def to_bytes(self):
528        self.header.payload_length = len(self.payload)
529
530        data = bytearray()
531        data += self.header.to_bytes()
532        data += self.payload.to_bytes()
533        return data
534
535    def __len__(self):
536        return len(self.header) + len(self.payload)
537
538
539class ICMPv6Header(ConvertibleToBytes, BuildableFromBytes):
540    """ Class representing ICMPv6 message header.
541
542    This header is required to construct ICMPv6 message.
543
544    """
545
546    _header_length = 4
547
548    def __init__(self, _type, code, checksum=0):
549        self.type = _type
550        self.code = code
551
552        self.checksum = checksum
553
554    def to_bytes(self):
555        return bytearray([self.type, self.code]) + struct.pack(">H", self.checksum)
556
557    @classmethod
558    def from_bytes(cls, data):
559        _type = ord(data.read(1))
560        code = ord(data.read(1))
561        checksum = struct.unpack(">H", data.read(2))[0]
562
563        return cls(_type, code, checksum)
564
565    def __len__(self):
566        return self._header_length
567
568
569class ICMPv6(UpperLayerProtocol):
570    """ Class representing ICMPv6 message.
571
572    ICMPv6 is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.
573
574    This class consists of an ICMPv6 header and body. The example below shows how an ICMPv6 message can be constructed.
575
576    Example:
577        icmpv6_msg = ICMPv6(ICMPv6Header(128, 0),
578                            ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
579                                                        0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
580                                                        0x41, 0x41])))
581
582    """
583
584    @property
585    def type(self):
586        return 58
587
588    def __init__(self, header, body):
589        super(ICMPv6, self).__init__(header)
590        self.body = body
591
592    def to_bytes(self):
593        return bytearray(self.header.to_bytes() + self.body.to_bytes())
594
595    def __len__(self):
596        return len(self.header) + len(self.body)
597
598
599class FragmentHeader(ExtensionHeader):
600    """ Class representing Fragment extension header.
601
602    +-------------+----------+-----------------+-----+---+----------------+
603    | Next Header | Reserved | Fragment Offset | Res | M | Identification |
604    +-------------+----------+-----------------+-----+---+----------------+
605
606    Fragment extention header consists of:
607        - next_header type (8 bit)
608        - fragment offset which is multiple of 8 (13 bit)
609        - more_flag to indicate further data (1 bit)
610        - identification for all associated fragments (32 bit)
611    """
612
613    @property
614    def type(self):
615        return 44
616
617    @property
618    def identification(self):
619        return self._identification
620
621    @property
622    def more_flag(self):
623        return self._more_flag
624
625    @property
626    def offset(self):
627        return self._fragm_offset
628
629    def __init__(self, next_header=None, fragm_offset=0, more_flag=False, identification=0):
630        super(FragmentHeader, self).__init__(next_header, 0)
631        self._fragm_offset = fragm_offset
632        self._more_flag = more_flag
633        self._identification = identification
634
635    def callculate_offset(self, position):
636        return position >> 3
637
638    def to_bytes(self):
639        data = bytearray([self.next_header, 0x00])
640        data += bytearray([self._fragm_offset >> 5, ((self._fragm_offset << 3) | self._more_flag) & 0xff])
641        data += struct.pack(">I", self._identification)
642
643        return data
644
645    @classmethod
646    def from_bytes(cls, data):
647        next_header = struct.unpack(">B", data.read(1))[0]
648        struct.unpack(">B", data.read(1))[0]  # reserved
649        fragment_offset = struct.unpack(">H", data.read(2))[0]
650        more_flag = fragment_offset & 0x1
651        identificaton = struct.unpack(">I", data.read(4))[0]
652
653        fragment_offset = fragment_offset >> 3
654
655        return cls(next_header, fragment_offset, more_flag, identificaton)
656
657    def __len__(self):
658        return 64
659
660
661class HopByHop(ExtensionHeader):
662    """ Class representing HopByHop extension header.
663
664    HopByHop extension header consists of:
665        - next_header type
666        - extension header length which is multiple of 8
667        - options
668
669    """
670
671    _one_byte_padding = 0x00
672    _many_bytes_padding = 0x01
673
674    @property
675    def type(self):
676        return 0
677
678    def __init__(self, next_header=None, options=None, hdr_ext_len=None):
679        super(HopByHop, self).__init__(next_header, hdr_ext_len)
680        self.options = options if options is not None else []
681
682        if hdr_ext_len is not None:
683            self.hdr_ext_len = hdr_ext_len
684        else:
685            payload_length = self._calculate_payload_length()
686            self.hdr_ext_len = self._calculate_hdr_ext_len(payload_length)
687
688    def _calculate_payload_length(self):
689        payload_length = 2
690
691        for option in self.options:
692            payload_length += len(option)
693
694        return payload_length
695
696    def _calculate_hdr_ext_len(self, payload_length):
697        count = payload_length >> 3
698
699        if (payload_length & 0x7) == 0 and count > 0:
700            return count - 1
701
702        return count
703
704    def to_bytes(self):
705        data = bytearray([self.next_header, self.hdr_ext_len])
706
707        for option in self.options:
708            data += option.to_bytes()
709
710        # Padding
711        #
712        # More details:
713        #   https://tools.ietf.org/html/rfc2460#section-4.2
714        #
715        excess_bytes = len(data) & 0x7
716
717        if excess_bytes > 0:
718            padding_length = 8 - excess_bytes
719
720            if padding_length == 1:
721                data += bytearray([self._one_byte_padding])
722
723            else:
724                padding_length -= 2
725                data += bytearray([self._many_bytes_padding, padding_length])
726                data += bytearray([0x00 for _ in range(padding_length)])
727
728        return data
729
730    def __len__(self):
731        """ HopByHop extension header length
732
733        More details:
734            https://tools.ietf.org/html/rfc2460#section-4.3
735
736        """
737        return (self.hdr_ext_len + 1) * 8
738
739
740class HopByHopOptionHeader(ConvertibleToBytes, BuildableFromBytes):
741    """ Class representing HopByHop option header. """
742
743    _header_length = 2
744
745    def __init__(self, _type, length=None):
746        self.type = _type
747        self.length = length if length is not None else 0
748
749    def to_bytes(self):
750        return bytearray([self.type, self.length])
751
752    @classmethod
753    def from_bytes(cls, data):
754        _type = ord(data.read(1))
755        length = ord(data.read(1))
756        return cls(_type, length)
757
758    def __len__(self):
759        return self._header_length
760
761    def __repr__(self):
762        return "HopByHopOptionHeader(type={}, length={})".format(self.type, self.length)
763
764
765class HopByHopOption(ConvertibleToBytes):
766    """ Class representing HopByHop option.
767
768    Class consists of two elements: HopByHopOptionHeader and value (e.g. for MPLOption).
769
770    The following example shows how any HopByHop option can be constructed.
771
772    Example:
773        HopByHop(next_header=0x3a,
774                 options=[HopByHopOption(HopByHopOptionHeader(_type=0x6d),
775                                         MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))
776
777    """
778
779    def __init__(self, header, value):
780        self.value = value
781
782        self.header = header
783        self.header.length = len(self.value)
784
785    def to_bytes(self):
786        return self.header.to_bytes() + self.value.to_bytes()
787
788    def __len__(self):
789        return len(self.header) + len(self.value)
790
791    def __repr__(self):
792        return "HopByHopOption(header={}, value={})".format(self.header, self.value)
793
794
795class MPLOption(ConvertibleToBytes):
796    """ Class representing MPL option. """
797
798    _header_length = 2
799
800    _seed_id_length = {0: 0, 1: 2, 2: 8, 3: 16}
801
802    def __init__(self, S, M, V, sequence, seed_id):
803        self.S = S
804        self.M = M
805        self.V = V
806        self.sequence = sequence
807        self.seed_id = seed_id
808
809    def to_bytes(self):
810        smv = (((self.S & 0x03) << 6) | ((self.M & 0x01) << 5) | ((self.V & 0x01) << 4))
811
812        return bytearray([smv, self.sequence]) + self.seed_id
813
814    @classmethod
815    def from_bytes(cls, data):
816        b = ord(data.read(1))
817
818        s = (b >> 6) & 0x03
819        m = (b >> 5) & 0x01
820        v = (b >> 4) & 0x01
821
822        sequence = ord(data.read(1))
823        seed_id = data.read(cls._seed_id_length[s])
824
825        return cls(s, m, v, sequence, seed_id)
826
827    def __len__(self):
828        return self._header_length + self._seed_id_length[self.S]
829
830    def __repr__(self):
831        return "MPLOption(S={}, M={}, V={}, sequence={}, seed_id={})".format(self.S, self.M, self.V, self.sequence,
832                                                                             hexlify(self.seed_id))
833
834
835class IPv6PacketFactory(PacketFactory):
836    """ Factory that produces IPv6 packets from data.
837
838    This factory must be initialized with factories which allow to parse extension headers and upper layer protocols.
839
840    The following example shows preferable setup of IPv6PacketFactory.
841
842    Header types:
843        0: HopByHop
844        17: UDP
845        58: ICMPv6
846
847    Option types:
848        109: MPL
849
850    ICMPv6 body types:
851        128: Echo request
852        129: Echo response
853
854    Example usage:
855
856        ipv6_factory = IPv6PacketFactory(
857            ehf={
858                0: HopByHopFactory(options_factories={
859                    109: MPLOptionFactory()
860                })
861            },
862            ulpf={
863                17: UDPDatagramFactory(dst_port_factories={
864                    19788: MLEMessageFactory(),
865                    19789: CoAPMessageFactory()
866                }),
867                58: ICMPv6Factory(body_factories={
868                    128: ICMPv6EchoBodyFactory(),
869                    129: ICMPv6EchoBodyFactory()
870                })
871            }
872        )
873
874    """
875
876    def __init__(self, ehf=None, ulpf=None):
877        """
878        ehf - Extension Header Factory
879        ulpf - Upper Layer Protocol Factory
880
881        Args:
882            ehf(dict[int: PacketFactory]): Dictionary mapping extension header types on specialized factories.
883            ulpf(dict[int: PacketFactory]): Dictionary mapping upper layer protocol types on specialized factories.
884        """
885        self._ehf = ehf if ehf is not None else {}
886        self._ulpf = ulpf if ulpf is not None else {}
887
888    def _is_extension_header(self, header_type):
889        return header_type not in UPPER_LAYER_PROTOCOLS
890
891    def _get_extension_header_factory_for(self, next_header):
892        try:
893            return self._ehf[next_header]
894        except KeyError:
895            raise RuntimeError("Could not get Extension Header factory for next_header={}.".format(next_header))
896
897    def _get_upper_layer_protocol_factory_for(self, next_header):
898        try:
899            return self._ulpf[next_header]
900        except KeyError:
901            raise RuntimeError("Could not get Upper Layer Protocol factory for next_header={}.".format(next_header))
902
903    def _parse_extension_headers(self, data, next_header, message_info):
904        extension_headers = []
905
906        while self._is_extension_header(next_header):
907            factory = self._get_extension_header_factory_for(next_header)
908
909            extension_header = factory.parse(data, message_info)
910
911            next_header = extension_header.next_header
912
913            extension_headers.append(extension_header)
914
915        return next_header, extension_headers
916
917    def _parse_upper_layer_protocol(self, data, next_header, message_info):
918        factory = self._get_upper_layer_protocol_factory_for(next_header)
919
920        return factory.parse(data, message_info)
921
922    def parse(self, data, message_info):
923        ipv6_header = IPv6Header.from_bytes(data)
924
925        message_info.source_ipv6 = ipv6_header.source_address
926        message_info.destination_ipv6 = ipv6_header.destination_address
927
928        next_header, extension_headers = self._parse_extension_headers(data, ipv6_header.next_header, message_info)
929
930        upper_layer_protocol = self._parse_upper_layer_protocol(data, next_header, message_info)
931
932        return IPv6Packet(ipv6_header, upper_layer_protocol, extension_headers)
933
934
935class HopByHopOptionsFactory(object):
936    """ Factory that produces HopByHop options. """
937
938    _one_byte_padding = 0x00
939    _many_bytes_padding = 0x01
940
941    def __init__(self, options_factories=None):
942        self._options_factories = (options_factories if options_factories is not None else {})
943
944    def _get_HopByHopOption_value_factory(self, _type):
945        try:
946            return self._options_factories[_type]
947        except KeyError:
948            raise RuntimeError("Could not find HopByHopOption value factory for type={}.".format(_type))
949
950    def parse(self, data, message_info):
951        options = []
952
953        while data.tell() < len(data.getvalue()):
954            option_header = HopByHopOptionHeader.from_bytes(data)
955
956            if option_header.type == self._one_byte_padding:
957                # skip one byte padding
958                data.read(1)
959
960            elif option_header.type == self._many_bytes_padding:
961                # skip n bytes padding
962                data.read(option_header.length)
963
964            else:
965                factory = self._get_HopByHopOption_value_factory(option_header.type)
966
967                option_data = data.read(option_header.length)
968
969                option = HopByHopOption(
970                    option_header,
971                    factory.parse(io.BytesIO(option_data), message_info),
972                )
973
974                options.append(option)
975
976        return options
977
978
979class HopByHopFactory(PacketFactory):
980    """ Factory that produces HopByHop extension headers from data. """
981
982    def __init__(self, hop_by_hop_options_factory):
983        self._hop_by_hop_options_factory = hop_by_hop_options_factory
984
985    def _calculate_extension_header_length(self, hdr_ext_len):
986        return (hdr_ext_len + 1) * 8
987
988    def parse(self, data, message_info):
989        next_header = ord(data.read(1))
990
991        hdr_ext_len = ord(data.read(1))
992
993        # Note! Two bytes were read (next_header and hdr_ext_len) so they must
994        # be substracted from header length
995        hop_by_hop_length = (self._calculate_extension_header_length(hdr_ext_len) - 2)
996
997        hop_by_hop_data = data.read(hop_by_hop_length)
998
999        options = self._hop_by_hop_options_factory.parse(io.BytesIO(hop_by_hop_data), message_info)
1000
1001        hop_by_hop = HopByHop(next_header, options, hdr_ext_len)
1002
1003        message_info.payload_length += len(hop_by_hop)
1004
1005        return hop_by_hop
1006
1007
1008class MPLOptionFactory(PacketFactory):
1009    """ Factory that produces MPL options for HopByHop extension header. """
1010
1011    def parse(self, data, message_info):
1012        return MPLOption.from_bytes(data)
1013
1014
1015class UDPHeaderFactory:
1016    """ Factory that produces UDP header. """
1017
1018    def parse(self, data, message_info):
1019        return UDPHeader.from_bytes(data)
1020
1021
1022class UdpBasedOnSrcDstPortsPayloadFactory:
1023
1024    # TODO: Unittests
1025    """ Factory that produces UDP payload. """
1026
1027    def __init__(self, src_dst_port_based_payload_factories):
1028        """
1029        Args:
1030            src_dst_port_based_payload_factories (PacketFactory):
1031                Factories parse UDP payload based on source or
1032                destination port.
1033        """
1034        self._factories = src_dst_port_based_payload_factories
1035
1036    def parse(self, data, message_info):
1037        factory = None
1038
1039        if message_info.dst_port in self._factories:
1040            factory = self._factories[message_info.dst_port]
1041
1042        if message_info.src_port in self._factories:
1043            factory = self._factories[message_info.src_port]
1044
1045        if message_info.dst_port == common.UDP_TEST_PORT:
1046            # Ignore traffic targeted to test port
1047            return None
1048        elif factory is None:
1049            raise RuntimeError("Could not find factory to build UDP payload.")
1050
1051        return factory.parse(data, message_info)
1052
1053
1054class UDPDatagramFactory(PacketFactory):
1055
1056    # TODO: Unittests
1057    """ Factory that produces UDP datagrams. """
1058
1059    def __init__(self, udp_header_factory, udp_payload_factory):
1060        self._udp_header_factory = udp_header_factory
1061        self._udp_payload_factory = udp_payload_factory
1062
1063    def parse(self, data, message_info):
1064        header = self._udp_header_factory.parse(data, message_info)
1065
1066        # Update message payload length: UDP header (8B) + payload length
1067        message_info.payload_length += len(header) + (len(data.getvalue()) - data.tell())
1068
1069        message_info.src_port = header.src_port
1070        message_info.dst_port = header.dst_port
1071
1072        payload = self._udp_payload_factory.parse(data, message_info)
1073
1074        return UDPDatagram(header, payload)
1075
1076
1077class ICMPv6Factory(PacketFactory):
1078    """ Factory that produces ICMPv6 messages from data. """
1079
1080    def __init__(self, body_factories=None):
1081        self._body_factories = (body_factories if body_factories is not None else {})
1082
1083    def _get_icmpv6_body_factory(self, _type):
1084        try:
1085            return self._body_factories[_type]
1086
1087        except KeyError:
1088            if "default" not in self._body_factories:
1089                raise RuntimeError("Could not find specialized factory to parse ICMP body. "
1090                                   "Unsupported ICMP type: {}".format(_type))
1091
1092            default_factory = self._body_factories["default"]
1093
1094            print("Could not find specialized factory to parse ICMP body. "
1095                  "Take the default one: {}".format(type(default_factory)))
1096
1097            return default_factory
1098
1099    def parse(self, data, message_info):
1100        header = ICMPv6Header.from_bytes(data)
1101
1102        factory = self._get_icmpv6_body_factory(header.type)
1103
1104        message_info.payload_length += len(header) + (len(data.getvalue()) - data.tell())
1105
1106        return ICMPv6(header, factory.parse(data, message_info))
1107
1108
1109class ICMPv6EchoBodyFactory(PacketFactory):
1110    """ Factory that produces ICMPv6 echo message body. """
1111
1112    def parse(self, data, message_info):
1113        return ICMPv6EchoBody.from_bytes(data)
1114
1115
1116class BytesPayload(ConvertibleToBytes, BuildableFromBytes):
1117    """ Class representing bytes payload. """
1118
1119    def __init__(self, data):
1120        self.data = data
1121
1122    def to_bytes(self):
1123        return bytearray(self.data)
1124
1125    @classmethod
1126    def from_bytes(cls, data):
1127        return cls(data)
1128
1129    def __len__(self):
1130        return len(self.data)
1131
1132
1133class BytesPayloadFactory(PacketFactory):
1134    """ Factory that produces bytes payload. """
1135
1136    def parse(self, data, message_info):
1137        return BytesPayload(data.read())
1138
1139
1140class ICMPv6EchoBody(ConvertibleToBytes, BuildableFromBytes):
1141    """ Class representing body of ICMPv6 echo messages. """
1142
1143    _header_length = 4
1144
1145    def __init__(self, identifier, sequence_number, data):
1146        self.identifier = identifier
1147        self.sequence_number = sequence_number
1148        self.data = data
1149
1150    def to_bytes(self):
1151        data = struct.pack(">H", self.identifier)
1152        data += struct.pack(">H", self.sequence_number)
1153        data += self.data
1154        return data
1155
1156    @classmethod
1157    def from_bytes(cls, data):
1158        identifier = struct.unpack(">H", data.read(2))[0]
1159        sequence_number = struct.unpack(">H", data.read(2))[0]
1160
1161        return cls(identifier, sequence_number, data.read())
1162
1163    def __len__(self):
1164        return self._header_length + len(self.data)
1165
1166
1167class ICMPv6DestinationUnreachableFactory(PacketFactory):
1168    """ Factory that produces ICMPv6 echo message body. """
1169
1170    def parse(self, data, message_info):
1171        return ICMPv6DestinationUnreachable.from_bytes(data)
1172
1173
1174class ICMPv6DestinationUnreachable(ConvertibleToBytes, BuildableFromBytes):
1175    """ Class representing body of ICMPv6 Destination Unreachable messages. """
1176
1177    _header_length = 4
1178    _unused = 0
1179
1180    def __init__(self, data):
1181        self.data = data
1182
1183    def to_bytes(self):
1184        data = bytearray(struct.pack(">I", self._unused))
1185        data += self.data
1186        return data
1187
1188    @classmethod
1189    def from_bytes(cls, data):
1190        unused = struct.unpack(">I", data.read(4))[0]
1191        if unused != 0:
1192            raise RuntimeError(
1193                "Invalid value of unused field in the ICMPv6 Destination Unreachable data. Expected value: 0.")
1194
1195        return cls(bytearray(data.read()))
1196
1197    def __len__(self):
1198        return self._header_length + len(self.data)
1199