1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import io 31import random 32import struct 33import unittest 34 35import common 36import config 37import ipv6 38import lowpan 39 40 41def create_default_lowpan_parser(context_manager): 42 return lowpan.LowpanParser( 43 lowpan_mesh_header_factory=lowpan.LowpanMeshHeaderFactory(), 44 lowpan_decompressor=config.create_default_lowpan_decompressor(context_manager), 45 lowpan_fragements_buffers_manager=lowpan.LowpanFragmentsBuffersManager(), 46 ipv6_packet_factory=ipv6.IPv6PacketFactory( 47 ehf=config.create_default_ipv6_extension_headers_factories(), 48 ulpf={ 49 17: 50 ipv6.UDPDatagramFactory(udp_header_factory=ipv6.UDPHeaderFactory(), 51 udp_payload_factory=ipv6.BytesPayloadFactory()), 52 58: 53 ipv6.ICMPv6Factory(body_factories=config.create_default_ipv6_icmp_body_factories()) 54 })) 55 56 57def any_tf(): 58 return random.getrandbits(2) 59 60 61def any_nh(): 62 return random.getrandbits(1) 63 64 65def any_hlim(): 66 return random.getrandbits(2) 67 68 69def any_cid(): 70 return random.getrandbits(1) 71 72 73def any_sac(): 74 return random.getrandbits(1) 75 76 77def any_sam(): 78 return random.getrandbits(2) 79 80 81def any_m(): 82 return random.getrandbits(1) 83 84 85def any_dac(): 86 return random.getrandbits(1) 87 88 89def any_dam(): 90 return random.getrandbits(2) 91 92 93def any_ecn(): 94 return random.getrandbits(2) 95 96 97def any_dscp(): 98 return random.getrandbits(6) 99 100 101def any_flow_label(): 102 return random.getrandbits(6) 103 104 105def any_hop_limit(): 106 return random.getrandbits(8) 107 108 109def any_src_addr(): 110 return bytearray([random.getrandbits(8) for _ in range(16)]) 111 112 113def any_dst_addr(): 114 return bytearray([random.getrandbits(8) for _ in range(16)]) 115 116 117def any_eui64(): 118 return bytearray([random.getrandbits(8) for _ in range(8)]) 119 120 121def any_rloc16(): 122 return bytearray([random.getrandbits(8) for _ in range(2)]) 123 124 125def any_48bits_addr(): 126 return bytearray([random.getrandbits(8) for _ in range(6)]) 127 128 129def any_32bits_addr(): 130 return bytearray([random.getrandbits(8) for _ in range(4)]) 131 132 133def any_8bits_addr(): 134 return bytearray([random.getrandbits(8)]) 135 136 137def any_c(): 138 return random.getrandbits(1) 139 140 141def any_p(): 142 return random.getrandbits(2) 143 144 145def any_src_port(): 146 return random.getrandbits(16) 147 148 149def any_dst_port(): 150 return random.getrandbits(16) 151 152 153def any_compressable_src_port(): 154 return 0xf000 + random.getrandbits(8) 155 156 157def any_compressable_dst_port(): 158 return 0xf000 + random.getrandbits(8) 159 160 161def any_nibble_src_port(): 162 return 0xf0b0 + random.getrandbits(4) 163 164 165def any_nibble_dst_port(): 166 return 0xf0b0 + random.getrandbits(4) 167 168 169def any_checksum(): 170 return random.getrandbits(16) 171 172 173def any_next_header(): 174 return random.getrandbits(8) 175 176 177def any_sci(): 178 return random.getrandbits(4) 179 180 181def any_dci(): 182 return random.getrandbits(4) 183 184 185def any_src_mac_addr(): 186 return bytearray([random.getrandbits(8) for _ in range(8)]) 187 188 189def any_dst_mac_addr(): 190 return bytearray([random.getrandbits(8) for _ in range(8)]) 191 192 193def any_context(): 194 prefix = bytearray([random.getrandbits(8) for _ in range(random.randint(2, 15))]) 195 prefix_length = len(prefix) 196 return lowpan.Context(prefix, prefix_length * 8) 197 198 199def any_mac_address(): 200 length = random.choice([2, 8]) 201 if length == 2: 202 return common.MacAddress.from_rloc16(bytearray([random.getrandbits(8) for _ in range(length)])) 203 elif length == 8: 204 return common.MacAddress.from_eui64(bytearray([random.getrandbits(8) for _ in range(length)])) 205 206 207def any_hops_left(): 208 return random.getrandbits(8) 209 210 211def any_data(length=None): 212 length = length if length is not None else random.randint(1, 64) 213 return bytearray([random.getrandbits(8) for _ in range(length)]) 214 215 216def any_datagram_size(): 217 return random.getrandbits(11) 218 219 220def any_datagram_tag(): 221 return random.getrandbits(16) 222 223 224def any_datagram_offset(): 225 return random.getrandbits(8) 226 227 228class TestLowpanIPHC(unittest.TestCase): 229 230 def test_should_create_LowpanIPHC_object_when_from_bytes_classmethod_called(self): 231 # GIVEN 232 tf = any_tf() 233 nh = any_nh() 234 hlim = any_hlim() 235 cid = any_cid() 236 sac = any_sac() 237 sam = any_sam() 238 m = any_m() 239 dac = any_dac() 240 dam = any_dam() 241 242 byte0 = (3 << 5) | (tf << 3) | (nh << 2) | hlim 243 byte1 = (cid << 7) | (sac << 6) | (sam << 4) | (m << 3) | (dac << 2) | dam 244 245 data_bytes = bytearray([byte0, byte1]) 246 247 # WHEN 248 actual = lowpan.LowpanIPHC.from_bytes(data_bytes) 249 250 # THEN 251 self.assertEqual(tf, actual.tf) 252 self.assertEqual(nh, actual.nh) 253 self.assertEqual(hlim, actual.hlim) 254 self.assertEqual(cid, actual.cid) 255 self.assertEqual(sac, actual.sac) 256 self.assertEqual(sam, actual.sam) 257 self.assertEqual(m, actual.m) 258 self.assertEqual(dac, actual.dac) 259 self.assertEqual(dam, actual.dam) 260 261 262class TestLowpanParser(unittest.TestCase): 263 264 def test_should_parse_6lo_with_mesh_hdr_that_contains_hlim_stored_on_2_bytes_when_decompress_method_called(self): 265 # GIVEN 266 lowpan_packet = bytearray([ 267 0xbf, 0x13, 0x90, 0x00, 0x48, 0x01, 0x7c, 0x77, 0x3f, 0xf2, 0xbf, 0xc0, 0x00, 0x24, 0xb1, 0x62, 0x44, 0x02, 268 0xf0, 0xba, 0x0d, 0xff, 0x04, 0x01, 0x00, 0x02, 0x02, 0x08, 0x00, 0x07, 0x09, 0x50, 0x20, 0x00, 0x20, 0x00, 269 0x08, 0x00, 0x00, 0x00 270 ]) 271 272 ipv6_packet = bytearray([ 273 0x60, 0x00, 0x00, 0x00, 0x00, 0x21, 0x11, 0x3f, 0xfd, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 274 0x00, 0xff, 0xfe, 0x00, 0x90, 0x00, 0xfd, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 275 0xfe, 0x00, 0x48, 0x01, 0xf0, 0xbf, 0xc0, 0x00, 0x00, 0x21, 0xe2, 0xdd, 0x62, 0x44, 0x02, 0xf0, 0xba, 0x0d, 276 0xff, 0x04, 0x01, 0x00, 0x02, 0x02, 0x08, 0x00, 0x07, 0x09, 0x50, 0x20, 0x00, 0x20, 0x00, 0x08, 0x00, 0x00, 277 0x00 278 ]) 279 280 message_info = common.MessageInfo() 281 message_info.source_mac_address = common.MacAddress.from_eui64( 282 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 283 message_info.destination_mac_address = common.MacAddress.from_eui64( 284 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 285 286 context_manager = lowpan.ContextManager() 287 context_manager[0] = lowpan.Context(prefix="fd00:db8::/64") 288 289 parser = create_default_lowpan_parser(context_manager) 290 291 # WHEN 292 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 293 294 # THEN 295 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 296 297 def test_should_parse_6lo_with_uncompressed_udp_and_without_hbh_when_decompress_method_called(self): 298 # GIVEN 299 lowpan_packet = bytearray([ 300 0x7a, 0x33, 0x11, 0x16, 0x33, 0x16, 0x34, 0x00, 0x14, 0xcf, 0x63, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 301 0x04, 0x4e, 0x92, 0xbb, 0x53 302 ]) 303 304 ipv6_packet = bytearray([ 305 0x60, 0x00, 0x00, 0x00, 0x00, 0x14, 0x11, 0x40, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 306 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x36, 0x29, 0x96, 0xff, 307 0xfe, 0xac, 0xff, 0x17, 0x16, 0x33, 0x16, 0x34, 0x00, 0x14, 0xcf, 0x63, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 308 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 309 ]) 310 311 message_info = common.MessageInfo() 312 message_info.source_mac_address = common.MacAddress.from_eui64( 313 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 314 message_info.destination_mac_address = common.MacAddress.from_eui64( 315 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 316 317 parser = create_default_lowpan_parser(context_manager=None) 318 319 # WHEN 320 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 321 322 # THEN 323 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 324 325 def test_should_parse_6lo_with_compressed_udp_and_without_hbh_when_decompress_method_called(self): 326 # GIVEN 327 lowpan_packet = bytearray([ 328 0x7e, 0x33, 0xf0, 0x16, 0x33, 0x16, 0x34, 0x04, 0xd2, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 329 0x92, 0xbb, 0x53 330 ]) 331 332 ipv6_packet = bytearray([ 333 0x60, 0x00, 0x00, 0x00, 0x00, 0x14, 0x11, 0x40, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 334 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x36, 0x29, 0x96, 0xff, 335 0xfe, 0xac, 0xff, 0x17, 0x16, 0x33, 0x16, 0x34, 0x00, 0x14, 0xcf, 0x63, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 336 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 337 ]) 338 339 message_info = common.MessageInfo() 340 message_info.source_mac_address = common.MacAddress.from_eui64( 341 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 342 message_info.destination_mac_address = common.MacAddress.from_eui64( 343 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 344 345 parser = create_default_lowpan_parser(context_manager=None) 346 347 # WHEN 348 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 349 350 # THEN 351 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 352 353 def test_should_parse_6lo_with_uncompressed_udp_and_with_uncompressed_hbh_when_decompress_method_called(self): 354 # GIVEN 355 lowpan_packet = bytearray([ 356 0x7a, 0x33, 0x00, 0x11, 0x00, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18, 0x16, 0x33, 0x16, 0x34, 0x00, 0x0c, 0x04, 357 0xd2, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 358 ]) 359 360 ipv6_packet = bytearray([ 361 0x60, 0x00, 0x00, 0x00, 0x00, 0x1c, 0x00, 0x40, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 362 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x36, 0x29, 0x96, 0xff, 363 0xfe, 0xac, 0xff, 0x17, 0x11, 0x00, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18, 0x16, 0x33, 0x16, 0x34, 0x00, 0x14, 364 0xcf, 0x63, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 365 ]) 366 367 message_info = common.MessageInfo() 368 message_info.source_mac_address = common.MacAddress.from_eui64( 369 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 370 message_info.destination_mac_address = common.MacAddress.from_eui64( 371 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 372 373 parser = create_default_lowpan_parser(context_manager=None) 374 375 # WHEN 376 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 377 378 # THEN 379 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 380 381 def test_should_parse_6lo_with_uncompressed_udp_and_with_compressed_hbh_when_decompress_method_called(self): 382 # GIVEN 383 lowpan_packet = bytearray([ 384 0x7e, 0x33, 0xe0, 0x11, 0x06, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18, 0x16, 0x33, 0x16, 0x34, 0x00, 0x0c, 0x04, 385 0xd2, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 386 ]) 387 388 ipv6_packet = bytearray([ 389 0x60, 0x00, 0x00, 0x00, 0x00, 0x1c, 0x00, 0x40, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 390 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x36, 0x29, 0x96, 0xff, 391 0xfe, 0xac, 0xff, 0x17, 0x11, 0x00, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18, 0x16, 0x33, 0x16, 0x34, 0x00, 0x14, 392 0xcf, 0x63, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 393 ]) 394 395 message_info = common.MessageInfo() 396 message_info.source_mac_address = common.MacAddress.from_eui64( 397 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 398 message_info.destination_mac_address = common.MacAddress.from_eui64( 399 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 400 401 parser = create_default_lowpan_parser(context_manager=None) 402 403 # WHEN 404 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 405 406 # THEN 407 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 408 409 def test_should_parse_6lo_with_compressed_udp_and_with_compressed_hbh_when_decompress_method_called(self): 410 # GIVEN 411 lowpan_packet = bytearray([ 412 0x7e, 0x33, 0xe1, 0x06, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18, 0xf0, 0x16, 0x33, 0x16, 0x34, 0x04, 0xd2, 0x80, 413 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 414 ]) 415 416 ipv6_packet = bytearray([ 417 0x60, 0x00, 0x00, 0x00, 0x00, 0x1c, 0x00, 0x40, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 418 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x36, 0x29, 0x96, 0xff, 419 0xfe, 0xac, 0xff, 0x17, 0x11, 0x00, 0x6d, 0x04, 0x40, 0x02, 0x00, 0x18, 0x16, 0x33, 0x16, 0x34, 0x00, 0x14, 420 0xcf, 0x63, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 421 ]) 422 423 message_info = common.MessageInfo() 424 message_info.source_mac_address = common.MacAddress.from_eui64( 425 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 426 message_info.destination_mac_address = common.MacAddress.from_eui64( 427 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 428 429 parser = create_default_lowpan_parser(context_manager=None) 430 431 # WHEN 432 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 433 434 # THEN 435 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 436 437 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called(self): 438 # GIVEN 439 lowpan_packet = bytearray([ 440 0x7a, 0xd5, 0xaa, 0x3a, 0x02, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x01, 0x36, 0x29, 0x96, 0xff, 0xfe, 0xac, 441 0xff, 0x18, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 442 ]) 443 444 ipv6_packet = bytearray([ 445 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 446 0x99, 0xff, 0xfe, 0x22, 0x11, 0x01, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x36, 0x29, 0x96, 0xff, 447 0xfe, 0xac, 0xff, 0x18, 0x80, 0x00, 0x97, 0xf3, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 448 ]) 449 450 message_info = common.MessageInfo() 451 message_info.source_mac_address = common.MacAddress.from_eui64( 452 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 453 message_info.destination_mac_address = common.MacAddress.from_eui64( 454 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 455 456 context_manager = lowpan.ContextManager() 457 context_manager[10] = lowpan.Context(prefix="2000:0db8::/64") 458 459 parser = create_default_lowpan_parser(context_manager) 460 461 # WHEN 462 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 463 464 # THEN 465 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 466 467 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called_1(self): 468 # GIVEN 469 lowpan_packet = bytearray([ 470 0x7a, 0xd5, 0xaa, 0x3a, 0x02, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x01, 0x36, 0x29, 0x96, 0xff, 0xfe, 0xac, 471 0xff, 0x18, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 472 ]) 473 474 ipv6_packet = bytearray([ 475 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 476 0x99, 0xff, 0xfe, 0x22, 0x11, 0x01, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x36, 0x29, 0x96, 0xff, 477 0xfe, 0xac, 0xff, 0x18, 0x80, 0x00, 0x97, 0xf3, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 478 ]) 479 480 message_info = common.MessageInfo() 481 message_info.source_mac_address = common.MacAddress.from_eui64( 482 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 483 message_info.destination_mac_address = common.MacAddress.from_eui64( 484 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 485 486 context_manager = lowpan.ContextManager() 487 context_manager[10] = lowpan.Context(prefix="2000:0db8::/64") 488 489 parser = create_default_lowpan_parser(context_manager) 490 491 # WHEN 492 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 493 494 # THEN 495 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 496 497 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called_2(self): 498 # GIVEN 499 lowpan_packet = bytearray([ 500 0x7a, 0xf0, 0xa0, 0x3a, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 0x00, 0x00, 0x25, 0x14, 0x46, 0xff, 0xfe, 0xdd, 501 0x2a, 0xfe, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 502 ]) 503 504 ipv6_packet = bytearray([ 505 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 506 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 0x00, 0x00, 0x25, 0x14, 0x46, 0xff, 507 0xfe, 0xdd, 0x2a, 0xfe, 0x80, 0x00, 0xb3, 0xf3, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 508 ]) 509 510 message_info = common.MessageInfo() 511 message_info.source_mac_address = common.MacAddress.from_eui64( 512 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 513 message_info.destination_mac_address = common.MacAddress.from_eui64( 514 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 515 516 context_manager = lowpan.ContextManager() 517 context_manager[10] = lowpan.Context(prefix="2000:0db8::/64") 518 519 parser = create_default_lowpan_parser(context_manager) 520 521 # WHEN 522 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 523 524 # THEN 525 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 526 527 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called_3(self): 528 # GIVEN 529 lowpan_packet = bytearray([ 530 0x7a, 0xd5, 0xaa, 0x3a, 0x02, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x01, 0x36, 0x29, 0x96, 0xff, 0xfe, 0xac, 531 0xff, 0x18, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 532 ]) 533 534 ipv6_packet = bytearray([ 535 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 536 0x99, 0xff, 0xfe, 0x22, 0x11, 0x01, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x36, 0x29, 0x96, 0xff, 537 0xfe, 0xac, 0xff, 0x18, 0x80, 0x00, 0x97, 0xf3, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 538 ]) 539 540 message_info = common.MessageInfo() 541 message_info.source_mac_address = common.MacAddress.from_eui64( 542 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 543 message_info.destination_mac_address = common.MacAddress.from_eui64( 544 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 545 546 context_manager = lowpan.ContextManager() 547 context_manager[10] = lowpan.Context(prefix="2000:0db8::/64") 548 549 parser = create_default_lowpan_parser(context_manager) 550 551 # WHEN 552 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 553 554 # THEN 555 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 556 557 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called_4(self): 558 # GIVEN 559 lowpan_packet = bytearray([ 560 0x7a, 0xf5, 0xaa, 0x3a, 0x36, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x18, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 561 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 562 ]) 563 564 ipv6_packet = bytearray([ 565 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 566 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x36, 0x29, 0x96, 0xff, 567 0xfe, 0xac, 0xff, 0x18, 0x80, 0x00, 0x97, 0xf4, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 568 ]) 569 570 message_info = common.MessageInfo() 571 message_info.source_mac_address = common.MacAddress.from_eui64( 572 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 573 message_info.destination_mac_address = common.MacAddress.from_eui64( 574 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 575 576 context_manager = lowpan.ContextManager() 577 context_manager[10] = lowpan.Context(prefix="2000:0db8::/64") 578 579 parser = create_default_lowpan_parser(context_manager) 580 581 # WHEN 582 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 583 584 # THEN 585 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 586 587 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called_5(self): 588 # GIVEN 589 lowpan_packet = bytearray( 590 [0x7a, 0xf7, 0xac, 0x3a, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53]) 591 592 ipv6_packet = bytearray([ 593 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x02, 0x99, 594 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 0x00, 0x00, 0x25, 0x14, 0x46, 0xff, 595 0xfe, 0xdd, 0x2a, 0xfe, 0x80, 0x00, 0xb3, 0xf3, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 596 ]) 597 598 message_info = common.MessageInfo() 599 message_info.source_mac_address = common.MacAddress.from_eui64( 600 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 601 message_info.destination_mac_address = common.MacAddress.from_eui64( 602 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 603 604 context_manager = lowpan.ContextManager() 605 context_manager[10] = lowpan.Context(prefix="2000:0db8::/64") 606 context_manager[12] = lowpan.Context(prefix="200d:1456:1255:0000:2514:46ff:fedd:2afe/128") 607 608 parser = create_default_lowpan_parser(context_manager) 609 610 # WHEN 611 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 612 613 # THEN 614 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 615 616 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called_6(self): 617 # GIVEN 618 lowpan_packet = bytearray([ 619 0x7a, 0xf0, 0xc0, 0x3a, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x54, 0x00, 0x00, 0x12, 0x54, 0x11, 0xff, 0xfe, 0x1c, 620 0x7e, 0xff, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 621 ]) 622 623 ipv6_packet = bytearray([ 624 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 0x00, 0x00, 0x25, 0x14, 625 0x46, 0xff, 0xfe, 0xdd, 0x2a, 0xfe, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x54, 0x00, 0x00, 0x12, 0x54, 0x11, 0xff, 626 0xfe, 0x1c, 0x7e, 0xff, 0x80, 0x00, 0xa5, 0x40, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 627 ]) 628 629 message_info = common.MessageInfo() 630 message_info.source_mac_address = common.MacAddress.from_eui64( 631 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 632 message_info.destination_mac_address = common.MacAddress.from_eui64( 633 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 634 635 context_manager = lowpan.ContextManager() 636 context_manager[12] = lowpan.Context(prefix="200d:1456:1255:0000:2514:46ff:fedd:2afe/128") 637 638 parser = create_default_lowpan_parser(context_manager) 639 640 # WHEN 641 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 642 643 # THEN 644 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 645 646 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called_7(self): 647 # GIVEN 648 lowpan_packet = bytearray([ 649 0x7a, 0xd0, 0xd0, 0x3a, 0x00, 0x02, 0x98, 0xff, 0xfe, 0x22, 0x12, 0x00, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 650 0x00, 0x00, 0x25, 0x14, 0x46, 0xff, 0xfe, 0xdd, 0x2a, 0xfe, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 651 0x4e, 0x92, 0xbb, 0x53 652 ]) 653 654 ipv6_packet = bytearray([ 655 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0xaa, 0xbb, 0xcc, 0xdd, 0x00, 0x00, 0x00, 0x00, 0x77, 0x82, 656 0x98, 0xff, 0xfe, 0x22, 0x12, 0x00, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 0x00, 0x00, 0x25, 0x14, 0x46, 0xff, 657 0xfe, 0xdd, 0x2a, 0xfe, 0x80, 0x00, 0xf5, 0x28, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 658 ]) 659 660 message_info = common.MessageInfo() 661 message_info.source_mac_address = common.MacAddress.from_eui64( 662 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 663 message_info.destination_mac_address = common.MacAddress.from_eui64( 664 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 665 666 context_manager = lowpan.ContextManager() 667 context_manager[13] = lowpan.Context(prefix="AABB:CCDD:0000:0000:7796::/75") 668 669 parser = create_default_lowpan_parser(context_manager) 670 671 # WHEN 672 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 673 674 # THEN 675 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 676 677 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called_8(self): 678 # GIVEN 679 lowpan_packet = bytearray([ 680 0x7a, 0xf0, 0xd0, 0x3a, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 0x00, 0x00, 0x25, 0x14, 0x46, 0xff, 0xfe, 0xdd, 681 0x2a, 0xfe, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 682 ]) 683 684 ipv6_packet = bytearray([ 685 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0xaa, 0xbb, 0xcc, 0xdd, 0x00, 0x00, 0x00, 0x00, 0x77, 0x99, 686 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 0x00, 0x00, 0x25, 0x14, 0x46, 0xff, 687 0xfe, 0xdd, 0x2a, 0xfe, 0x80, 0x00, 0xf5, 0x11, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 688 ]) 689 690 message_info = common.MessageInfo() 691 message_info.source_mac_address = common.MacAddress.from_eui64( 692 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 693 message_info.destination_mac_address = common.MacAddress.from_eui64( 694 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 695 696 context_manager = lowpan.ContextManager() 697 context_manager[13] = lowpan.Context(prefix="AABB:CCDD:0000:0000:7796::/75") 698 699 parser = create_default_lowpan_parser(context_manager) 700 701 # WHEN 702 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 703 704 # THEN 705 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 706 707 def test_should_parse_6lo_with_compressed_icmp_and_without_compressed_hbh_when_decompress_method_called_9(self): 708 # GIVEN 709 lowpan_packet = bytearray([ 710 0x7a, 0xf0, 0xd0, 0x3a, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 0x00, 0x00, 0x25, 0x14, 0x46, 0xff, 0xfe, 0xdd, 711 0x2a, 0xfe, 0x80, 0x00, 0xfa, 0xa5, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 712 ]) 713 714 ipv6_packet = bytearray([ 715 0x60, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x3a, 0x40, 0xaa, 0xbb, 0xcc, 0xdd, 0x00, 0x00, 0x00, 0x00, 0x77, 0x99, 716 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00, 0x20, 0x0d, 0x14, 0x56, 0x12, 0x55, 0x00, 0x00, 0x25, 0x14, 0x46, 0xff, 717 0xfe, 0xdd, 0x2a, 0xfe, 0x80, 0x00, 0xf5, 0x11, 0x0b, 0xc0, 0x00, 0x04, 0x4e, 0x92, 0xbb, 0x53 718 ]) 719 720 message_info = common.MessageInfo() 721 message_info.source_mac_address = common.MacAddress.from_eui64( 722 bytearray([0x00, 0x99, 0x99, 0xff, 0xfe, 0x22, 0x11, 0x00])) 723 message_info.destination_mac_address = common.MacAddress.from_eui64( 724 bytearray([0x34, 0x29, 0x96, 0xff, 0xfe, 0xac, 0xff, 0x17])) 725 context_manager = lowpan.ContextManager() 726 context_manager[13] = lowpan.Context(prefix="AABB:CCDD:0000:0000:7796::/75") 727 728 parser = create_default_lowpan_parser(context_manager) 729 730 # WHEN 731 actual_ipv6_packet = parser.parse(io.BytesIO(lowpan_packet), message_info) 732 733 # THEN 734 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 735 736 def test_should_defragment_big_IPv6_packet_when_parse_method_called_with_fragments_in_random_order(self): 737 # GIVEN 738 fragment_1 = bytearray([ 739 0xC5, 0x00, 0x31, 0x9F, 0x7A, 0x33, 0x3A, 0x80, 0x00, 0xFA, 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 740 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 741 0x54, 0x01, 0xAA, 0x44, 0x54, 0x12, 0x43, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 742 0x77, 0x99, 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 743 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x80, 0x00, 0xFA, 744 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x4C, 0x66, 0x4E 745 ]) 746 747 fragment_2 = bytearray([ 748 0xE5, 0x00, 0x31, 0x9F, 0x11, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 749 0x54, 0x01, 0xAA, 0x44, 0x54, 0x12, 0xa3, 0x53, 0x11, 0x44, 0x66, 0xFE, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 750 0x77, 0x99, 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 751 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x80, 0x00, 0xFA, 752 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0x1B, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x22, 0xBB, 0x53, 0x1A, 0x44, 753 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA 754 ]) 755 756 fragment_3 = bytearray([ 757 0xE5, 0x00, 0x31, 0x9F, 0x1D, 0x44, 0x54, 0x12, 0xD3, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 758 0x44, 0x66, 0x77, 0x99, 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 759 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0xC0, 760 0x00, 0xFA, 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x44, 0xCC, 0x4E, 0x92, 0xBB, 0x53, 761 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x44, 0x54, 0x12, 0x43, 0x53, 762 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBC, 0x53, 0x1A, 0x44, 0x66, 0x77 763 ]) 764 765 fragment_4 = bytearray([ 766 0xE5, 0x00, 0x31, 0x9F, 0x29, 0x99, 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 767 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 768 0x1A, 0x80, 0x00, 0xFA, 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 769 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x44, 0x54, 0x12, 770 0x43, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x1A, 0x92, 0xBB, 0x53, 771 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99 772 ]) 773 774 fragment_5 = bytearray([ 775 0xE5, 0x00, 0x31, 0x9F, 0x35, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 776 0xBB, 0x53, 0x1A, 0x80, 0x00, 0xFA, 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x4C, 0x66, 777 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x44, 778 0x54, 0x12, 0xa3, 0x53, 0x11, 0x44, 0x66, 0xFE, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x1A, 0x92, 779 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 780 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A 781 ]) 782 783 fragment_6 = bytearray([ 784 0xE5, 0x00, 0x31, 0x9F, 0x41, 0x80, 0x00, 0xFA, 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0x1B, 0x53, 0x11, 785 0x44, 0x66, 0x4E, 0x22, 0xBB, 0x53, 0x1A, 0x44, 0x67, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 786 0xAA, 0x44, 0x54, 0x12, 0xD3, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 787 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 788 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0xC0, 0x00, 0xFA, 0x15, 0x0B, 789 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x44, 0xCC, 0x4E 790 ]) 791 792 fragment_7 = bytearray([ 793 0xE5, 0x00, 0x31, 0x9F, 0x4D, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 794 0x54, 0x01, 0xAA, 0x44, 0x54, 0x12, 0x43, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBC, 0x53, 0x1A, 0x44, 0x66, 795 0x77, 0x99, 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 796 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBA, 0x53, 0x1A, 0x60, 0x00, 0x00, 797 0x00, 0x00, 0x10, 0x3A, 0x64, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x11, 0x12, 798 0x13, 0x14, 0x15, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 799 ]) 800 801 fragment_8 = bytearray([ 802 0xE5, 0x00, 0x31, 0x9F, 0x59, 0x02, 0x00, 0x1A, 0x2A, 0x3F, 0x09, 0xAB, 0x43, 0x60, 0x00, 0xF0, 0x00, 0x00, 803 0x10, 0x3A, 0x64, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x11, 0x12, 0x13, 0x14, 804 0x15, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x1A, 0x2A, 0x3F, 0x09, 0xAB, 0x43, 0x80, 805 0x00, 0xFA, 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 806 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x44, 0x54, 0x12, 0x43, 0x53, 807 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBC, 0x53, 0x1A, 0x44, 0x66, 0x77 808 ]) 809 810 fragment_9 = bytearray([ 811 0xE5, 0x00, 0x31, 0x9F, 0x65, 0x99, 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 812 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 813 0x1A, 0x80, 0x00, 0xFA, 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x4C, 0x66, 0x4E, 0x92, 814 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x44, 0x54, 0x12, 815 0xa3, 0x53, 0x11, 0x44, 0x66, 0xFE, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x1A, 0x92, 0xBB, 0x53, 816 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x4D, 0x66, 0x77, 0x99 817 ]) 818 819 fragment_10 = bytearray([ 820 0xE5, 0x00, 0x31, 0x9F, 0x71, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 821 0xBB, 0x53, 0x1A, 0x80, 0x00, 0xFA, 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0x1B, 0x53, 0x11, 0x44, 0x66, 822 0x4E, 0x22, 0xBB, 0x51, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x44, 823 0x54, 0x12, 0xD3, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x1A, 0x92, 824 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 825 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A 826 ]) 827 828 fragment_11 = bytearray([ 829 0xE5, 0x00, 0x31, 0x9F, 0x7D, 0xC0, 0x00, 0xFA, 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 830 0x44, 0xCC, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 831 0xAA, 0x44, 0x54, 0x12, 0x4A, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBC, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 832 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 833 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x80, 0x00, 0xFA, 0xA5, 0x0B, 834 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x4E 835 ]) 836 837 fragment_12 = bytearray([ 838 0xE5, 0x00, 0x31, 0x9F, 0x89, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 839 0x54, 0x01, 0xAA, 0x44, 0x54, 0x12, 0x43, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x3A, 0x44, 0x66, 840 0x77, 0x99, 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 841 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x80, 0x00, 0xFA, 842 0xA5, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x4C, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x44, 843 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA 844 ]) 845 846 fragment_13 = bytearray([ 847 0xE5, 0x00, 0x31, 0x9F, 0x95, 0x44, 0x54, 0x12, 0xa3, 0x53, 0x11, 0x44, 0x66, 0xFE, 0x92, 0xBB, 0x53, 0x1A, 848 0x44, 0x66, 0x77, 0x99, 0x1A, 0x92, 0xBB, 0x53, 0x11, 0x44, 0x66, 0x92, 0xBB, 0x53, 0x1A, 0x44, 0x66, 0x77, 849 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x11, 0x44, 0x66, 0x4E, 0x92, 0xBB, 0x53, 0x1A, 0x80, 850 0x00, 0xFA, 0xA5, 0x1B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0x1B, 0x53, 0x11, 0x44, 0x66, 0x4E, 0x22, 0xBB, 0x53, 851 0x1A, 0x44, 0x66, 0x77, 0x99, 0x15, 0xB3, 0x00, 0x54, 0xCC, 0x54, 0x01, 0xAA, 0x44, 0x54, 0x12, 0xD3, 0x53, 852 0x11, 0x44, 0x66 853 ]) 854 855 message_info = common.MessageInfo() 856 message_info.source_mac_address = common.MacAddress.from_eui64( 857 bytearray([0x00, 0x00, 0x00, 0x11, 0x12, 0x13, 0x14, 0x15])) 858 message_info.destination_mac_address = common.MacAddress.from_eui64( 859 bytearray([0x00, 0x00, 0x1A, 0x2A, 0x3F, 0x09, 0xAB, 0x43])) 860 861 parser = create_default_lowpan_parser(context_manager=None) 862 863 # WHEN 864 self.assertIsNone(parser.parse(io.BytesIO(fragment_4), message_info)) 865 self.assertIsNone(parser.parse(io.BytesIO(fragment_2), message_info)) 866 self.assertIsNone(parser.parse(io.BytesIO(fragment_3), message_info)) 867 self.assertIsNone(parser.parse(io.BytesIO(fragment_13), message_info)) 868 self.assertIsNone(parser.parse(io.BytesIO(fragment_5), message_info)) 869 self.assertIsNone(parser.parse(io.BytesIO(fragment_6), message_info)) 870 self.assertIsNone(parser.parse(io.BytesIO(fragment_7), message_info)) 871 self.assertIsNone(parser.parse(io.BytesIO(fragment_8), message_info)) 872 self.assertIsNone(parser.parse(io.BytesIO(fragment_9), message_info)) 873 self.assertIsNone(parser.parse(io.BytesIO(fragment_10), message_info)) 874 self.assertIsNone(parser.parse(io.BytesIO(fragment_11), message_info)) 875 self.assertIsNone(parser.parse(io.BytesIO(fragment_12), message_info)) 876 actual_ipv6_packet = parser.parse(io.BytesIO(fragment_1), message_info) 877 878 # THEN 879 ipv6_packet = bytearray([ 880 0x60, 881 0x00, 882 0x00, 883 0x00, 884 0x04, 885 0xD8, 886 0x3A, 887 0x40, 888 0xfe, 889 0x80, 890 0x00, 891 0x00, 892 0x00, 893 0x00, 894 0x00, 895 0x00, 896 0x02, 897 0x00, 898 0x00, 899 0x11, 900 0x12, 901 0x13, 902 0x14, 903 0x15, 904 0xfe, 905 0x80, 906 0x00, 907 0x00, 908 0x00, 909 0x00, 910 0x00, 911 0x00, 912 0x02, 913 0x00, 914 0x1A, 915 0x2A, 916 0x3F, 917 0x09, 918 0xAB, 919 0x43, # / * 40 * / 920 0x80, 921 0x00, 922 0xAB, 923 0x64, 924 0x0B, 925 0xC0, 926 0x00, 927 0x04, 928 0x4E, 929 0x92, 930 0xBB, 931 0x53, 932 0x11, 933 0x44, 934 0x66, 935 0x4E, 936 0x92, 937 0xBB, 938 0x53, 939 0x1A, 940 0x44, 941 0x66, 942 0x77, 943 0x99, 944 0x15, 945 0xB3, 946 0x00, 947 0x54, 948 0xCC, 949 0x54, 950 0x01, 951 0xAA, 952 0x44, 953 0x54, 954 0x12, 955 0x43, 956 0x53, 957 0x11, 958 0x44, 959 0x66, 960 0x4E, 961 0x92, 962 0xBB, 963 0x53, 964 0x1A, 965 0x44, 966 0x66, 967 0x77, 968 0x99, 969 0x1A, 970 0x92, 971 0xBB, 972 0x53, 973 0x11, 974 0x44, 975 0x66, 976 0x92, 977 0xBB, 978 0x53, 979 0x1A, 980 0x44, 981 0x66, 982 0x77, 983 0x99, 984 0x15, 985 0xB3, 986 0x00, 987 0x54, 988 0xCC, 989 0x54, 990 0x01, 991 0xAA, 992 0x11, 993 0x44, 994 0x66, 995 0x4E, 996 0x92, 997 0xBB, 998 0x53, 999 0x1A, # / * 120 * / 1000 0x80, 1001 0x00, 1002 0xFA, 1003 0xA5, 1004 0x0B, 1005 0xC0, 1006 0x00, 1007 0x04, 1008 0x4E, 1009 0x92, 1010 0xBB, 1011 0x53, 1012 0x11, 1013 0x4C, 1014 0x66, 1015 0x4E, 1016 0x92, 1017 0xBB, 1018 0x53, 1019 0x1A, 1020 0x44, 1021 0x66, 1022 0x77, 1023 0x99, 1024 0x15, 1025 0xB3, 1026 0x00, 1027 0x54, 1028 0xCC, 1029 0x54, 1030 0x01, 1031 0xAA, 1032 0x44, 1033 0x54, 1034 0x12, 1035 0xa3, 1036 0x53, 1037 0x11, 1038 0x44, 1039 0x66, 1040 0xFE, 1041 0x92, 1042 0xBB, 1043 0x53, 1044 0x1A, 1045 0x44, 1046 0x66, 1047 0x77, 1048 0x99, 1049 0x1A, 1050 0x92, 1051 0xBB, 1052 0x53, 1053 0x11, 1054 0x44, 1055 0x66, 1056 0x92, 1057 0xBB, 1058 0x53, 1059 0x1A, 1060 0x44, 1061 0x66, 1062 0x77, 1063 0x99, 1064 0x15, 1065 0xB3, 1066 0x00, 1067 0x54, 1068 0xCC, 1069 0x54, 1070 0x01, 1071 0xAA, 1072 0x11, 1073 0x44, 1074 0x66, 1075 0x4E, 1076 0x92, 1077 0xBB, 1078 0x53, 1079 0x1A, # / * 200 * / 1080 0x80, 1081 0x00, 1082 0xFA, 1083 0xA5, 1084 0x0B, 1085 0xC0, 1086 0x00, 1087 0x04, 1088 0x4E, 1089 0x92, 1090 0x1B, 1091 0x53, 1092 0x11, 1093 0x44, 1094 0x66, 1095 0x4E, 1096 0x22, 1097 0xBB, 1098 0x53, 1099 0x1A, 1100 0x44, 1101 0x66, 1102 0x77, 1103 0x99, 1104 0x15, 1105 0xB3, 1106 0x00, 1107 0x54, 1108 0xCC, 1109 0x54, 1110 0x01, 1111 0xAA, 1112 0x44, 1113 0x54, 1114 0x12, 1115 0xD3, 1116 0x53, 1117 0x11, 1118 0x44, 1119 0x66, 1120 0x4E, 1121 0x92, 1122 0xBB, 1123 0x53, 1124 0x1A, 1125 0x44, 1126 0x66, 1127 0x77, 1128 0x99, 1129 0x1A, 1130 0x92, 1131 0xBB, 1132 0x53, 1133 0x11, 1134 0x44, 1135 0x66, 1136 0x92, 1137 0xBB, 1138 0x53, 1139 0x1A, 1140 0x44, 1141 0x66, 1142 0x77, 1143 0x99, 1144 0x15, 1145 0xB3, 1146 0x00, 1147 0x54, 1148 0xCC, 1149 0x54, 1150 0x01, 1151 0xAA, 1152 0x11, 1153 0x44, 1154 0x66, 1155 0x4E, 1156 0x92, 1157 0xBB, 1158 0x53, 1159 0x1A, # / * 280 * / 1160 0xC0, 1161 0x00, 1162 0xFA, 1163 0xA5, 1164 0x0B, 1165 0xC0, 1166 0x00, 1167 0x04, 1168 0x4E, 1169 0x92, 1170 0xBB, 1171 0x53, 1172 0x11, 1173 0x44, 1174 0xCC, 1175 0x4E, 1176 0x92, 1177 0xBB, 1178 0x53, 1179 0x1A, 1180 0x44, 1181 0x66, 1182 0x77, 1183 0x99, 1184 0x15, 1185 0xB3, 1186 0x00, 1187 0x54, 1188 0xCC, 1189 0x54, 1190 0x01, 1191 0xAA, 1192 0x44, 1193 0x54, 1194 0x12, 1195 0x43, 1196 0x53, 1197 0x11, 1198 0x44, 1199 0x66, 1200 0x4E, 1201 0x92, 1202 0xBC, 1203 0x53, 1204 0x1A, 1205 0x44, 1206 0x66, 1207 0x77, 1208 0x99, 1209 0x1A, 1210 0x92, 1211 0xBB, 1212 0x53, 1213 0x11, 1214 0x44, 1215 0x66, 1216 0x92, 1217 0xBB, 1218 0x53, 1219 0x1A, 1220 0x44, 1221 0x66, 1222 0x77, 1223 0x99, 1224 0x15, 1225 0xB3, 1226 0x00, 1227 0x54, 1228 0xCC, 1229 0x54, 1230 0x01, 1231 0xAA, 1232 0x11, 1233 0x44, 1234 0x66, 1235 0x4E, 1236 0x92, 1237 0xBB, 1238 0x53, 1239 0x1A, # / * 360 * / 1240 0x80, 1241 0x00, 1242 0xFA, 1243 0xA5, 1244 0x0B, 1245 0xC0, 1246 0x00, 1247 0x04, 1248 0x4E, 1249 0x92, 1250 0xBB, 1251 0x53, 1252 0x11, 1253 0x44, 1254 0x66, 1255 0x4E, 1256 0x92, 1257 0xBB, 1258 0x53, 1259 0x1A, 1260 0x44, 1261 0x66, 1262 0x77, 1263 0x99, 1264 0x15, 1265 0xB3, 1266 0x00, 1267 0x54, 1268 0xCC, 1269 0x54, 1270 0x01, 1271 0xAA, 1272 0x44, 1273 0x54, 1274 0x12, 1275 0x43, 1276 0x53, 1277 0x11, 1278 0x44, 1279 0x66, 1280 0x4E, 1281 0x92, 1282 0xBB, 1283 0x53, 1284 0x1A, 1285 0x44, 1286 0x66, 1287 0x77, 1288 0x99, 1289 0x1A, 1290 0x92, 1291 0xBB, 1292 0x53, 1293 0x11, 1294 0x44, 1295 0x66, 1296 0x92, 1297 0xBB, 1298 0x53, 1299 0x1A, 1300 0x44, 1301 0x66, 1302 0x77, 1303 0x99, 1304 0x15, 1305 0xB3, 1306 0x00, 1307 0x54, 1308 0xCC, 1309 0x54, 1310 0x01, 1311 0xAA, 1312 0x11, 1313 0x44, 1314 0x66, 1315 0x4E, 1316 0x92, 1317 0xBB, 1318 0x53, 1319 0x1A, 1320 0x80, 1321 0x00, 1322 0xFA, 1323 0xA5, 1324 0x0B, 1325 0xC0, 1326 0x00, 1327 0x04, 1328 0x4E, 1329 0x92, 1330 0xBB, 1331 0x53, 1332 0x11, 1333 0x4C, 1334 0x66, 1335 0x4E, 1336 0x92, 1337 0xBB, 1338 0x53, 1339 0x1A, 1340 0x44, 1341 0x66, 1342 0x77, 1343 0x99, 1344 0x15, 1345 0xB3, 1346 0x00, 1347 0x54, 1348 0xCC, 1349 0x54, 1350 0x01, 1351 0xAA, 1352 0x44, 1353 0x54, 1354 0x12, 1355 0xa3, 1356 0x53, 1357 0x11, 1358 0x44, 1359 0x66, 1360 0xFE, 1361 0x92, 1362 0xBB, 1363 0x53, 1364 0x1A, 1365 0x44, 1366 0x66, 1367 0x77, 1368 0x99, 1369 0x1A, 1370 0x92, 1371 0xBB, 1372 0x53, 1373 0x11, 1374 0x44, 1375 0x66, 1376 0x92, 1377 0xBB, 1378 0x53, 1379 0x1A, 1380 0x44, 1381 0x66, 1382 0x77, 1383 0x99, 1384 0x15, 1385 0xB3, 1386 0x00, 1387 0x54, 1388 0xCC, 1389 0x54, 1390 0x01, 1391 0xAA, 1392 0x11, 1393 0x44, 1394 0x66, 1395 0x4E, 1396 0x92, 1397 0xBB, 1398 0x53, 1399 0x1A, 1400 0x80, 1401 0x00, 1402 0xFA, 1403 0xA5, 1404 0x0B, 1405 0xC0, 1406 0x00, 1407 0x04, 1408 0x4E, 1409 0x92, 1410 0x1B, 1411 0x53, 1412 0x11, 1413 0x44, 1414 0x66, 1415 0x4E, 1416 0x22, 1417 0xBB, 1418 0x53, 1419 0x1A, 1420 0x44, 1421 0x67, 1422 0x77, 1423 0x99, 1424 0x15, 1425 0xB3, 1426 0x00, 1427 0x54, 1428 0xCC, 1429 0x54, 1430 0x01, 1431 0xAA, 1432 0x44, 1433 0x54, 1434 0x12, 1435 0xD3, 1436 0x53, 1437 0x11, 1438 0x44, 1439 0x66, 1440 0x4E, 1441 0x92, 1442 0xBB, 1443 0x53, 1444 0x1A, 1445 0x44, 1446 0x66, 1447 0x77, 1448 0x99, 1449 0x1A, 1450 0x92, 1451 0xBB, 1452 0x53, 1453 0x11, 1454 0x44, 1455 0x66, 1456 0x92, 1457 0xBB, 1458 0x53, 1459 0x1A, 1460 0x44, 1461 0x66, 1462 0x77, 1463 0x99, 1464 0x15, 1465 0xB3, 1466 0x00, 1467 0x54, 1468 0xCC, 1469 0x54, 1470 0x01, 1471 0xAA, 1472 0x11, 1473 0x44, 1474 0x66, 1475 0x4E, 1476 0x92, 1477 0xBB, 1478 0x53, 1479 0x1A, 1480 0xC0, 1481 0x00, 1482 0xFA, 1483 0x15, 1484 0x0B, 1485 0xC0, 1486 0x00, 1487 0x04, 1488 0x4E, 1489 0x92, 1490 0xBB, 1491 0x53, 1492 0x11, 1493 0x44, 1494 0xCC, 1495 0x4E, 1496 0x92, 1497 0xBB, 1498 0x53, 1499 0x1A, 1500 0x44, 1501 0x66, 1502 0x77, 1503 0x99, 1504 0x15, 1505 0xB3, 1506 0x00, 1507 0x54, 1508 0xCC, 1509 0x54, 1510 0x01, 1511 0xAA, 1512 0x44, 1513 0x54, 1514 0x12, 1515 0x43, 1516 0x53, 1517 0x11, 1518 0x44, 1519 0x66, 1520 0x4E, 1521 0x92, 1522 0xBC, 1523 0x53, 1524 0x1A, 1525 0x44, 1526 0x66, 1527 0x77, 1528 0x99, 1529 0x1A, 1530 0x92, 1531 0xBB, 1532 0x53, 1533 0x11, 1534 0x44, 1535 0x66, 1536 0x92, 1537 0xBB, 1538 0x53, 1539 0x1A, 1540 0x44, 1541 0x66, 1542 0x77, 1543 0x99, 1544 0x15, 1545 0xB3, 1546 0x00, 1547 0x54, 1548 0xCC, 1549 0x54, 1550 0x01, 1551 0xAA, 1552 0x11, 1553 0x44, 1554 0x66, 1555 0x4E, 1556 0x92, 1557 0xBA, 1558 0x53, 1559 0x1A, 1560 0x60, 1561 0x00, 1562 0x00, 1563 0x00, 1564 0x00, 1565 0x10, 1566 0x3A, 1567 0x64, 1568 0xfe, 1569 0x80, 1570 0x00, 1571 0x00, 1572 0x00, 1573 0x00, 1574 0x00, 1575 0x00, 1576 0x02, 1577 0x00, 1578 0x00, 1579 0x11, 1580 0x12, 1581 0x13, 1582 0x14, 1583 0x15, 1584 0xfe, 1585 0x80, 1586 0x00, 1587 0x00, 1588 0x00, 1589 0x00, 1590 0x00, 1591 0x00, 1592 0x02, 1593 0x00, 1594 0x1A, 1595 0x2A, 1596 0x3F, 1597 0x09, 1598 0xAB, 1599 0x43, # / * 720 * / 1600 0x60, 1601 0x00, 1602 0xF0, 1603 0x00, 1604 0x00, 1605 0x10, 1606 0x3A, 1607 0x64, 1608 0xfe, 1609 0x80, 1610 0x00, 1611 0x00, 1612 0x00, 1613 0x00, 1614 0x00, 1615 0x00, 1616 0x02, 1617 0x00, 1618 0x00, 1619 0x11, 1620 0x12, 1621 0x13, 1622 0x14, 1623 0x15, 1624 0xfe, 1625 0x80, 1626 0x00, 1627 0x00, 1628 0x00, 1629 0x00, 1630 0x00, 1631 0x00, 1632 0x02, 1633 0x00, 1634 0x1A, 1635 0x2A, 1636 0x3F, 1637 0x09, 1638 0xAB, 1639 0x43, 1640 0x80, 1641 0x00, 1642 0xFA, 1643 0xA5, 1644 0x0B, 1645 0xC0, 1646 0x00, 1647 0x04, 1648 0x4E, 1649 0x92, 1650 0xBB, 1651 0x53, 1652 0x11, 1653 0x44, 1654 0x66, 1655 0x4E, 1656 0x92, 1657 0xBB, 1658 0x53, 1659 0x1A, 1660 0x44, 1661 0x66, 1662 0x77, 1663 0x99, 1664 0x15, 1665 0xB3, 1666 0x00, 1667 0x54, 1668 0xCC, 1669 0x54, 1670 0x01, 1671 0xAA, 1672 0x44, 1673 0x54, 1674 0x12, 1675 0x43, 1676 0x53, 1677 0x11, 1678 0x44, 1679 0x66, 1680 0x4E, 1681 0x92, 1682 0xBC, 1683 0x53, 1684 0x1A, 1685 0x44, 1686 0x66, 1687 0x77, 1688 0x99, 1689 0x1A, 1690 0x92, 1691 0xBB, 1692 0x53, 1693 0x11, 1694 0x44, 1695 0x66, 1696 0x92, 1697 0xBB, 1698 0x53, 1699 0x1A, 1700 0x44, 1701 0x66, 1702 0x77, 1703 0x99, 1704 0x15, 1705 0xB3, 1706 0x00, 1707 0x54, 1708 0xCC, 1709 0x54, 1710 0x01, 1711 0xAA, 1712 0x11, 1713 0x44, 1714 0x66, 1715 0x4E, 1716 0x92, 1717 0xBB, 1718 0x53, 1719 0x1A, 1720 0x80, 1721 0x00, 1722 0xFA, 1723 0xA5, 1724 0x0B, 1725 0xC0, 1726 0x00, 1727 0x04, 1728 0x4E, 1729 0x92, 1730 0xBB, 1731 0x53, 1732 0x11, 1733 0x4C, 1734 0x66, 1735 0x4E, 1736 0x92, 1737 0xBB, 1738 0x53, 1739 0x1A, 1740 0x44, 1741 0x66, 1742 0x77, 1743 0x99, 1744 0x15, 1745 0xB3, 1746 0x00, 1747 0x54, 1748 0xCC, 1749 0x54, 1750 0x01, 1751 0xAA, 1752 0x44, 1753 0x54, 1754 0x12, 1755 0xa3, 1756 0x53, 1757 0x11, 1758 0x44, 1759 0x66, 1760 0xFE, 1761 0x92, 1762 0xBB, 1763 0x53, 1764 0x1A, 1765 0x44, 1766 0x66, 1767 0x77, 1768 0x99, 1769 0x1A, 1770 0x92, 1771 0xBB, 1772 0x53, 1773 0x11, 1774 0x44, 1775 0x66, 1776 0x92, 1777 0xBB, 1778 0x53, 1779 0x1A, 1780 0x4D, 1781 0x66, 1782 0x77, 1783 0x99, 1784 0x15, 1785 0xB3, 1786 0x00, 1787 0x54, 1788 0xCC, 1789 0x54, 1790 0x01, 1791 0xAA, 1792 0x11, 1793 0x44, 1794 0x66, 1795 0x4E, 1796 0x92, 1797 0xBB, 1798 0x53, 1799 0x1A, 1800 0x80, 1801 0x00, 1802 0xFA, 1803 0xA5, 1804 0x0B, 1805 0xC0, 1806 0x00, 1807 0x04, 1808 0x4E, 1809 0x92, 1810 0x1B, 1811 0x53, 1812 0x11, 1813 0x44, 1814 0x66, 1815 0x4E, 1816 0x22, 1817 0xBB, 1818 0x51, 1819 0x1A, 1820 0x44, 1821 0x66, 1822 0x77, 1823 0x99, 1824 0x15, 1825 0xB3, 1826 0x00, 1827 0x54, 1828 0xCC, 1829 0x54, 1830 0x01, 1831 0xAA, 1832 0x44, 1833 0x54, 1834 0x12, 1835 0xD3, 1836 0x53, 1837 0x11, 1838 0x44, 1839 0x66, 1840 0x4E, 1841 0x92, 1842 0xBB, 1843 0x53, 1844 0x1A, 1845 0x44, 1846 0x66, 1847 0x77, 1848 0x99, 1849 0x1A, 1850 0x92, 1851 0xBB, 1852 0x53, 1853 0x11, 1854 0x44, 1855 0x66, 1856 0x92, 1857 0xBB, 1858 0x53, 1859 0x1A, 1860 0x44, 1861 0x66, 1862 0x77, 1863 0x99, 1864 0x15, 1865 0xB3, 1866 0x00, 1867 0x54, 1868 0xCC, 1869 0x54, 1870 0x01, 1871 0xAA, 1872 0x11, 1873 0x44, 1874 0x66, 1875 0x4E, 1876 0x92, 1877 0xBB, 1878 0x53, 1879 0x1A, 1880 0xC0, 1881 0x00, 1882 0xFA, 1883 0xA5, 1884 0x0B, 1885 0xC0, 1886 0x00, 1887 0x04, 1888 0x4E, 1889 0x92, 1890 0xBB, 1891 0x53, 1892 0x11, 1893 0x44, 1894 0xCC, 1895 0x4E, 1896 0x92, 1897 0xBB, 1898 0x53, 1899 0x1A, 1900 0x44, 1901 0x66, 1902 0x77, 1903 0x99, 1904 0x15, 1905 0xB3, 1906 0x00, 1907 0x54, 1908 0xCC, 1909 0x54, 1910 0x01, 1911 0xAA, 1912 0x44, 1913 0x54, 1914 0x12, 1915 0x4A, 1916 0x53, 1917 0x11, 1918 0x44, 1919 0x66, 1920 0x4E, 1921 0x92, 1922 0xBC, 1923 0x53, 1924 0x1A, 1925 0x44, 1926 0x66, 1927 0x77, 1928 0x99, 1929 0x1A, 1930 0x92, 1931 0xBB, 1932 0x53, 1933 0x11, 1934 0x44, 1935 0x66, 1936 0x92, 1937 0xBB, 1938 0x53, 1939 0x1A, 1940 0x44, 1941 0x66, 1942 0x77, 1943 0x99, 1944 0x15, 1945 0xB3, 1946 0x00, 1947 0x54, 1948 0xCC, 1949 0x54, 1950 0x01, 1951 0xAA, 1952 0x11, 1953 0x44, 1954 0x66, 1955 0x4E, 1956 0x92, 1957 0xBB, 1958 0x53, 1959 0x1A, # / * 1080 * / 1960 0x80, 1961 0x00, 1962 0xFA, 1963 0xA5, 1964 0x0B, 1965 0xC0, 1966 0x00, 1967 0x04, 1968 0x4E, 1969 0x92, 1970 0xBB, 1971 0x53, 1972 0x11, 1973 0x44, 1974 0x66, 1975 0x4E, 1976 0x92, 1977 0xBB, 1978 0x53, 1979 0x1A, 1980 0x44, 1981 0x66, 1982 0x77, 1983 0x99, 1984 0x15, 1985 0xB3, 1986 0x00, 1987 0x54, 1988 0xCC, 1989 0x54, 1990 0x01, 1991 0xAA, 1992 0x44, 1993 0x54, 1994 0x12, 1995 0x43, 1996 0x53, 1997 0x11, 1998 0x44, 1999 0x66, 2000 0x4E, 2001 0x92, 2002 0xBB, 2003 0x53, 2004 0x3A, 2005 0x44, 2006 0x66, 2007 0x77, 2008 0x99, 2009 0x1A, 2010 0x92, 2011 0xBB, 2012 0x53, 2013 0x11, 2014 0x44, 2015 0x66, 2016 0x92, 2017 0xBB, 2018 0x53, 2019 0x1A, 2020 0x44, 2021 0x66, 2022 0x77, 2023 0x99, 2024 0x15, 2025 0xB3, 2026 0x00, 2027 0x54, 2028 0xCC, 2029 0x54, 2030 0x01, 2031 0xAA, 2032 0x11, 2033 0x44, 2034 0x66, 2035 0x4E, 2036 0x92, 2037 0xBB, 2038 0x53, 2039 0x1A, 2040 0x80, 2041 0x00, 2042 0xFA, 2043 0xA5, 2044 0x0B, 2045 0xC0, 2046 0x00, 2047 0x04, 2048 0x4E, 2049 0x92, 2050 0xBB, 2051 0x53, 2052 0x11, 2053 0x4C, 2054 0x66, 2055 0x4E, 2056 0x92, 2057 0xBB, 2058 0x53, 2059 0x1A, 2060 0x44, 2061 0x66, 2062 0x77, 2063 0x99, 2064 0x15, 2065 0xB3, 2066 0x00, 2067 0x54, 2068 0xCC, 2069 0x54, 2070 0x01, 2071 0xAA, 2072 0x44, 2073 0x54, 2074 0x12, 2075 0xa3, 2076 0x53, 2077 0x11, 2078 0x44, 2079 0x66, 2080 0xFE, 2081 0x92, 2082 0xBB, 2083 0x53, 2084 0x1A, 2085 0x44, 2086 0x66, 2087 0x77, 2088 0x99, 2089 0x1A, 2090 0x92, 2091 0xBB, 2092 0x53, 2093 0x11, 2094 0x44, 2095 0x66, 2096 0x92, 2097 0xBB, 2098 0x53, 2099 0x1A, 2100 0x44, 2101 0x66, 2102 0x77, 2103 0x99, 2104 0x15, 2105 0xB3, 2106 0x00, 2107 0x54, 2108 0xCC, 2109 0x54, 2110 0x01, 2111 0xAA, 2112 0x11, 2113 0x44, 2114 0x66, 2115 0x4E, 2116 0x92, 2117 0xBB, 2118 0x53, 2119 0x1A, 2120 0x80, 2121 0x00, 2122 0xFA, 2123 0xA5, 2124 0x1B, 2125 0xC0, 2126 0x00, 2127 0x04, 2128 0x4E, 2129 0x92, 2130 0x1B, 2131 0x53, 2132 0x11, 2133 0x44, 2134 0x66, 2135 0x4E, 2136 0x22, 2137 0xBB, 2138 0x53, 2139 0x1A, 2140 0x44, 2141 0x66, 2142 0x77, 2143 0x99, 2144 0x15, 2145 0xB3, 2146 0x00, 2147 0x54, 2148 0xCC, 2149 0x54, 2150 0x01, 2151 0xAA, 2152 0x44, 2153 0x54, 2154 0x12, 2155 0xD3, 2156 0x53, 2157 0x11, 2158 0x44, 2159 0x66 2160 ]) 2161 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 2162 2163 def test_should_defragment_IPv6_packet_when_parse_method_called_with_fragments(self): 2164 # GIVEN 2165 message_info = common.MessageInfo() 2166 message_info.source_mac_address = common.MacAddress.from_eui64( 2167 bytearray([0x00, 0x00, 0x00, 0x11, 0x12, 0x13, 0x14, 0x15])) 2168 message_info.destination_mac_address = common.MacAddress.from_eui64( 2169 bytearray([0x00, 0x00, 0x1A, 0x2A, 0x3F, 0x09, 0xAB, 0x43])) 2170 2171 fragment_1 = bytearray( 2172 [0xC0, 0x38, 0x12, 0x34, 0x7A, 0x33, 0x3A, 0x80, 0x00, 0x1A, 0x33, 0x0B, 0xC0, 0x00, 0x04]) 2173 2174 fragment_2 = bytearray([0xE0, 0x38, 0x12, 0x34, 0x06, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x12, 0x13, 0x14]) 2175 2176 parser = create_default_lowpan_parser(None) 2177 2178 # WHEN 2179 self.assertIsNone(parser.parse(io.BytesIO(fragment_1), message_info=message_info)) 2180 actual_ipv6_packet = parser.parse(io.BytesIO(fragment_2), message_info=message_info) 2181 2182 # THEN 2183 ipv6_packet = bytearray([ 2184 0x60, 0x00, 0x00, 0x00, 0x00, 0x10, 0x3A, 0x40, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 2185 0x00, 0x11, 0x12, 0x13, 0x14, 0x15, 0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x1A, 0x2A, 2186 0x3F, 0x09, 0xAB, 0x43, 0x80, 0x00, 0x1A, 0x33, 0x0B, 0xC0, 0x00, 0x04, 0x4E, 0x92, 0xBB, 0x53, 0x11, 0x12, 2187 0x13, 0x14 2188 ]) 2189 self.assertEqual(ipv6_packet, actual_ipv6_packet.to_bytes()) 2190 2191 2192class TestLowpanUdpHeaderFactory(unittest.TestCase): 2193 2194 def test_should_parse_udp_datagram_ports_when_decompress_udp_ports_method_called_with_udphc_p_eq_0(self): 2195 # GIVEN 2196 factory = lowpan.LowpanUdpHeaderFactory() 2197 2198 p = factory.UDP_HC_P_BOTH_FULL 2199 2200 udphc = lowpan.LowpanUDPHC(any_c(), p) 2201 2202 src_port = any_src_port() 2203 dst_port = any_dst_port() 2204 2205 data_bytes = struct.pack(">H", src_port) + struct.pack(">H", dst_port) 2206 2207 # WHEN 2208 actual_src_port, actual_dst_port = factory._decompress_udp_ports(udphc, io.BytesIO(data_bytes)) 2209 2210 # THEN 2211 self.assertEqual(src_port, actual_src_port) 2212 self.assertEqual(dst_port, actual_dst_port) 2213 self.assertEqual(0, p) 2214 2215 def test_should_parse_udp_datagram_ports_when_decompress_udp_ports_method_called_with_udphc_p_eq_1(self): 2216 # GIVEN 2217 factory = lowpan.LowpanUdpHeaderFactory() 2218 2219 p = factory.UDP_HC_P_DST_COMPR 2220 2221 udphc = lowpan.LowpanUDPHC(any_c(), p) 2222 2223 src_port = any_src_port() 2224 dst_port = any_compressable_dst_port() 2225 2226 data_bytes = struct.pack(">H", src_port) + bytearray([struct.pack(">H", dst_port)[1]]) 2227 2228 # WHEN 2229 actual_src_port, actual_dst_port = factory._decompress_udp_ports(udphc, io.BytesIO(data_bytes)) 2230 2231 # THEN 2232 self.assertEqual(1, p) 2233 self.assertEqual(src_port, actual_src_port) 2234 self.assertEqual(dst_port, actual_dst_port) 2235 2236 def test_should_parse_udp_datagram_ports_when_decompress_udp_ports_method_called_with_udphc_p_eq_2(self): 2237 # GIVEN 2238 factory = lowpan.LowpanUdpHeaderFactory() 2239 2240 p = factory.UDP_HC_P_SRC_COMPR 2241 2242 udphc = lowpan.LowpanUDPHC(any_c(), p) 2243 2244 src_port = any_compressable_src_port() 2245 dst_port = any_dst_port() 2246 2247 data_bytes = bytearray([struct.pack(">H", src_port)[1]]) + struct.pack(">H", dst_port) 2248 2249 # WHEN 2250 actual_src_port, actual_dst_port = factory._decompress_udp_ports(udphc, io.BytesIO(data_bytes)) 2251 2252 # THEN 2253 self.assertEqual(2, p) 2254 self.assertEqual(src_port, actual_src_port) 2255 self.assertEqual(dst_port, actual_dst_port) 2256 2257 def test_should_parse_udp_datagram_ports_when_decompress_udp_ports_method_called_with_udphc_p_eq_3(self): 2258 # GIVEN 2259 factory = lowpan.LowpanUdpHeaderFactory() 2260 2261 p = factory.UDP_HC_P_BOTH_COMPR 2262 2263 udphc = lowpan.LowpanUDPHC(any_c(), p) 2264 2265 src_port = any_nibble_src_port() 2266 dst_port = any_nibble_dst_port() 2267 2268 data_bytes = bytearray([((src_port & 0x0F) << 4) | (dst_port & 0x0F)]) 2269 2270 # WHEN 2271 actual_src_port, actual_dst_port = factory._decompress_udp_ports(udphc, io.BytesIO(data_bytes)) 2272 2273 # THEN 2274 self.assertEqual(3, p) 2275 self.assertEqual(src_port, actual_src_port) 2276 self.assertEqual(dst_port, actual_dst_port) 2277 2278 def test_should_parse_udp_datagram_checksum_when_decompress_udp_checksum_called_with_udphc_c_eq_0(self): 2279 # GIVEN 2280 factory = lowpan.LowpanUdpHeaderFactory() 2281 2282 c = factory.UDP_HC_C_INLINE 2283 2284 udphc = lowpan.LowpanUDPHC(c, any_p()) 2285 2286 checksum = any_checksum() 2287 2288 data_bytes = struct.pack(">H", checksum) 2289 2290 # WHEN 2291 actual_checksum = factory._decompress_udp_checksum(udphc, io.BytesIO(data_bytes)) 2292 2293 # THEN 2294 self.assertEqual(0, c) 2295 self.assertEqual(checksum, actual_checksum) 2296 2297 def test_should_parse_udp_datagram_checksum_when_decompress_udp_checksum_called_with_udphc_c_eq_1(self): 2298 # GIVEN 2299 factory = lowpan.LowpanUdpHeaderFactory() 2300 2301 c = factory.UDP_HC_C_ELIDED 2302 2303 udphc = lowpan.LowpanUDPHC(c, any_p()) 2304 2305 data_bytes = bytearray() 2306 2307 # WHEN 2308 actual_checksum = factory._decompress_udp_checksum(udphc, io.BytesIO(data_bytes)) 2309 2310 # THEN 2311 self.assertEqual(1, c) 2312 self.assertEqual(0, actual_checksum) 2313 2314 2315class TestLowpanIpv6HeaderFactory(unittest.TestCase): 2316 2317 IPV6_LINKLOCAL_PREFIX = bytearray([0xfe, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) 2318 2319 def test_should_parse_traffic_class_and_flow_label_when_decompress_tf_method_called_with_iphc_tf_eq_0(self): 2320 # GIVEN 2321 ecn = any_ecn() 2322 dscp = any_dscp() 2323 flow_label = any_flow_label() 2324 2325 data_bytes = bytearray() 2326 data_bytes.append((ecn << 6) | dscp) 2327 data_bytes.append((flow_label >> 16) & 0x0F) 2328 data_bytes.append((flow_label >> 8) & 0xFF) 2329 data_bytes.append(flow_label & 0xFF) 2330 2331 factory = lowpan.LowpanIpv6HeaderFactory() 2332 2333 tf = factory.IPHC_TF_4B 2334 iphc = lowpan.LowpanIPHC(tf, any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2335 any_dam()) 2336 2337 # WHEN 2338 actual_traffic_class, actual_flow_label = factory._decompress_tf(iphc, io.BytesIO(data_bytes)) 2339 2340 # THEN 2341 self.assertEqual(0, tf) 2342 self.assertEqual((dscp << 2) | ecn, actual_traffic_class) 2343 self.assertEqual(flow_label, actual_flow_label) 2344 2345 def test_should_parse_traffic_class_and_flow_label_when_decompress_tf_method_called_with_iphc_tf_eq_1(self): 2346 # GIVEN 2347 ecn = any_ecn() 2348 flow_label = any_flow_label() 2349 2350 data_bytes = bytearray() 2351 data_bytes.append((ecn << 6) | (flow_label >> 16) & 0x0F) 2352 data_bytes.append((flow_label >> 8) & 0xFF) 2353 data_bytes.append(flow_label & 0xFF) 2354 2355 factory = lowpan.LowpanIpv6HeaderFactory() 2356 2357 tf = factory.IPHC_TF_3B 2358 iphc = lowpan.LowpanIPHC(tf, any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2359 any_dam()) 2360 2361 # WHEN 2362 actual_traffic_class, actual_flow_label = factory._decompress_tf(iphc, io.BytesIO(data_bytes)) 2363 2364 # THEN 2365 self.assertEqual(1, tf) 2366 self.assertEqual(ecn, actual_traffic_class) 2367 self.assertEqual(flow_label, actual_flow_label) 2368 2369 def test_should_parse_traffic_class_and_flow_label_when_decompress_tf_method_called_with_iphc_tf_eq_2(self): 2370 # GIVEN 2371 ecn = any_ecn() 2372 dscp = any_dscp() 2373 2374 data_bytes = bytearray([(ecn << 6) | dscp]) 2375 2376 factory = lowpan.LowpanIpv6HeaderFactory() 2377 2378 tf = factory.IPHC_TF_1B 2379 iphc = lowpan.LowpanIPHC(tf, any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2380 any_dam()) 2381 2382 # WHEN 2383 actual_traffic_class, actual_flow_label = factory._decompress_tf(iphc, io.BytesIO(data_bytes)) 2384 2385 # THEN 2386 self.assertEqual(2, tf) 2387 self.assertEqual((dscp << 2) | ecn, actual_traffic_class) 2388 self.assertEqual(0, actual_flow_label) 2389 2390 def test_should_parse_traffic_class_and_flow_label_when_decompress_tf_method_called_with_iphc_tf_eq_3(self): 2391 data_bytes = bytearray() 2392 2393 factory = lowpan.LowpanIpv6HeaderFactory() 2394 2395 tf = factory.IPHC_TF_ELIDED 2396 iphc = lowpan.LowpanIPHC(tf, any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2397 any_dam()) 2398 2399 # WHEN 2400 actual_traffic_class, actual_flow_label = factory._decompress_tf(iphc, io.BytesIO(data_bytes)) 2401 2402 # THEN 2403 self.assertEqual(3, tf) 2404 self.assertEqual(0, actual_traffic_class) 2405 self.assertEqual(0, actual_flow_label) 2406 2407 def test_should_parse_traffic_class_and_flow_label_when_decompress_nh_method_called_with_iphc_nh_eq_0(self): 2408 # GIVEN 2409 factory = lowpan.LowpanIpv6HeaderFactory() 2410 2411 next_header = any_next_header() 2412 2413 nh = factory.IPHC_NH_INLINE 2414 iphc = lowpan.LowpanIPHC(any_tf(), nh, any_hlim(), any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2415 any_dam()) 2416 2417 data_bytes = bytearray([next_header]) 2418 2419 # WHEN 2420 actual_next_header = factory._decompress_nh(iphc, io.BytesIO(data_bytes)) 2421 2422 # THEN 2423 self.assertEqual(0, nh) 2424 self.assertEqual(next_header, actual_next_header) 2425 2426 def test_should_parse_traffic_class_and_flow_label_when_decompress_nh_method_called_with_iphc_nh_eq_1(self): 2427 # GIVEN 2428 factory = lowpan.LowpanIpv6HeaderFactory() 2429 2430 nh = factory.IPHC_NH_COMPRESSED 2431 iphc = lowpan.LowpanIPHC(any_tf(), nh, any_hlim(), any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2432 any_dam()) 2433 2434 data_bytes = bytearray() 2435 2436 # WHEN 2437 actual_next_header = factory._decompress_nh(iphc, io.BytesIO(data_bytes)) 2438 2439 # THEN 2440 self.assertEqual(1, nh) 2441 self.assertEqual(None, actual_next_header) 2442 2443 def test_should_parse_hop_limit_when_decompress_hlim_called_with_iphc_hlim_eq_0(self): 2444 # GIVEN 2445 hop_limit = any_hop_limit() 2446 2447 factory = lowpan.LowpanIpv6HeaderFactory() 2448 2449 hlim = factory.IPHC_HLIM_INLINE 2450 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), hlim, any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2451 any_dam()) 2452 2453 data_bytes = bytearray([hop_limit]) 2454 2455 # WHEN 2456 actual_hop_limit = factory._decompress_hlim(iphc, io.BytesIO(data_bytes)) 2457 2458 # THEN 2459 self.assertEqual(0, hlim) 2460 self.assertEqual(hop_limit, actual_hop_limit) 2461 2462 def test_should_parse_hop_limit_when_decompress_hlim_called_with_iphc_hlim_eq_1(self): 2463 # GIVEN 2464 factory = lowpan.LowpanIpv6HeaderFactory() 2465 2466 hlim = factory.IPHC_HLIM_1 2467 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), hlim, any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2468 any_dam()) 2469 2470 data_bytes = bytearray() 2471 2472 # WHEN 2473 actual_hop_limit = factory._decompress_hlim(iphc, io.BytesIO(data_bytes)) 2474 2475 # THEN 2476 self.assertEqual(1, hlim) 2477 self.assertEqual(1, actual_hop_limit) 2478 2479 def test_should_parse_hop_limit_when_decompress_hlim_called_with_iphc_hlim_eq_2(self): 2480 # GIVEN 2481 factory = lowpan.LowpanIpv6HeaderFactory() 2482 2483 hlim = factory.IPHC_HLIM_64 2484 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), hlim, any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2485 any_dam()) 2486 2487 data_bytes = bytearray() 2488 2489 # WHEN 2490 actual_hop_limit = factory._decompress_hlim(iphc, io.BytesIO(data_bytes)) 2491 2492 # THEN 2493 self.assertEqual(2, hlim) 2494 self.assertEqual(64, actual_hop_limit) 2495 2496 def test_should_parse_hop_limit_when_decompress_hlim_called_with_iphc_hlim_eq_3(self): 2497 # GIVEN 2498 factory = lowpan.LowpanIpv6HeaderFactory() 2499 2500 hlim = factory.IPHC_HLIM_255 2501 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), hlim, any_cid(), any_sac(), any_sam(), any_m(), any_dac(), 2502 any_dam()) 2503 2504 data_bytes = bytearray() 2505 2506 # WHEN 2507 actual_hop_limit = factory._decompress_hlim(iphc, io.BytesIO(data_bytes)) 2508 2509 # THEN 2510 self.assertEqual(3, hlim) 2511 self.assertEqual(255, actual_hop_limit) 2512 2513 def test_should_parse_source_address_when_decompress_src_addr_called_with_sac_eq_0_and_sam_eq_0(self): 2514 # GIVEN 2515 factory = lowpan.LowpanIpv6HeaderFactory() 2516 2517 src_addr = any_src_addr() 2518 2519 sac = factory.IPHC_SAC_STATELESS 2520 sam = factory.IPHC_SAM_128B 2521 2522 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), sac, sam, any_m(), any_dac(), any_dam()) 2523 2524 # WHEN 2525 actual_src_addr = factory._decompress_src_addr(iphc, any_src_mac_addr(), any_sci(), io.BytesIO(src_addr)) 2526 2527 # THEN 2528 self.assertEqual(0, sac) 2529 self.assertEqual(0, sam) 2530 self.assertEqual(bytes(src_addr), actual_src_addr) 2531 2532 def test_should_parse_source_address_when_decompress_src_addr_called_with_sac_eq_0_and_sam_eq_1(self): 2533 # GIVEN 2534 factory = lowpan.LowpanIpv6HeaderFactory() 2535 2536 eui64 = any_eui64() 2537 2538 sac = factory.IPHC_SAC_STATELESS 2539 sam = factory.IPHC_SAM_64B 2540 2541 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), sac, sam, any_m(), any_dac(), any_dam()) 2542 2543 # WHEN 2544 actual_src_addr = factory._decompress_src_addr(iphc, any_src_mac_addr(), any_sci(), io.BytesIO(eui64)) 2545 2546 # THEN 2547 self.assertEqual(0, sac) 2548 self.assertEqual(1, sam) 2549 self.assertEqual(self.IPV6_LINKLOCAL_PREFIX + eui64, actual_src_addr) 2550 2551 def test_should_parse_source_address_when_decompress_src_addr_called_with_sac_eq_0_and_sam_eq_2(self): 2552 # GIVEN 2553 factory = lowpan.LowpanIpv6HeaderFactory() 2554 2555 rloc16 = any_rloc16() 2556 2557 sac = factory.IPHC_SAC_STATELESS 2558 sam = factory.IPHC_SAM_16B 2559 2560 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), sac, sam, any_m(), any_dac(), any_dam()) 2561 2562 # WHEN 2563 actual_src_addr = factory._decompress_src_addr(iphc, any_src_mac_addr(), any_sci(), io.BytesIO(rloc16)) 2564 2565 # THEN 2566 self.assertEqual(0, sac) 2567 self.assertEqual(2, sam) 2568 self.assertEqual(self.IPV6_LINKLOCAL_PREFIX + bytearray([0x00, 0x00, 0x00, 0xff, 0xfe, 0x00]) + rloc16, 2569 actual_src_addr) 2570 2571 def test_should_parse_source_address_when_decompress_src_addr_called_with_sac_eq_0_and_sam_eq_3(self): 2572 # GIVEN 2573 factory = lowpan.LowpanIpv6HeaderFactory() 2574 2575 src_mac_addr = common.MacAddress.from_eui64(any_src_mac_addr()) 2576 2577 sac = factory.IPHC_SAC_STATELESS 2578 sam = factory.IPHC_SAM_ELIDED 2579 2580 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), sac, sam, any_m(), any_dac(), any_dam()) 2581 2582 data_bytes = bytearray([]) 2583 2584 # WHEN 2585 actual_src_addr = factory._decompress_src_addr(iphc, src_mac_addr, any_sci(), io.BytesIO(data_bytes)) 2586 2587 # THEN 2588 self.assertEqual(0, sac) 2589 self.assertEqual(3, sam) 2590 self.assertEqual( 2591 self.IPV6_LINKLOCAL_PREFIX + bytearray([src_mac_addr.mac_address[0] ^ 0x02]) + 2592 src_mac_addr.mac_address[1:], actual_src_addr) 2593 2594 def _merge_prefix_and_address(self, prefix, prefix_length, address): 2595 total_bytes = 16 2596 2597 prefix_length_in_bytes = int(prefix_length / 8) 2598 2599 if (prefix_length_in_bytes + len(address)) > total_bytes: 2600 total_bytes -= prefix_length_in_bytes 2601 2602 return prefix[:prefix_length_in_bytes] + address[-total_bytes:] 2603 2604 else: 2605 total_bytes -= prefix_length_in_bytes 2606 total_bytes -= len(address) 2607 2608 return prefix[:prefix_length_in_bytes] + bytearray([0x00] * total_bytes) + address 2609 2610 def test_should_parse_source_address_when_decompress_src_addr_called_with_sac_eq_1_and_sam_eq_0(self): 2611 # GIVEN 2612 factory = lowpan.LowpanIpv6HeaderFactory(None) 2613 2614 src_addr = any_src_addr() 2615 2616 sac = factory.IPHC_SAC_STATEFUL 2617 sam = factory.IPHC_SAM_UNSPECIFIED 2618 2619 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), sac, sam, any_m(), any_dac(), any_dam()) 2620 2621 # WHEN 2622 actual_src_addr = factory._decompress_src_addr(iphc, any_src_mac_addr(), any_sci(), io.BytesIO(src_addr)) 2623 2624 # THEN 2625 self.assertEqual(1, sac) 2626 self.assertEqual(0, sam) 2627 self.assertEqual(bytearray([0x00] * 16), actual_src_addr) 2628 2629 def test_should_parse_source_address_when_decompress_src_addr_called_with_sac_eq_1_and_sam_eq_1(self): 2630 # GIVEN 2631 sci = any_sci() 2632 2633 context = any_context() 2634 2635 context_manager = lowpan.ContextManager() 2636 context_manager[sci] = context 2637 2638 factory = lowpan.LowpanIpv6HeaderFactory(context_manager) 2639 2640 eui64 = any_eui64() 2641 2642 sac = factory.IPHC_SAC_STATEFUL 2643 sam = factory.IPHC_SAM_64B 2644 2645 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), sac, sam, any_m(), any_dac(), any_dam()) 2646 2647 src_addr = self._merge_prefix_and_address(context.prefix, context.prefix_length, eui64) 2648 2649 # WHEN 2650 actual_src_addr = factory._decompress_src_addr(iphc, any_src_mac_addr(), sci, io.BytesIO(eui64)) 2651 2652 # THEN 2653 self.assertEqual(1, sac) 2654 self.assertEqual(1, sam) 2655 self.assertEqual(src_addr, actual_src_addr) 2656 2657 def test_should_parse_source_address_when_decompress_src_addr_called_with_sac_eq_1_and_sam_eq_2(self): 2658 # GIVEN 2659 sci = any_sci() 2660 2661 context = any_context() 2662 2663 context_manager = lowpan.ContextManager() 2664 context_manager[sci] = context 2665 2666 factory = lowpan.LowpanIpv6HeaderFactory(context_manager) 2667 2668 rloc16 = any_rloc16() 2669 2670 sac = factory.IPHC_SAC_STATEFUL 2671 sam = factory.IPHC_SAM_16B 2672 2673 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), sac, sam, any_m(), any_dac(), any_dam()) 2674 2675 iid = bytearray([0x00, 0x00, 0x00, 0xff, 0xfe, 0x00]) + rloc16 2676 2677 src_addr = self._merge_prefix_and_address(context.prefix, context.prefix_length, iid) 2678 2679 # WHEN 2680 actual_src_addr = factory._decompress_src_addr(iphc, any_src_mac_addr(), sci, io.BytesIO(rloc16)) 2681 2682 # THEN 2683 self.assertEqual(1, sac) 2684 self.assertEqual(2, sam) 2685 self.assertEqual(src_addr, actual_src_addr) 2686 2687 def test_should_parse_source_address_when_decompress_src_addr_called_with_sac_eq_1_and_sam_eq_3(self): 2688 # GIVEN 2689 sci = any_sci() 2690 2691 context = any_context() 2692 2693 context_manager = lowpan.ContextManager() 2694 context_manager[sci] = context 2695 2696 factory = lowpan.LowpanIpv6HeaderFactory(context_manager) 2697 2698 src_mac_addr = common.MacAddress.from_eui64(any_src_mac_addr()) 2699 2700 sac = factory.IPHC_SAC_STATEFUL 2701 sam = factory.IPHC_SAM_0B 2702 2703 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), sac, sam, any_m(), any_dac(), any_dam()) 2704 2705 iid = bytearray([src_mac_addr.mac_address[0] ^ 0x02]) + src_mac_addr.mac_address[1:] 2706 2707 src_addr = self._merge_prefix_and_address(context.prefix, context.prefix_length, iid) 2708 2709 data_bytes = bytearray([]) 2710 2711 # WHEN 2712 actual_src_addr = factory._decompress_src_addr(iphc, src_mac_addr, sci, io.BytesIO(data_bytes)) 2713 2714 # THEN 2715 self.assertEqual(1, sac) 2716 self.assertEqual(3, sam) 2717 self.assertEqual(src_addr, actual_src_addr) 2718 2719 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_0_and_dac_eq_0_and_dam_eq_0(self): 2720 # GIVEN 2721 factory = lowpan.LowpanIpv6HeaderFactory() 2722 2723 ipv6_addr = any_dst_addr() 2724 2725 m = factory.IPHC_M_NO 2726 dac = factory.IPHC_DAC_STATELESS 2727 dam = factory.IPHC_DAM_128B 2728 2729 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2730 2731 dst_mac_addr = bytearray([0x00] * 8) 2732 2733 # WHEN 2734 actual_dst_addr = factory._decompress_dst_addr(iphc, dst_mac_addr, any_dci(), io.BytesIO(ipv6_addr)) 2735 2736 # THEN 2737 self.assertEqual(0, m) 2738 self.assertEqual(0, dac) 2739 self.assertEqual(0, dam) 2740 self.assertEqual(ipv6_addr, actual_dst_addr) 2741 2742 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_0_and_dac_eq_0_and_dam_eq_1(self): 2743 # GIVEN 2744 factory = lowpan.LowpanIpv6HeaderFactory() 2745 2746 eui64 = any_eui64() 2747 2748 m = factory.IPHC_M_NO 2749 dac = factory.IPHC_DAC_STATELESS 2750 dam = factory.IPHC_DAM_64B 2751 2752 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2753 2754 # WHEN 2755 actual_dst_addr = factory._decompress_dst_addr(iphc, any_dst_mac_addr(), any_dci(), io.BytesIO(eui64)) 2756 2757 # THEN 2758 self.assertEqual(0, m) 2759 self.assertEqual(0, dac) 2760 self.assertEqual(1, dam) 2761 self.assertEqual(self.IPV6_LINKLOCAL_PREFIX + eui64, actual_dst_addr) 2762 2763 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_0_and_dac_eq_0_and_dam_eq_2(self): 2764 # GIVEN 2765 factory = lowpan.LowpanIpv6HeaderFactory() 2766 2767 rloc16 = any_rloc16() 2768 2769 m = factory.IPHC_M_NO 2770 dac = factory.IPHC_DAC_STATELESS 2771 dam = factory.IPHC_DAM_16B 2772 2773 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2774 2775 # WHEN 2776 actual_dst_addr = factory._decompress_dst_addr(iphc, any_dst_mac_addr(), any_dci(), io.BytesIO(rloc16)) 2777 2778 # THEN 2779 self.assertEqual(0, m) 2780 self.assertEqual(0, dac) 2781 self.assertEqual(2, dam) 2782 self.assertEqual(self.IPV6_LINKLOCAL_PREFIX + bytearray([0x00, 0x00, 0x00, 0xff, 0xfe, 0x00]) + rloc16, 2783 actual_dst_addr) 2784 2785 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_0_and_dac_eq_0_and_dam_eq_3(self): 2786 # GIVEN 2787 factory = lowpan.LowpanIpv6HeaderFactory() 2788 2789 dst_mac_addr = common.MacAddress.from_eui64(any_dst_mac_addr()) 2790 2791 m = factory.IPHC_M_NO 2792 dac = factory.IPHC_DAC_STATELESS 2793 dam = factory.IPHC_DAM_ELIDED 2794 2795 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2796 2797 data_bytes = bytearray([]) 2798 2799 # WHEN 2800 actual_dst_addr = factory._decompress_dst_addr(iphc, dst_mac_addr, any_dci(), io.BytesIO(data_bytes)) 2801 2802 # THEN 2803 self.assertEqual(0, m) 2804 self.assertEqual(0, dac) 2805 self.assertEqual(3, dam) 2806 self.assertEqual( 2807 self.IPV6_LINKLOCAL_PREFIX + bytearray([dst_mac_addr.mac_address[0] ^ 0x02]) + 2808 dst_mac_addr.mac_address[1:], actual_dst_addr) 2809 2810 def test_should_raise_RuntimeError_when_decompress_dst_addr_called_with_m_eq_0_and_dac_eq_1_and_dam_eq_0(self): 2811 # GIVEN 2812 factory = lowpan.LowpanIpv6HeaderFactory() 2813 2814 ipv6_addr = any_dst_addr() 2815 2816 m = factory.IPHC_M_NO 2817 dac = factory.IPHC_DAC_STATEFUL 2818 dam = factory.IPHC_DAM_128B 2819 2820 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2821 2822 # WHEN 2823 self.assertRaises(RuntimeError, factory._decompress_dst_addr, iphc, any_dst_mac_addr(), any_dci(), 2824 io.BytesIO(ipv6_addr)) 2825 2826 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_0_and_dac_eq_1_and_dam_eq_1(self): 2827 # GIVEN 2828 dci = any_dci() 2829 2830 context = any_context() 2831 2832 context_manager = lowpan.ContextManager() 2833 context_manager[dci] = context 2834 2835 factory = lowpan.LowpanIpv6HeaderFactory(context_manager) 2836 2837 eui64 = any_eui64() 2838 2839 m = factory.IPHC_M_NO 2840 dac = factory.IPHC_DAC_STATEFUL 2841 dam = factory.IPHC_DAM_64B 2842 2843 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2844 2845 dst_addr = self._merge_prefix_and_address(context.prefix, context.prefix_length, eui64) 2846 2847 # WHEN 2848 actual_dst_addr = factory._decompress_dst_addr(iphc, any_dst_mac_addr(), dci, io.BytesIO(eui64)) 2849 2850 # THEN 2851 self.assertEqual(0, m) 2852 self.assertEqual(1, dac) 2853 self.assertEqual(1, dam) 2854 self.assertEqual(dst_addr, actual_dst_addr) 2855 2856 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_0_and_dac_eq_1_and_dam_eq_2(self): 2857 # GIVEN 2858 dci = any_dci() 2859 2860 context = any_context() 2861 2862 context_manager = lowpan.ContextManager() 2863 context_manager[dci] = context 2864 2865 factory = lowpan.LowpanIpv6HeaderFactory(context_manager) 2866 2867 rloc16 = any_rloc16() 2868 2869 m = factory.IPHC_M_NO 2870 dac = factory.IPHC_DAC_STATEFUL 2871 dam = factory.IPHC_DAM_16B 2872 2873 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2874 2875 iid = bytearray([0x00, 0x00, 0x00, 0xff, 0xfe, 0x00]) + rloc16 2876 2877 dst_addr = self._merge_prefix_and_address(context.prefix, context.prefix_length, iid) 2878 2879 # WHEN 2880 actual_dst_addr = factory._decompress_dst_addr(iphc, any_dst_mac_addr(), dci, io.BytesIO(rloc16)) 2881 2882 # THEN 2883 self.assertEqual(0, m) 2884 self.assertEqual(1, dac) 2885 self.assertEqual(2, dam) 2886 self.assertEqual(dst_addr, actual_dst_addr) 2887 2888 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_0_and_dac_eq_1_and_dam_eq_3(self): 2889 # GIVEN 2890 dci = any_dci() 2891 2892 context = any_context() 2893 2894 context_manager = lowpan.ContextManager() 2895 context_manager[dci] = context 2896 2897 factory = lowpan.LowpanIpv6HeaderFactory(context_manager) 2898 2899 dst_mac_addr = common.MacAddress.from_eui64(any_dst_mac_addr()) 2900 2901 m = factory.IPHC_M_NO 2902 dac = factory.IPHC_DAC_STATEFUL 2903 dam = factory.IPHC_DAM_0B 2904 2905 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2906 2907 iid = bytearray([dst_mac_addr.mac_address[0] ^ 0x02]) + dst_mac_addr.mac_address[1:] 2908 2909 dst_addr = self._merge_prefix_and_address(context.prefix, context.prefix_length, iid) 2910 2911 data_bytes = bytearray([]) 2912 2913 # WHEN 2914 actual_dst_addr = factory._decompress_dst_addr(iphc, dst_mac_addr, dci, io.BytesIO(data_bytes)) 2915 2916 # THEN 2917 self.assertEqual(0, m) 2918 self.assertEqual(1, dac) 2919 self.assertEqual(3, dam) 2920 self.assertEqual(dst_addr, actual_dst_addr) 2921 2922 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_1_and_dac_eq_0_and_dam_eq_0(self): 2923 # GIVEN 2924 factory = lowpan.LowpanIpv6HeaderFactory() 2925 2926 ipv6_addr = any_dst_addr() 2927 2928 m = factory.IPHC_M_YES 2929 dac = factory.IPHC_DAC_STATELESS 2930 dam = factory.IPHC_DAM_128B 2931 2932 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2933 2934 # WHEN 2935 actual_dst_addr = factory._decompress_dst_addr(iphc, any_dst_mac_addr(), any_dci(), io.BytesIO(ipv6_addr)) 2936 2937 # THEN 2938 self.assertEqual(1, m) 2939 self.assertEqual(0, dac) 2940 self.assertEqual(0, dam) 2941 self.assertEqual(ipv6_addr, actual_dst_addr) 2942 2943 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_1_and_dac_eq_0_and_dam_eq_1(self): 2944 # GIVEN 2945 factory = lowpan.LowpanIpv6HeaderFactory() 2946 2947 addr48b = any_48bits_addr() 2948 2949 m = factory.IPHC_M_YES 2950 dac = factory.IPHC_DAC_STATELESS 2951 dam = factory.IPHC_DAM_48B 2952 2953 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2954 2955 expected_dst_addr = bytearray([ 2956 0xff, addr48b[0], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, addr48b[1], addr48b[2], addr48b[3], 2957 addr48b[4], addr48b[5] 2958 ]) 2959 2960 # WHEN 2961 actual_dst_addr = factory._decompress_dst_addr(iphc, any_dst_mac_addr(), any_dci(), io.BytesIO(addr48b)) 2962 2963 # THEN 2964 self.assertEqual(1, m) 2965 self.assertEqual(0, dac) 2966 self.assertEqual(1, dam) 2967 self.assertEqual(expected_dst_addr, actual_dst_addr) 2968 2969 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_1_and_dac_eq_0_and_dam_eq_2(self): 2970 # GIVEN 2971 factory = lowpan.LowpanIpv6HeaderFactory() 2972 2973 addr32b = any_32bits_addr() 2974 2975 m = factory.IPHC_M_YES 2976 dac = factory.IPHC_DAC_STATELESS 2977 dam = factory.IPHC_DAM_32B 2978 2979 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 2980 2981 expected_dst_addr = bytearray([ 2982 0xff, addr32b[0], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, addr32b[1], addr32b[2], 2983 addr32b[3] 2984 ]) 2985 2986 # WHEN 2987 actual_dst_addr = factory._decompress_dst_addr(iphc, any_dst_mac_addr(), any_dci(), io.BytesIO(addr32b)) 2988 2989 # THEN 2990 self.assertEqual(1, m) 2991 self.assertEqual(0, dac) 2992 self.assertEqual(2, dam) 2993 self.assertEqual(expected_dst_addr, actual_dst_addr) 2994 2995 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_1_and_dac_eq_0_and_dam_eq_3(self): 2996 # GIVEN 2997 factory = lowpan.LowpanIpv6HeaderFactory() 2998 2999 addr8b = any_8bits_addr() 3000 3001 m = factory.IPHC_M_YES 3002 dac = factory.IPHC_DAC_STATELESS 3003 dam = factory.IPHC_DAM_8B 3004 3005 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 3006 3007 expected_dst_addr = bytearray( 3008 [0xff, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, addr8b[0]]) 3009 3010 # WHEN 3011 actual_dst_addr = factory._decompress_dst_addr(iphc, any_dst_mac_addr(), any_dci(), io.BytesIO(addr8b)) 3012 3013 # THEN 3014 self.assertEqual(1, m) 3015 self.assertEqual(0, dac) 3016 self.assertEqual(3, dam) 3017 self.assertEqual(expected_dst_addr, actual_dst_addr) 3018 3019 def test_should_raise_RuntimeError_when_decompress_dst_addr_called_with_m_eq_1_and_dac_eq_1_and_dam_eq_0(self): 3020 # GIVEN 3021 dci = any_dci() 3022 3023 context = any_context() 3024 3025 context_manager = lowpan.ContextManager() 3026 context_manager[dci] = context 3027 3028 factory = lowpan.LowpanIpv6HeaderFactory(context_manager) 3029 3030 addr48b = any_48bits_addr() 3031 3032 m = factory.IPHC_M_YES 3033 dac = factory.IPHC_DAC_STATEFUL 3034 dam = factory.IPHC_DAM_128B 3035 3036 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 3037 3038 prefix = context.prefix[:8] 3039 3040 if len(prefix) < 8: 3041 missing_bytes_count = 8 - len(prefix) 3042 prefix += bytearray([0x00] * missing_bytes_count) 3043 3044 prefix_length = context.prefix_length 3045 3046 dst_addr = bytearray([0xff]) + addr48b[:2] + bytearray([prefix_length]) + prefix + addr48b[2:] 3047 3048 # WHEN 3049 actual_dst_addr = factory._decompress_dst_addr(iphc, any_dst_mac_addr(), dci, io.BytesIO(addr48b)) 3050 3051 # THEN 3052 self.assertEqual(1, m) 3053 self.assertEqual(1, dac) 3054 self.assertEqual(0, dam) 3055 self.assertEqual(dst_addr, actual_dst_addr) 3056 3057 def test_should_raise_RuntimeError_when_decompress_dst_addr_called_with_m_eq_1_and_dac_eq_1_and_dam_eq_1(self): 3058 # GIVEN 3059 factory = lowpan.LowpanIpv6HeaderFactory() 3060 3061 addr48b = any_48bits_addr() 3062 3063 m = factory.IPHC_M_YES 3064 dac = factory.IPHC_DAC_STATEFUL 3065 dam = factory.IPHC_DAM_48B 3066 3067 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 3068 3069 # WHEN 3070 self.assertRaises(RuntimeError, factory._decompress_dst_addr, iphc, any_dst_mac_addr(), any_dci(), 3071 io.BytesIO(addr48b)) 3072 3073 def test_should_raise_RuntimeError_when_decompress_dst_addr_called_with_m_eq_1_and_dac_eq_1_and_dam_eq_2(self): 3074 # GIVEN 3075 factory = lowpan.LowpanIpv6HeaderFactory() 3076 3077 addr32b = any_32bits_addr() 3078 3079 m = factory.IPHC_M_YES 3080 dac = factory.IPHC_DAC_STATEFUL 3081 dam = factory.IPHC_DAM_32B 3082 3083 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 3084 3085 # WHEN 3086 self.assertRaises(RuntimeError, factory._decompress_dst_addr, iphc, any_dst_mac_addr(), any_dci(), 3087 io.BytesIO(addr32b)) 3088 3089 def test_should_parse_dst_addr_when_decompress_dst_addr_called_with_m_eq_1_and_dac_eq_1_and_dam_eq_3(self): 3090 # GIVEN 3091 factory = lowpan.LowpanIpv6HeaderFactory() 3092 3093 addr8b = any_8bits_addr() 3094 3095 m = factory.IPHC_M_YES 3096 dac = factory.IPHC_DAC_STATEFUL 3097 dam = factory.IPHC_DAM_8B 3098 3099 iphc = lowpan.LowpanIPHC(any_tf(), any_nh(), any_hlim(), any_cid(), any_sac(), any_sam(), m, dac, dam) 3100 3101 # WHEN 3102 self.assertRaises(RuntimeError, factory._decompress_dst_addr, iphc, any_dst_mac_addr(), any_dci(), 3103 io.BytesIO(addr8b)) 3104 3105 def test_should_merge_pfx_with_addr_bytes_when_merge_method_called_with_pfx_shorter_than_missing_bits(self): 3106 # GIVEN 3107 factory = lowpan.LowpanIpv6HeaderFactory() 3108 3109 prefix = bytearray([0x20, 0x00, 0x0d, 0xb8]) 3110 prefix_length = 32 3111 3112 address_bytes = bytearray([0x1a, 0x2b, 0x3c, 0x4d, 0x5e, 0x6f, 0x70, 0x81]) 3113 3114 addr = prefix + bytearray([0x00] * 4) + address_bytes 3115 3116 # WHEN 3117 actual_addr = factory._merge_prefix_with_address(prefix, prefix_length, address_bytes) 3118 3119 # THEN 3120 self.assertEqual(addr, actual_addr) 3121 3122 def test_should_merge_pfx_with_addr_bytes_when_merge_method_called_with_pfx_longer_than_missing_bits_overlap(self): 3123 # GIVEN 3124 factory = lowpan.LowpanIpv6HeaderFactory() 3125 3126 prefix = bytearray([0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x22]) 3127 prefix_length = 68 3128 3129 address_bytes = bytearray([0x1a, 0x2b, 0x3c, 0x4d, 0x5e, 0x6f, 0x70, 0x81]) 3130 3131 addr = prefix[:-1] + bytearray([0x2a]) + address_bytes[1:] 3132 3133 # WHEN 3134 actual_addr = factory._merge_prefix_with_address(prefix, prefix_length, address_bytes) 3135 3136 # THEN 3137 self.assertEqual(addr, actual_addr) 3138 3139 def test_should_merge_pfx_with_address_bytes_when_merge_method_called_with_pfx_longer_than_missing_bits(self): 3140 # GIVEN 3141 factory = lowpan.LowpanIpv6HeaderFactory() 3142 3143 prefix = bytearray( 3144 [0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x22, 0x00, 0x00, 0x11, 0x01, 0x11, 0x01, 0x22]) 3145 prefix_length = 128 3146 3147 address_bytes = bytearray([0x1a, 0x2b, 0x3c, 0x4d, 0x5e, 0x6f, 0x70, 0x81]) 3148 3149 addr = prefix 3150 3151 # WHEN 3152 actual_addr = factory._merge_prefix_with_address(prefix, prefix_length, address_bytes) 3153 3154 # THEN 3155 self.assertEqual(addr, actual_addr) 3156 3157 3158class TestContext(unittest.TestCase): 3159 3160 def test_should_extract_context_from_str_representation_when_constructor_called(self): 3161 # GIVEN 3162 prefix = "2000:db8::/64" 3163 3164 # WHEN 3165 c = lowpan.Context(prefix) 3166 3167 # THEN 3168 self.assertEqual(bytearray([0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00]), c.prefix) 3169 self.assertEqual(64, c.prefix_length) 3170 self.assertEqual(8, c.prefix_length_full_bytes) 3171 3172 def test_should_extract_context_from_bytearray_when_construct_called(self): 3173 # GIVEN 3174 prefix = bytearray([0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00]) 3175 3176 # WHEN 3177 c = lowpan.Context(prefix) 3178 3179 # THEN 3180 self.assertEqual(bytearray([0x20, 0x00, 0x0d, 0xb8, 0x00, 0x00, 0x00, 0x00]), c.prefix) 3181 self.assertEqual(8, c.prefix_length_full_bytes) 3182 self.assertEqual(64, c.prefix_length) 3183 3184 3185class TestContextManager(unittest.TestCase): 3186 3187 def test_should_raise_IndexError_when_index_is_larger_than_15(self): 3188 # GIVEN 3189 context_manager = lowpan.ContextManager() 3190 3191 index = random.randint(16, 255) 3192 3193 # WHEN 3194 with self.assertRaises(IndexError): 3195 context_manager[index] = any_context() 3196 3197 def test_should_raise_IndexError_when_index_is_smaller_than_0(self): 3198 # GIVEN 3199 context_manager = lowpan.ContextManager() 3200 3201 index = random.randint(-255, -1) 3202 3203 # WHEN 3204 with self.assertRaises(IndexError): 3205 context_manager[index] = any_context() 3206 3207 def test_should_raise_TypeError_when_set_value_is_not_Context(self): 3208 # GIVEN 3209 context_manager = lowpan.ContextManager() 3210 3211 # WHEN 3212 with self.assertRaises(TypeError): 3213 context_manager[0] = int 3214 3215 3216class TestLowpanMeshHeader(unittest.TestCase): 3217 3218 def test_should_return_hops_left_value_when_hops_left_property_called(self): 3219 # GIVEN 3220 hops_left = any_hops_left() 3221 3222 mesh_header = lowpan.LowpanMeshHeader(hops_left, any_mac_address(), any_mac_address()) 3223 3224 # WHEN 3225 actual_hops_left = mesh_header.hops_left 3226 3227 # THEN 3228 self.assertEqual(hops_left, actual_hops_left) 3229 3230 def test_should_return_originator_address_value_when_originator_address_property_called(self): 3231 # GIVEN 3232 originator_address = any_mac_address() 3233 3234 mesh_header = lowpan.LowpanMeshHeader(any_hops_left(), originator_address, any_mac_address()) 3235 3236 # WHEN 3237 actual_originator_address = mesh_header.originator_address 3238 3239 # THEN 3240 self.assertEqual(originator_address, actual_originator_address) 3241 3242 def test_should_return_final_destination_address_value_when_final_destination_address_property_called(self): 3243 # GIVEN 3244 final_destination_address = any_mac_address() 3245 3246 mesh_header = lowpan.LowpanMeshHeader(any_hops_left(), any_mac_address(), final_destination_address) 3247 3248 # WHEN 3249 actual_final_destination_address = mesh_header.final_destination_address 3250 3251 # THEN 3252 self.assertEqual(final_destination_address, actual_final_destination_address) 3253 3254 3255class TestLowpanMeshHeaderFactory(unittest.TestCase): 3256 3257 def test_should_create_LowpanMeshHeader_when_parse_method_called(self): 3258 # GIVEN 3259 hops_left = any_hops_left() 3260 3261 originator_address = any_mac_address() 3262 final_destination_address = any_mac_address() 3263 3264 v = int(originator_address.type == common.MacAddressType.SHORT) 3265 f = int(final_destination_address.type == common.MacAddressType.SHORT) 3266 3267 mesh_header_first_byte = (2 << 6) | (v << 5) | (f << 4) 3268 3269 if hops_left >= 0x0f: 3270 mesh_header_data = bytearray([mesh_header_first_byte | 0x0f, hops_left]) 3271 else: 3272 mesh_header_data = bytearray([mesh_header_first_byte | hops_left]) 3273 3274 mesh_header_data += originator_address.mac_address + final_destination_address.mac_address 3275 3276 mesh_header_factory = lowpan.LowpanMeshHeaderFactory() 3277 3278 # WHEN 3279 mesh_header = mesh_header_factory.parse(io.BytesIO(mesh_header_data), None) 3280 3281 # THEN 3282 self.assertEqual(hops_left, mesh_header.hops_left) 3283 self.assertEqual(originator_address, mesh_header.originator_address) 3284 self.assertEqual(final_destination_address, mesh_header.final_destination_address) 3285 3286 3287class TestLowpanFragmentationHeader(unittest.TestCase): 3288 3289 def test_should_return_datagram_size_value_when_datagram_size_property_called(self): 3290 # GIVEN 3291 datagram_size = any_datagram_size() 3292 3293 fragmentation_header = lowpan.LowpanFragmentationHeader(datagram_size, any_datagram_tag(), 3294 any_datagram_offset()) 3295 3296 # WHEN 3297 actual_datagram_size = fragmentation_header.datagram_size 3298 3299 # THEN 3300 self.assertEqual(datagram_size, actual_datagram_size) 3301 3302 def test_should_return_datagram_tag_value_when_datagram_tag_property_called(self): 3303 # GIVEN 3304 datagram_tag = any_datagram_tag() 3305 3306 fragmentation_header = lowpan.LowpanFragmentationHeader(any_datagram_size(), datagram_tag, 3307 any_datagram_offset()) 3308 3309 # WHEN 3310 actual_datagram_tag = fragmentation_header.datagram_tag 3311 3312 # THEN 3313 self.assertEqual(datagram_tag, actual_datagram_tag) 3314 3315 def test_should_return_datagram_offset_value_when_datagram_offset_property_called(self): 3316 # GIVEN 3317 datagram_offset = any_datagram_offset() 3318 3319 fragmentation_header = lowpan.LowpanFragmentationHeader(any_datagram_size(), any_datagram_tag(), 3320 datagram_offset) 3321 3322 # WHEN 3323 actual_datagram_offset = fragmentation_header.datagram_offset 3324 3325 # THEN 3326 self.assertEqual(datagram_offset, actual_datagram_offset) 3327 3328 def test_should_return_False_when_is_first_property_called_and_datagram_offset_is_not_eq_0(self): 3329 # GIVEN 3330 datagram_offset = random.randint(1, (1 << 8) - 1) 3331 3332 fragmentation_header = lowpan.LowpanFragmentationHeader(any_datagram_size(), any_datagram_tag(), 3333 datagram_offset) 3334 3335 # WHEN 3336 is_first = fragmentation_header.is_first 3337 3338 # THEN 3339 self.assertFalse(is_first) 3340 3341 def test_should_to_bytes_LowpanFragmentationHeader_from_bytes_when_from_bytes_class_method_called(self): 3342 # GIVEN 3343 datagram_size = any_datagram_size() 3344 datagram_tag = any_datagram_tag() 3345 datagram_offset = any_datagram_offset() 3346 3347 data = struct.pack(">HHB", ((3 << 14) | (int(datagram_offset != 0) << 13) | datagram_size), datagram_tag, 3348 datagram_offset) 3349 3350 # WHEN 3351 fragmentation_header = lowpan.LowpanFragmentationHeader.from_bytes(io.BytesIO(data)) 3352 3353 # THEN 3354 self.assertEqual(datagram_size, fragmentation_header.datagram_size) 3355 self.assertEqual(datagram_tag, fragmentation_header.datagram_tag) 3356 self.assertEqual(datagram_offset, fragmentation_header.datagram_offset) 3357 3358 3359class TestLowpanDecompressor(unittest.TestCase): 3360 3361 def test_should_parse_parent_request_when_decompress_method_called(self): 3362 # GIVEN 3363 data = bytearray([ 3364 0x7f, 0x3b, 0x02, 0xf0, 0x4d, 0x4c, 0x4d, 0x4c, 0x5e, 0xaf, 0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 3365 0x00, 0x00, 0x01, 0x3b, 0xfb, 0x0e, 0x3b, 0x15, 0xa1, 0xf9, 0xf5, 0x64, 0xf4, 0x99, 0xef, 0x70, 0x78, 0x6c, 3366 0x3c, 0x0f, 0x54, 0x4e, 0x95, 0xe8, 0xf5, 0x27, 0x4c, 0xfc 3367 ]) 3368 3369 message_info = common.MessageInfo() 3370 message_info.source_mac_address = common.MacAddress.from_eui64( 3371 bytearray([0x12, 0xcf, 0xd3, 0x8b, 0x3b, 0x61, 0x55, 0x58])) 3372 3373 decompressor = config.create_default_lowpan_decompressor(context_manager=None) 3374 3375 # WHEN 3376 ipv6_header, extension_headers, udp_header = decompressor.decompress(io.BytesIO(data), message_info) 3377 3378 # THEN 3379 self.assertEqual("fe80::10cf:d38b:3b61:5558", ipv6_header.source_address.compressed) 3380 self.assertEqual("ff02::2", ipv6_header.destination_address.compressed) 3381 self.assertEqual(17, ipv6_header.next_header) 3382 self.assertEqual(255, ipv6_header.hop_limit) 3383 3384 self.assertEqual([], extension_headers) 3385 3386 def test_should_parse_parent_response_when_decompress_method_called(self): 3387 # GIVEN 3388 data = bytearray([ 3389 0x7f, 0x33, 0xf0, 0x4d, 0x4c, 0x4d, 0x4c, 0x0f, 0xe8, 0x00, 0x15, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 3390 0x00, 0x01, 0x31, 0xb8, 0x16, 0x02, 0x61, 0xcc, 0x98, 0x90, 0xd6, 0xfd, 0x69, 0xd3, 0x89, 0xa0, 0x30, 0x49, 3391 0x83, 0x7c, 0xf7, 0xb5, 0x7f, 0x83, 0x2a, 0x04, 0xf6, 0x3b, 0x8c, 0xe8, 0xb6, 0x37, 0x51, 0x5b, 0x28, 0x9a, 3392 0x3b, 0xbe, 0x0d, 0xb3, 0x4e, 0x9f, 0xd8, 0x14, 0xc8, 0xc9, 0xf4, 0x28, 0xf6, 0x8d, 0xb7, 0xf0, 0x7d, 0x46, 3393 0x13, 0xc2, 0xb1, 0x69, 0x4d, 0xae, 0xc1, 0x23, 0x16, 0x62, 0x90, 0xea, 0xff, 0x1b, 0xb7, 0xd7, 0x1e, 0x5c 3394 ]) 3395 3396 message_info = common.MessageInfo() 3397 message_info.source_mac_address = common.MacAddress.from_eui64( 3398 bytearray([0x3a, 0x3e, 0x9e, 0xed, 0x7a, 0x01, 0x36, 0xa5])) 3399 message_info.destination_mac_address = common.MacAddress.from_eui64( 3400 bytearray([0x12, 0xcf, 0xd3, 0x8b, 0x3b, 0x61, 0x55, 0x58])) 3401 3402 decompressor = config.create_default_lowpan_decompressor(context_manager=None) 3403 3404 # WHEN 3405 ipv6_header, extension_headers, udp_header = decompressor.decompress(io.BytesIO(data), message_info) 3406 3407 # THEN 3408 self.assertEqual("fe80::383e:9eed:7a01:36a5", ipv6_header.source_address.compressed) 3409 self.assertEqual("fe80::10cf:d38b:3b61:5558", ipv6_header.destination_address.compressed) 3410 self.assertEqual(17, ipv6_header.next_header) 3411 self.assertEqual(255, ipv6_header.hop_limit) 3412 3413 self.assertEqual([], extension_headers) 3414 3415 self.assertEqual(19788, udp_header.src_port) 3416 self.assertEqual(19788, udp_header.dst_port) 3417 3418 def test_should_parse_child_id_request_when_decompress_method_called(self): 3419 # GIVEN 3420 data = bytearray([ 3421 0x7f, 0x33, 0xf0, 0x4d, 0x4c, 0x4d, 0x4c, 0x9a, 0x62, 0x00, 0x15, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 3422 0x00, 0x01, 0x14, 0x03, 0xe3, 0x72, 0x50, 0x4f, 0x8c, 0x5c, 0x42, 0x81, 0x68, 0xe2, 0x11, 0xfc, 0xf5, 0x8c, 3423 0x62, 0x8e, 0x83, 0x99, 0xe7, 0x26, 0x86, 0x34, 0x3b, 0xa7, 0x68, 0xc7, 0x93, 0xfb, 0x72, 0xd9, 0xcc, 0x13, 3424 0x5e, 0x5b, 0x96, 0x0e, 0xf1, 0x80, 0x03, 0x55, 0x4f, 0x27, 0xc2, 0x96, 0xf4, 0x9c, 0x65, 0x82, 0x97, 0xcf, 3425 0x97, 0x35, 0x89, 0xc2 3426 ]) 3427 3428 message_info = common.MessageInfo() 3429 message_info.source_mac_address = common.MacAddress.from_eui64( 3430 bytearray([0x12, 0xcf, 0xd3, 0x8b, 0x3b, 0x61, 0x55, 0x58])) 3431 message_info.destination_mac_address = common.MacAddress.from_eui64( 3432 bytearray([0x3a, 0x3e, 0x9e, 0xed, 0x7a, 0x01, 0x36, 0xa5])) 3433 3434 decompressor = config.create_default_lowpan_decompressor(context_manager=None) 3435 3436 # WHEN 3437 ipv6_header, extension_headers, udp_header = decompressor.decompress(io.BytesIO(data), message_info) 3438 3439 # THEN 3440 self.assertEqual("fe80::10cf:d38b:3b61:5558", ipv6_header.source_address.compressed) 3441 self.assertEqual("fe80::383e:9eed:7a01:36a5", ipv6_header.destination_address.compressed) 3442 self.assertEqual(17, ipv6_header.next_header) 3443 self.assertEqual(255, ipv6_header.hop_limit) 3444 3445 self.assertEqual([], extension_headers) 3446 3447 self.assertEqual(19788, udp_header.src_port) 3448 self.assertEqual(19788, udp_header.dst_port) 3449 3450 def test_should_parse_child_id_response_when_decompress_method_called(self): 3451 # GIVEN 3452 data = bytearray([ 3453 0x7f, 0x33, 0xf0, 0x4d, 0x4c, 0x4d, 0x4c, 0x7b, 0xe3, 0x00, 0x15, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 3454 0x00, 0x01, 0xe0, 0x57, 0xbf, 0x2f, 0xc0, 0x4b, 0x1d, 0xac, 0x3c, 0x24, 0x16, 0xdf, 0xeb, 0x96, 0xeb, 0xda, 3455 0x42, 0xeb, 0x00, 0x89, 0x5f, 0x39, 0xc9, 0x2b, 0x7d, 0x31, 0xd5, 0x83, 0x9d, 0xdb, 0xb7, 0xc8, 0xe6, 0x25, 3456 0xd3, 0x7a, 0x1e, 0x5f, 0x66, 0x9e, 0x63, 0x2d, 0x42, 0x27, 0x19, 0x41, 0xdc, 0xc4, 0xc4, 0xc0, 0x8c, 0x07 3457 ]) 3458 3459 message_info = common.MessageInfo() 3460 message_info.source_mac_address = common.MacAddress.from_eui64( 3461 bytearray([0x3a, 0x3e, 0x9e, 0xed, 0x7a, 0x01, 0x36, 0xa5])) 3462 message_info.destination_mac_address = common.MacAddress.from_eui64( 3463 bytearray([0x12, 0xcf, 0xd3, 0x8b, 0x3b, 0x61, 0x55, 0x58])) 3464 3465 decompressor = config.create_default_lowpan_decompressor(context_manager=None) 3466 3467 # WHEN 3468 ipv6_header, extension_headers, udp_header = decompressor.decompress(io.BytesIO(data), message_info) 3469 3470 # THEN 3471 self.assertEqual("fe80::383e:9eed:7a01:36a5", ipv6_header.source_address.compressed) 3472 self.assertEqual("fe80::10cf:d38b:3b61:5558", ipv6_header.destination_address.compressed) 3473 self.assertEqual(17, ipv6_header.next_header) 3474 self.assertEqual(255, ipv6_header.hop_limit) 3475 3476 self.assertEqual([], extension_headers) 3477 3478 self.assertEqual(19788, udp_header.src_port) 3479 self.assertEqual(19788, udp_header.dst_port) 3480 3481 def test_should_parse_advertisement_when_decompress_method_called(self): 3482 # GIVEN 3483 data = bytearray([ 3484 0x7f, 0x3b, 0x01, 0xf0, 0x4d, 0x4c, 0x4d, 0x4c, 0x35, 0x9f, 0x00, 0x15, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 3485 0x00, 0x00, 0x01, 0x9e, 0xb8, 0xd0, 0x2f, 0x2a, 0xe0, 0x00, 0x5d, 0x66, 0x63, 0x05, 0xa0, 0x59, 0xb0, 0xd4, 3486 0x95, 0x7f, 0xe6, 0x79, 0x17, 0x87, 0x2c, 0x1d, 0x83, 0xad, 0xc2, 0x64, 0x47, 0x20, 0x7a, 0xe2 3487 ]) 3488 3489 message_info = common.MessageInfo() 3490 message_info.source_mac_address = common.MacAddress.from_eui64( 3491 bytearray([0x3a, 0x3e, 0x9e, 0xed, 0x7a, 0x01, 0x36, 0xa5])) 3492 3493 decompressor = config.create_default_lowpan_decompressor(context_manager=None) 3494 3495 # WHEN 3496 ipv6_header, extension_headers, udp_header = decompressor.decompress(io.BytesIO(data), message_info) 3497 3498 # THEN 3499 self.assertEqual("fe80::383e:9eed:7a01:36a5", ipv6_header.source_address.compressed) 3500 self.assertEqual("ff02::1", ipv6_header.destination_address.compressed) 3501 self.assertEqual(17, ipv6_header.next_header) 3502 self.assertEqual(255, ipv6_header.hop_limit) 3503 3504 self.assertEqual([], extension_headers) 3505 3506 self.assertEqual(19788, udp_header.src_port) 3507 self.assertEqual(19788, udp_header.dst_port) 3508 3509 3510class TestLowpanFragmentsBuffer(unittest.TestCase): 3511 3512 def test_should_raise_ValueError_when_write_method_called_with_data_length_bigger_than_buffer_length(self): 3513 # GIVEN 3514 length = random.randint(1, 1280) 3515 3516 fragments_buffer = lowpan.LowpanFragmentsBuffer(buffer_size=(length - 1)) 3517 3518 # THEN 3519 self.assertRaises(ValueError, fragments_buffer.write, any_data(length)) 3520 3521 def test_should_move_write_position_by_the_data_length_when_write_method_called(self): 3522 # GIVEN 3523 length = random.randint(1, 1280) 3524 3525 fragments_buffer = lowpan.LowpanFragmentsBuffer(buffer_size=length) 3526 3527 start_position = fragments_buffer.tell() 3528 3529 data = any_data(length=random.randint(1, length)) 3530 3531 # WHEN 3532 fragments_buffer.write(data) 3533 3534 # THEN 3535 self.assertEqual(fragments_buffer.tell() - start_position, len(data)) 3536 3537 def test_should_raise_ValueError_when_read_method_called_but_not_whole_packet_has_been_stored_in_buffer(self): 3538 # GIVEN 3539 data = any_data(length=3) 3540 3541 fragments_buffer = lowpan.LowpanFragmentsBuffer(buffer_size=random.randint(4, 1280)) 3542 fragments_buffer.write(data) 3543 3544 # WHEN 3545 self.assertRaises(ValueError, fragments_buffer.read) 3546 3547 def test_should_raise_ValueError_when_seek_method_called_with_offset_bigger_than_buffer_length(self): 3548 # GIVEN 3549 offset = random.randint(1281, 2500) 3550 3551 fragments_buffer = lowpan.LowpanFragmentsBuffer(buffer_size=1280) 3552 3553 # THEN 3554 self.assertRaises(ValueError, fragments_buffer.seek, offset) 3555 3556 def test_should_set_write_position_when_seek_method_called(self): 3557 # GIVEN 3558 length = random.randint(1, 1280) 3559 offset = random.randint(0, length - 1) 3560 3561 fragments_buffer = lowpan.LowpanFragmentsBuffer(buffer_size=length) 3562 3563 # WHEN 3564 fragments_buffer.seek(offset) 3565 3566 # THEN 3567 self.assertEqual(offset, fragments_buffer.tell()) 3568 3569 def test_should_write_whole_packet_to_buffer_when_write_method_called(self): 3570 # GIVEN 3571 data = any_data(length=random.randint(1, 1280)) 3572 3573 fragments_buffer = lowpan.LowpanFragmentsBuffer(buffer_size=len(data)) 3574 3575 # WHEN 3576 fragments_buffer.write(data) 3577 3578 # THEN 3579 self.assertEqual(data, fragments_buffer.read()) 3580 3581 def test_should_write_many_frags_to_the_buffer_and_return_whole_message_when_write_method_called_many_times(self): 3582 # GIVEN 3583 buffer_size = 42 3584 fragments_buffer = lowpan.LowpanFragmentsBuffer(buffer_size=buffer_size) 3585 3586 offset_1 = 0 3587 fragment_1 = bytearray([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]) 3588 3589 offset_2 = 8 3590 fragment_2 = bytearray([0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10]) 3591 3592 offset_3 = 16 3593 fragment_3 = bytearray([0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18]) 3594 3595 offset_4 = 24 3596 fragment_4 = bytearray([0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 0x20]) 3597 3598 offset_5 = 32 3599 fragment_5 = bytearray([0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28]) 3600 3601 offset_6 = 40 3602 fragment_6 = bytearray([0x29, 0x2a]) 3603 3604 # WHEN 3605 fragments_buffer.seek(offset_1) 3606 fragments_buffer.write(fragment_1) 3607 3608 fragments_buffer.seek(offset_2) 3609 fragments_buffer.write(fragment_2) 3610 3611 fragments_buffer.seek(offset_3) 3612 fragments_buffer.write(fragment_3) 3613 3614 fragments_buffer.seek(offset_4) 3615 fragments_buffer.write(fragment_4) 3616 3617 fragments_buffer.seek(offset_5) 3618 fragments_buffer.write(fragment_5) 3619 3620 fragments_buffer.seek(offset_6) 3621 fragments_buffer.write(fragment_6) 3622 3623 # THEN 3624 self.assertEqual( 3625 bytearray([ 3626 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11, 3627 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 0x20, 0x21, 0x22, 3628 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2a 3629 ]), fragments_buffer.read()) 3630 3631 3632class TestLowpanFragmentsBuffersManager(unittest.TestCase): 3633 3634 def test_should_raise_ValueError_when_get_fragments_buffer_method_called_with_invalid_dgram_size(self): 3635 # GIVEN 3636 message_info = common.MessageInfo() 3637 message_info.source_mac_address = any_mac_address() 3638 message_info.destination_mac_address = any_mac_address() 3639 3640 negative_int = -random.randint(1, 1280) 3641 3642 manager = lowpan.LowpanFragmentsBuffersManager() 3643 3644 # THEN 3645 self.assertRaises(ValueError, manager.get_fragments_buffer, message_info, any_datagram_tag(), None) 3646 self.assertRaises(ValueError, manager.get_fragments_buffer, message_info, any_datagram_tag(), negative_int) 3647 3648 def test_should_return_LowpanFragmentsBuffer_when_get_fragments_buffer_method_called_with_valid_dgram_size(self): 3649 # GIVEN 3650 message_info = common.MessageInfo() 3651 message_info.source_mac_address = any_mac_address() 3652 message_info.destination_mac_address = any_mac_address() 3653 3654 datagram_size = any_datagram_size() 3655 3656 manager = lowpan.LowpanFragmentsBuffersManager() 3657 3658 # WHEN 3659 fragments_buffer = manager.get_fragments_buffer(message_info, any_datagram_tag(), datagram_size) 3660 3661 # THEN 3662 self.assertIsInstance(fragments_buffer, lowpan.LowpanFragmentsBuffer) 3663 self.assertEqual(datagram_size, len(fragments_buffer)) 3664 3665 3666if __name__ == "__main__": 3667 unittest.main(verbosity=1) 3668