1#!/usr/bin/python3 2# 3# Copyright 2014 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17# pylint: disable=g-bad-todo 18 19import binascii 20import errno 21import os 22import posix 23import random 24from socket import * # pylint: disable=wildcard-import 25import struct 26import sys 27import threading 28import time 29import unittest 30 31from scapy import all as scapy 32 33import csocket 34import multinetwork_base 35import net_test 36 37 38ICMP_ECHO = 8 39ICMP_ECHOREPLY = 0 40ICMPV6_ECHO_REQUEST = 128 41ICMPV6_ECHO_REPLY = 129 42IPV6_MIN_MTU = 1280 43ICMPV6_HEADER_LEN = 8 44ICMPV6_PKT_TOOBIG = 2 45 46 47class PingReplyThread(threading.Thread): 48 49 MIN_TTL = 10 50 INTERMEDIATE_IPV4 = "192.0.2.2" 51 INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d" 52 NEIGHBOURS = ["fe80::1"] 53 LINK_MTU = 1300 54 55 def __init__(self, tun, mymac, routermac, routeraddr): 56 super(PingReplyThread, self).__init__() 57 self._tun = tun 58 self._started_flag = False 59 self._stopped_flag = False 60 self._mymac = mymac 61 self._routermac = routermac 62 self._routeraddr = routeraddr 63 64 def IsStarted(self): 65 return self._started_flag 66 67 def Stop(self): 68 self._stopped_flag = True 69 70 def ChecksumValid(self, packet): 71 # Get and clear the checksums. 72 def GetAndClearChecksum(layer): 73 if not layer: 74 return 75 try: 76 checksum = layer.chksum 77 del layer.chksum 78 except AttributeError: 79 checksum = layer.cksum 80 del layer.cksum 81 return checksum 82 83 def GetChecksum(layer): 84 try: 85 return layer.chksum 86 except AttributeError: 87 return layer.cksum 88 89 layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest] 90 sums = {} 91 for name in layers: 92 sums[name] = GetAndClearChecksum(packet.getlayer(name)) 93 94 # Serialize the packet, so scapy recalculates the checksums, and compare 95 # them with the ones in the packet. 96 packet = packet.__class__(bytes(packet)) 97 for name in layers: 98 layer = packet.getlayer(name) 99 if layer and GetChecksum(layer) != sums[name]: 100 return False 101 102 return True 103 104 def SendTimeExceeded(self, version, packet): 105 if version == 4: 106 src = packet.getlayer(scapy.IP).src 107 self.SendPacket( 108 scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) / 109 scapy.ICMP(type=11, code=0) / 110 packet) 111 elif version == 6: 112 src = packet.getlayer(scapy.IPv6).src 113 self.SendPacket( 114 scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) / 115 scapy.ICMPv6TimeExceeded(code=0) / 116 packet) 117 118 def SendPacketTooBig(self, packet): 119 src = packet.getlayer(scapy.IPv6).src 120 datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN 121 self.SendPacket( 122 scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) / 123 scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) / 124 bytes(packet)[:datalen]) 125 126 def IPv4Packet(self, ip): 127 icmp = ip.getlayer(scapy.ICMP) 128 129 # We only support ping for now. 130 if (ip.proto != IPPROTO_ICMP or 131 icmp.type != ICMP_ECHO or 132 icmp.code != 0): 133 return 134 135 # Check the checksums. 136 if not self.ChecksumValid(ip): 137 return 138 139 if ip.ttl < self.MIN_TTL: 140 self.SendTimeExceeded(4, ip) 141 return 142 143 icmp.type = ICMP_ECHOREPLY 144 self.SwapAddresses(ip) 145 self.SendPacket(ip) 146 147 def IPv6Packet(self, ipv6): 148 icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest) 149 150 # We only support ping for now. 151 if (ipv6.nh != IPPROTO_ICMPV6 or 152 not icmpv6 or 153 icmpv6.type != ICMPV6_ECHO_REQUEST or 154 icmpv6.code != 0): 155 return 156 157 # Check the checksums. 158 if not self.ChecksumValid(ipv6): 159 return 160 161 if ipv6.dst.startswith("ff02::"): 162 ipv6.dst = ipv6.src 163 for src in [self._routeraddr]: 164 ipv6.src = src 165 icmpv6.type = ICMPV6_ECHO_REPLY 166 self.SendPacket(ipv6) 167 elif ipv6.hlim < self.MIN_TTL: 168 self.SendTimeExceeded(6, ipv6) 169 elif ipv6.plen > self.LINK_MTU: 170 self.SendPacketTooBig(ipv6) 171 else: 172 icmpv6.type = ICMPV6_ECHO_REPLY 173 if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr: 174 return 175 self.SwapAddresses(ipv6) 176 self.SendPacket(ipv6) 177 178 def SwapAddresses(self, packet): 179 src = packet.src 180 packet.src = packet.dst 181 packet.dst = src 182 183 def SendPacket(self, packet): 184 packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet 185 try: 186 posix.write(self._tun.fileno(), bytes(packet)) 187 except Exception as e: 188 if not self._stopped_flag: 189 raise e 190 191 def run(self): 192 self._started_flag = True 193 while not self._stopped_flag: 194 try: 195 packet = posix.read(self._tun.fileno(), 4096) 196 except OSError as e: 197 if e.errno == errno.EAGAIN: 198 continue 199 else: 200 break 201 except ValueError as e: 202 if not self._stopped_flag: 203 raise e 204 205 ether = scapy.Ether(packet) 206 if ether.type == net_test.ETH_P_IPV6: 207 self.IPv6Packet(ether.payload) 208 elif ether.type == net_test.ETH_P_IP: 209 self.IPv4Packet(ether.payload) 210 211 212class Ping6Test(multinetwork_base.MultiNetworkBaseTest): 213 214 @classmethod 215 def WaitForReplyThreads(cls): 216 # Wait 2s for the reply threads to start. If they don't, don't blow up, as 217 # that would cause tearDownClass not to be called and thus not clean up 218 # routing configuration, breaking subsequent tests. Instead, just let these 219 # tests fail. 220 _INTERVAL = 0.1 221 _ATTEMPTS = 20 222 for i in range(0, _ATTEMPTS): 223 for netid in cls.NETIDS: 224 if all(thread.IsStarted() for thread in list(cls.reply_threads.values())): 225 return 226 time.sleep(_INTERVAL) 227 msg = "WARNING: reply threads not all started after %.1f seconds\n" % ( 228 _ATTEMPTS * _INTERVAL) 229 sys.stderr.write(msg) 230 231 @classmethod 232 def StopReplyThreads(cls): 233 for thread in list(cls.reply_threads.values()): 234 thread.Stop() 235 236 @classmethod 237 def setUpClass(cls): 238 super(Ping6Test, cls).setUpClass() 239 cls.reply_threads = {} 240 for netid in cls.NETIDS: 241 cls.reply_threads[netid] = PingReplyThread( 242 cls.tuns[netid], 243 cls.MyMacAddress(netid), 244 cls.RouterMacAddress(netid), 245 cls._RouterAddress(netid, 6)) 246 cls.reply_threads[netid].start() 247 cls.WaitForReplyThreads() 248 cls.netid = random.choice(cls.NETIDS) 249 cls.SetDefaultNetwork(cls.netid) 250 251 @classmethod 252 def tearDownClass(cls): 253 cls.StopReplyThreads() 254 cls.ClearDefaultNetwork() 255 super(Ping6Test, cls).tearDownClass() 256 257 def setUp(self): 258 self.ifname = self.GetInterfaceName(self.netid) 259 self.ifindex = self.ifindices[self.netid] 260 self.lladdr = net_test.GetLinkAddress(self.ifname, True) 261 self.globaladdr = net_test.GetLinkAddress(self.ifname, False) 262 263 def assertValidPingResponse(self, s, data): 264 family = s.family 265 266 # Receive the reply. 267 rcvd, src = s.recvfrom(32768) 268 self.assertNotEqual(0, len(rcvd), "No data received") 269 270 # If this is a dual-stack socket sending to a mapped IPv4 address, treat it 271 # as IPv4. 272 if src[0].startswith("::ffff:"): 273 family = AF_INET 274 src = (src[0].replace("::ffff:", ""), src[1:]) 275 276 # Check the data being sent is valid. 277 self.assertGreater(len(data), 7, "Not enough data for ping packet") 278 if family == AF_INET: 279 self.assertTrue(data.startswith(b"\x08\x00"), "Not an IPv4 echo request") 280 elif family == AF_INET6: 281 self.assertTrue(data.startswith(b"\x80\x00"), "Not an IPv6 echo request") 282 else: 283 self.fail("Unknown socket address family %d" * s.family) 284 285 # Check address, ICMP type, and ICMP code. 286 if family == AF_INET: 287 addr, unused_port = src 288 self.assertGreaterEqual(len(addr), len("1.1.1.1")) 289 self.assertTrue(rcvd.startswith(b"\x00\x00"), "Not an IPv4 echo reply") 290 else: 291 addr, unused_port, flowlabel, scope_id = src # pylint: disable=unbalanced-tuple-unpacking 292 self.assertGreaterEqual(len(addr), len("::")) 293 self.assertTrue(rcvd.startswith(b"\x81\x00"), "Not an IPv6 echo reply") 294 # Check that the flow label is zero and that the scope ID is sane. 295 self.assertEqual(flowlabel, 0) 296 if addr.startswith("fe80::"): 297 self.assertTrue(scope_id in list(self.ifindices.values())) 298 else: 299 self.assertEqual(0, scope_id) 300 301 # TODO: check the checksum. We can't do this easily now for ICMPv6 because 302 # we don't have the IP addresses so we can't construct the pseudoheader. 303 304 # Check the sequence number and the data. 305 self.assertEqual(len(data), len(rcvd)) 306 self.assertEqual(binascii.hexlify(data[6:]), binascii.hexlify(rcvd[6:])) 307 308 @staticmethod 309 def IsAlmostEqual(expected, actual, delta): 310 return abs(expected - actual) < delta 311 312 def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state, 313 txmem=0, rxmem=0): 314 expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport), 315 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport), 316 "%02X" % state, 317 "%08X:%08X" % (txmem, rxmem), 318 str(os.getuid()), "ref", "0"] 319 for actual in self.ReadProcNetSocket(name): 320 # Check that rxmem and txmem don't differ too much from each other. 321 actual_txmem, actual_rxmem = expected[3].split(":") 322 if self.IsAlmostEqual(txmem, int(actual_txmem, 16), txmem / 4): 323 return 324 if self.IsAlmostEqual(rxmem, int(actual_rxmem, 16), rxmem / 4): 325 return 326 327 # Check all the parameters except rxmem and txmem. 328 expected[3] = actual[3] 329 # also do not check ref, it's always 2 on older kernels, but 1 for 'raw6' on 6.0+ 330 expected[5] = actual[5] 331 if expected == actual: 332 return 333 334 self.fail("Cound not find socket matching %s" % expected) 335 336 def testIPv4SendWithNoConnection(self): 337 s = net_test.IPv4PingSocket() 338 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING) 339 s.close() 340 341 def testIPv6SendWithNoConnection(self): 342 s = net_test.IPv6PingSocket() 343 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING) 344 s.close() 345 346 def testIPv4LoopbackPingWithConnect(self): 347 s = net_test.IPv4PingSocket() 348 s.connect(("127.0.0.1", 55)) 349 data = net_test.IPV4_PING + b"foobarbaz" 350 s.send(data) 351 self.assertValidPingResponse(s, data) 352 s.close() 353 354 def testIPv6LoopbackPingWithConnect(self): 355 s = net_test.IPv6PingSocket() 356 s.connect(("::1", 55)) 357 s.send(net_test.IPV6_PING) 358 self.assertValidPingResponse(s, net_test.IPV6_PING) 359 s.close() 360 361 def testIPv4PingUsingSendto(self): 362 s = net_test.IPv4PingSocket() 363 written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) 364 self.assertEqual(len(net_test.IPV4_PING), written) 365 self.assertValidPingResponse(s, net_test.IPV4_PING) 366 s.close() 367 368 def testIPv6PingUsingSendto(self): 369 s = net_test.IPv6PingSocket() 370 written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 371 self.assertEqual(len(net_test.IPV6_PING), written) 372 self.assertValidPingResponse(s, net_test.IPV6_PING) 373 s.close() 374 375 def testIPv4NoCrash(self): 376 # Python 2.x does not provide either read() or recvmsg. 377 s = net_test.IPv4PingSocket() 378 written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55)) 379 self.assertEqual(len(net_test.IPV4_PING), written) 380 fd = s.fileno() 381 reply = posix.read(fd, 4096) 382 self.assertEqual(written, len(reply)) 383 s.close() 384 385 def testIPv6NoCrash(self): 386 # Python 2.x does not provide either read() or recvmsg. 387 s = net_test.IPv6PingSocket() 388 written = s.sendto(net_test.IPV6_PING, ("::1", 55)) 389 self.assertEqual(len(net_test.IPV6_PING), written) 390 fd = s.fileno() 391 reply = posix.read(fd, 4096) 392 self.assertEqual(written, len(reply)) 393 s.close() 394 395 def testCrossProtocolCrash(self): 396 # Checks that an ICMP error containing a ping packet that matches the ID 397 # of a socket of the wrong protocol (which can happen when using 464xlat) 398 # doesn't crash the kernel. 399 400 # We can only test this using IPv6 unreachables and IPv4 ping sockets, 401 # because IPv4 packets sent by scapy.send() on loopback are not received by 402 # the kernel. So we don't actually use this function yet. 403 def GetIPv4Unreachable(port): # pylint: disable=unused-variable 404 return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") / 405 scapy.ICMP(type=3, code=0) / 406 scapy.IP(src="127.0.0.1", dst="127.0.0.1") / 407 scapy.ICMP(type=8, id=port, seq=1)) 408 409 def GetIPv6Unreachable(port): 410 return (scapy.IPv6(src="::1", dst="::1") / 411 scapy.ICMPv6DestUnreach() / 412 scapy.IPv6(src="::1", dst="::1") / 413 scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz")) 414 415 # An unreachable matching the ID of a socket of the wrong protocol 416 # shouldn't crash. 417 s = net_test.IPv4PingSocket() 418 s.connect(("127.0.0.1", 12345)) 419 _, port = s.getsockname() 420 scapy.send(GetIPv6Unreachable(port), verbose=False) 421 # No crash? Good. 422 s.close() 423 424 def testCrossProtocolCalls(self): 425 """Tests that passing in the wrong family returns EAFNOSUPPORT. 426 427 Relevant kernel commits: 428 upstream net: 429 91a0b60 net/ping: handle protocol mismatching scenario 430 9145736d net: ping: Return EAFNOSUPPORT when appropriate. 431 432 android-3.10: 433 78a6809 net/ping: handle protocol mismatching scenario 434 428e6d6 net: ping: Return EAFNOSUPPORT when appropriate. 435 """ 436 437 def CheckEAFNoSupport(function, *args): 438 self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args) 439 440 ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53)) 441 442 # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed 443 # IPv4 socket address structures, we need to pass down a socket address 444 # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls 445 # will fail immediately with EINVAL because the passed-in socket length is 446 # too short. So create a sockaddr_in that's as long as a sockaddr_in6. 447 ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53)) 448 ipv4sockaddr = csocket.SockaddrIn6( 449 ipv4sockaddr.Pack() + 450 b"\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn))) 451 452 s4 = net_test.IPv4PingSocket() 453 s6 = net_test.IPv6PingSocket() 454 455 # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong 456 # address family, because the Python implementation will just pass garbage 457 # down to the kernel. So call the C functions directly. 458 CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr) 459 CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr) 460 CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr) 461 CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr) 462 CheckEAFNoSupport(csocket.Sendmsg, 463 s4, ipv6sockaddr, net_test.IPV4_PING, None, 0) 464 CheckEAFNoSupport(csocket.Sendmsg, 465 s6, ipv4sockaddr, net_test.IPV6_PING, None, 0) 466 s4.close() 467 s6.close() 468 469 def testIPv4Bind(self): 470 # Bind to unspecified address. 471 s = net_test.IPv4PingSocket() 472 s.bind(("0.0.0.0", 544)) 473 self.assertEqual(("0.0.0.0", 544), s.getsockname()) 474 s.close() 475 476 # Bind to loopback. 477 s = net_test.IPv4PingSocket() 478 s.bind(("127.0.0.1", 99)) 479 self.assertEqual(("127.0.0.1", 99), s.getsockname()) 480 481 # Binding twice is not allowed. 482 self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22)) 483 s.close() 484 485 # But binding two different sockets to the same ID is allowed. 486 s2 = net_test.IPv4PingSocket() 487 s2.bind(("127.0.0.1", 99)) 488 self.assertEqual(("127.0.0.1", 99), s2.getsockname()) 489 s3 = net_test.IPv4PingSocket() 490 s3.bind(("127.0.0.1", 99)) 491 self.assertEqual(("127.0.0.1", 99), s3.getsockname()) 492 s2.close() 493 s3.close() 494 495 # If two sockets bind to the same port, the first one to call read() gets 496 # the response. 497 s4 = net_test.IPv4PingSocket() 498 s5 = net_test.IPv4PingSocket() 499 s4.bind(("0.0.0.0", 167)) 500 s5.bind(("0.0.0.0", 167)) 501 s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44)) 502 self.assertValidPingResponse(s5, net_test.IPV4_PING) 503 csocket.SetSocketTimeout(s4, 100) 504 self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768) 505 506 # If SO_REUSEADDR is turned off, then we get EADDRINUSE. 507 s6 = net_test.IPv4PingSocket() 508 s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0) 509 self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167)) 510 511 s4.close() 512 s5.close() 513 s6.close() 514 515 # Can't bind after sendto. 516 s = net_test.IPv4PingSocket() 517 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132)) 518 self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429)) 519 s.close() 520 521 def testIPv6Bind(self): 522 # Bind to unspecified address. 523 s = net_test.IPv6PingSocket() 524 s.bind(("::", 769)) 525 self.assertEqual(("::", 769, 0, 0), s.getsockname()) 526 s.close() 527 528 # Bind to loopback. 529 s = net_test.IPv6PingSocket() 530 s.bind(("::1", 99)) 531 self.assertEqual(("::1", 99, 0, 0), s.getsockname()) 532 533 # Binding twice is not allowed. 534 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22)) 535 s.close() 536 537 # But binding two different sockets to the same ID is allowed. 538 s2 = net_test.IPv6PingSocket() 539 s2.bind(("::1", 99)) 540 self.assertEqual(("::1", 99, 0, 0), s2.getsockname()) 541 s3 = net_test.IPv6PingSocket() 542 s3.bind(("::1", 99)) 543 self.assertEqual(("::1", 99, 0, 0), s3.getsockname()) 544 s2.close() 545 s3.close() 546 547 # Binding both IPv4 and IPv6 to the same socket works. 548 s4 = net_test.IPv4PingSocket() 549 s6 = net_test.IPv6PingSocket() 550 s4.bind(("0.0.0.0", 444)) 551 s6.bind(("::", 666, 0, 0)) 552 s4.close() 553 s6.close() 554 555 # Can't bind after sendto. 556 s = net_test.IPv6PingSocket() 557 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132)) 558 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429)) 559 s.close() 560 561 def testIPv4InvalidBind(self): 562 s = net_test.IPv4PingSocket() 563 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 564 s.bind, ("255.255.255.255", 1026)) 565 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 566 s.bind, ("224.0.0.1", 651)) 567 # Binding to an address we don't have only works with IP_TRANSPARENT. 568 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 569 s.bind, (net_test.IPV4_ADDR, 651)) 570 try: 571 s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1) 572 s.bind((net_test.IPV4_ADDR, 651)) 573 except IOError as e: 574 if e.errno == errno.EACCES: 575 pass # We're not root. let it go for now. 576 s.close() 577 578 def testIPv6InvalidBind(self): 579 s = net_test.IPv6PingSocket() 580 self.assertRaisesErrno(errno.EINVAL, 581 s.bind, ("ff02::2", 1026)) 582 583 # Binding to an address we don't have only works with IPV6_TRANSPARENT. 584 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 585 s.bind, (net_test.IPV6_ADDR, 651)) 586 try: 587 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1) 588 s.bind((net_test.IPV6_ADDR, 651)) 589 except IOError as e: 590 if e.errno == errno.EACCES: 591 pass # We're not root. let it go for now. 592 s.close() 593 594 def testAfUnspecBind(self): 595 # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0. 596 s4 = net_test.IPv4PingSocket() 597 sockaddr = csocket.Sockaddr(("0.0.0.0", 12996)) 598 sockaddr.family = AF_UNSPEC 599 csocket.Bind(s4, sockaddr) 600 self.assertEqual(("0.0.0.0", 12996), s4.getsockname()) 601 602 # But not if the address is anything else. 603 sockaddr = csocket.Sockaddr(("127.0.0.1", 58234)) 604 sockaddr.family = AF_UNSPEC 605 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr) 606 s4.close() 607 608 # This doesn't work for IPv6. 609 s6 = net_test.IPv6PingSocket() 610 sockaddr = csocket.Sockaddr(("::1", 58997)) 611 sockaddr.family = AF_UNSPEC 612 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr) 613 s6.close() 614 615 def testIPv6ScopedBind(self): 616 # Can't bind to a link-local address without a scope ID. 617 s = net_test.IPv6PingSocket() 618 self.assertRaisesErrno(errno.EINVAL, 619 s.bind, (self.lladdr, 1026, 0, 0)) 620 621 # Binding to a link-local address with a scope ID works, and the scope ID is 622 # returned by a subsequent getsockname. On Python 2, getsockname returns 623 # "fe80:1%foo". Strip it off, since the ifindex field in the return value is 624 # what matters. 625 s.bind((self.lladdr, 4646, 0, self.ifindex)) 626 sockname = s.getsockname() 627 expected = self.lladdr 628 if "%" in sockname[0]: 629 expected += "%" + self.ifname 630 self.assertEqual((expected, 4646, 0, self.ifindex), sockname) 631 632 # Of course, for the above to work the address actually has to be configured 633 # on the machine. 634 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 635 s.bind, ("fe80::f00", 1026, 0, 1)) 636 s.close() 637 638 # Scope IDs on non-link-local addresses are silently ignored. 639 s = net_test.IPv6PingSocket() 640 s.bind(("::1", 1234, 0, 1)) 641 self.assertEqual(("::1", 1234, 0, 0), s.getsockname()) 642 s.close() 643 644 def testBindAffectsIdentifier(self): 645 s = net_test.IPv6PingSocket() 646 s.bind((self.globaladdr, 0xf976)) 647 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 648 self.assertEqual(b"\xf9\x76", s.recv(32768)[4:6]) 649 s.close() 650 651 s = net_test.IPv6PingSocket() 652 s.bind((self.globaladdr, 0xace)) 653 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 654 self.assertEqual(b"\x0a\xce", s.recv(32768)[4:6]) 655 s.close() 656 657 def testLinkLocalAddress(self): 658 s = net_test.IPv6PingSocket() 659 # Sending to a link-local address with no scope fails with EINVAL. 660 self.assertRaisesErrno(errno.EINVAL, 661 s.sendto, net_test.IPV6_PING, ("fe80::1", 55)) 662 # Sending to link-local address with a scope succeeds. Note that Python 663 # doesn't understand the "fe80::1%lo" format, even though it returns it. 664 s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex)) 665 # No exceptions? Good. 666 s.close() 667 668 def testLinkLocalOif(self): 669 """Checks that ping to link-local addresses works correctly. 670 671 Relevant kernel commits: 672 upstream net: 673 5e45789 net: ipv6: Fix ping to link-local addresses. 674 """ 675 for mode in ["oif", "ucast_oif", None]: 676 s = net_test.IPv6PingSocket() 677 for netid in self.NETIDS: 678 s2 = net_test.IPv6PingSocket() 679 dst = self._RouterAddress(netid, 6) 680 self.assertTrue(dst.startswith("fe80:")) 681 682 if mode: 683 self.SelectInterface(s, netid, mode) 684 self.SelectInterface(s2, netid, mode) 685 scopeid = 0 686 else: 687 scopeid = self.ifindices[netid] 688 689 if mode == "oif": 690 # If SO_BINDTODEVICE has been set, any attempt to send on another 691 # interface returns EINVAL. 692 othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1) 693 % len(self.NETIDS)] 694 otherscopeid = self.ifindices[othernetid] 695 self.assertRaisesErrno( 696 errno.EINVAL, 697 s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid)) 698 self.assertRaisesErrno( 699 errno.EINVAL, 700 s.connect, (dst, 55, 0, otherscopeid)) 701 702 # Try using both sendto and connect/send. 703 # If we get a reply, we sent the packet out on the right interface. 704 s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid)) 705 self.assertValidPingResponse(s, net_test.IPV6_PING) 706 707 # IPV6_UNICAST_IF doesn't work on connected sockets. 708 if mode != "ucast_oif": 709 s2.connect((dst, 123, 0, scopeid)) 710 s2.send(net_test.IPV6_PING) 711 self.assertValidPingResponse(s2, net_test.IPV6_PING) 712 s2.close() 713 s.close() 714 715 def testMappedAddressFails(self): 716 s = net_test.IPv6PingSocket() 717 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 718 self.assertValidPingResponse(s, net_test.IPV6_PING) 719 s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55)) 720 self.assertValidPingResponse(s, net_test.IPV6_PING) 721 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, 722 ("::ffff:192.0.2.1", 55)) 723 s.close() 724 725 @unittest.skipUnless(False, "skipping: does not work yet") 726 def testFlowLabel(self): 727 s = net_test.IPv6PingSocket() 728 729 # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but 730 # the flow label in the packet is not set. 731 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) 732 self.assertValidPingResponse(s, net_test.IPV6_PING) # Checks flow label==0. 733 734 # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label 735 # that is not registered with the flow manager should return EINVAL... 736 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1) 737 # ... but this doesn't work yet. 738 if False: 739 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, 740 (net_test.IPV6_ADDR, 93, 0xdead, 0)) 741 742 # After registering the flow label, it gets sent properly, appears in the 743 # output packet, and is returned in the response. 744 net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead) 745 self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6, 746 net_test.IPV6_FLOWINFO_SEND)) 747 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) 748 _, src = s.recvfrom(32768) 749 _, _, flowlabel, _ = src 750 self.assertEqual(0xdead, flowlabel & 0xfffff) 751 s.close() 752 753 def testIPv4Error(self): 754 s = net_test.IPv4PingSocket() 755 s.setsockopt(SOL_IP, IP_TTL, 2) 756 s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1) 757 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) 758 # We can't check the actual error because Python 2.7 doesn't implement 759 # recvmsg, but we can at least check that the socket returns an error. 760 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. 761 s.close() 762 763 def testIPv6Error(self): 764 s = net_test.IPv6PingSocket() 765 s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2) 766 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1) 767 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 768 # We can't check the actual error because Python 2.7 doesn't implement 769 # recvmsg, but we can at least check that the socket returns an error. 770 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. 771 s.close() 772 773 def testIPv6MulticastPing(self): 774 s = net_test.IPv6PingSocket() 775 # Send a multicast ping and check we get at least one duplicate. 776 # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug. 777 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex) 778 s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex)) 779 self.assertValidPingResponse(s, net_test.IPV6_PING) 780 self.assertValidPingResponse(s, net_test.IPV6_PING) 781 s.close() 782 783 def testIPv4LargePacket(self): 784 s = net_test.IPv4PingSocket() 785 data = net_test.IPV4_PING + 20000 * b"a" 786 s.sendto(data, ("127.0.0.1", 987)) 787 self.assertValidPingResponse(s, data) 788 s.close() 789 790 def testIPv6LargePacket(self): 791 s = net_test.IPv6PingSocket() 792 s.bind(("::", 0xace)) 793 data = net_test.IPV6_PING + b"\x01" + 19994 * b"\x00" + b"aaaaa" 794 s.sendto(data, ("::1", 953)) 795 s.close() 796 797 def testIcmpSocketsNotInIcmp6(self): 798 numrows = len(self.ReadProcNetSocket("icmp")) 799 numrows6 = len(self.ReadProcNetSocket("icmp6")) 800 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) 801 s.bind(("127.0.0.1", 0xace)) 802 s.connect(("127.0.0.1", 0xbeef)) 803 self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp"))) 804 self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6"))) 805 s.close() 806 807 def testIcmp6SocketsNotInIcmp(self): 808 numrows = len(self.ReadProcNetSocket("icmp")) 809 numrows6 = len(self.ReadProcNetSocket("icmp6")) 810 s = net_test.IPv6PingSocket() 811 s.bind(("::1", 0xace)) 812 s.connect(("::1", 0xbeef)) 813 self.assertEqual(numrows, len(self.ReadProcNetSocket("icmp"))) 814 self.assertEqual(numrows6 + 1, len(self.ReadProcNetSocket("icmp6"))) 815 s.close() 816 817 def testProcNetIcmp(self): 818 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) 819 s.bind(("127.0.0.1", 0xace)) 820 s.connect(("127.0.0.1", 0xbeef)) 821 self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1) 822 s.close() 823 824 def testProcNetIcmp6(self): 825 numrows6 = len(self.ReadProcNetSocket("icmp6")) 826 s = net_test.IPv6PingSocket() 827 s.bind(("::1", 0xace)) 828 s.connect(("::1", 0xbeef)) 829 self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1) 830 831 # Check the row goes away when the socket is closed. 832 s.close() 833 self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6"))) 834 835 # Try send, bind and connect to check the addresses and the state. 836 s = net_test.IPv6PingSocket() 837 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) 838 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345)) 839 self.assertEqual(1, len(self.ReadProcNetSocket("icmp6"))) 840 s.close() 841 842 # Can't bind after sendto, apparently. 843 s = net_test.IPv6PingSocket() 844 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) 845 s.bind((self.lladdr, 0xd00d, 0, self.ifindex)) 846 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7) 847 848 # Check receive bytes. 849 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex) 850 s.connect(("ff02::1", 0xdead)) 851 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1) 852 s.send(net_test.IPV6_PING) 853 s.recvfrom(32768, MSG_PEEK) # Wait until the receive thread replies. 854 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, 855 txmem=0, rxmem=0x300) 856 self.assertValidPingResponse(s, net_test.IPV6_PING) 857 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, 858 txmem=0, rxmem=0) 859 s.close() 860 861 def testProcNetUdp6(self): 862 s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP) 863 s.bind(("::1", 0xace)) 864 s.connect(("::1", 0xbeef)) 865 self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1) 866 s.close() 867 868 def testProcNetRaw6(self): 869 s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW) 870 s.bind(("::1", 0xace)) 871 s.connect(("::1", 0xbeef)) 872 self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1) 873 s.close() 874 875 def testIPv6MTU(self): 876 """Tests IPV6_RECVERR and path MTU discovery on ping sockets. 877 878 Relevant kernel commits: 879 upstream net-next: 880 dcb94b8 ipv6: fix endianness error in icmpv6_err 881 """ 882 s = net_test.IPv6PingSocket() 883 s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1) 884 s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2) 885 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1) 886 s.connect((net_test.IPV6_ADDR, 55)) 887 pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * b"a" 888 s.send(pkt) 889 self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768) 890 data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE) 891 892 # Compare the offending packet with the one we sent. To do this we need to 893 # calculate the ident of the packet we sent and blank out the checksum of 894 # the one we received. 895 ident = struct.pack("!H", s.getsockname()[1]) 896 pkt = pkt[:4] + ident + pkt[6:] 897 data = data[:2] + b"\x00\x00" + pkt[4:] 898 self.assertEqual(pkt, data) 899 900 # Check the address that the packet was sent to. 901 self.assertEqual(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr) 902 903 # Check the cmsg data, including the link MTU. 904 mtu = PingReplyThread.LINK_MTU 905 src = self.reply_threads[self.netid].INTERMEDIATE_IPV6 906 msglist = [ 907 (net_test.SOL_IPV6, net_test.IPV6_RECVERR, 908 (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6, 909 ICMPV6_PKT_TOOBIG, 0, mtu, 0)), 910 csocket.Sockaddr((src, 0)))) 911 ] 912 913 self.assertEqual(msglist, cmsg) 914 s.close() 915 916 917if __name__ == "__main__": 918 unittest.main() 919