#!/usr/bin/python
#
# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for IPv4 and IPv6 ping sockets.

A PingReplyThread per test network reads frames from that network's tun
device and answers echo requests, so the Ping6Test cases can exercise send,
bind, connect, error-queue and /proc/net reporting behaviour of ping sockets.
"""

# pylint: disable=g-bad-todo

import errno
import os
import posix
import random
from socket import *  # pylint: disable=wildcard-import
import struct
import sys
import threading
import time
import unittest

from scapy import all as scapy

import csocket
import multinetwork_base
import net_test


HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")

ICMP_ECHO = 8
ICMP_ECHOREPLY = 0
ICMPV6_ECHO_REQUEST = 128
ICMPV6_ECHO_REPLY = 129
IPV6_MIN_MTU = 1280
ICMPV6_HEADER_LEN = 8
ICMPV6_PKT_TOOBIG = 2


class PingReplyThread(threading.Thread):
  """Thread that answers echo requests read from a tun file descriptor.

  Frames are parsed as Ethernet. Valid IPv4/IPv6 echo requests are turned into
  echo replies; requests with a low TTL/hop limit get a Time Exceeded error and
  IPv6 requests whose payload length exceeds LINK_MTU get a Packet Too Big
  error. Everything else is silently dropped.
  """

  # Requests with a TTL / hop limit below this get a Time Exceeded error.
  MIN_TTL = 10
  # Source addresses used for the ICMP errors this thread generates.
  INTERMEDIATE_IPV4 = "192.0.2.2"
  INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
  NEIGHBOURS = ["fe80::1"]
  # MTU advertised in Packet Too Big errors.
  LINK_MTU = 1300

  def __init__(self, tun, mymac, routermac, routeraddr):
    """Stores the tun file object and the addresses used to build replies.

    Args:
      tun: File-like object for the tun device; only fileno() is used here.
      mymac: Destination MAC address for frames this thread sends.
      routermac: Source MAC address for frames this thread sends.
      routeraddr: IPv6 address used as the source of multicast ping replies and
        the only link-local destination that is answered.
    """
    super(PingReplyThread, self).__init__()
    self._tun = tun
    self._started = False
    self._stopped = False
    self._mymac = mymac
    self._routermac = routermac
    self._routeraddr = routeraddr

  def IsStarted(self):
    """Returns True once run() has begun executing."""
    return self._started

  def Stop(self):
    """Asks the thread to exit; takes effect the next time run() checks."""
    self._stopped = True

  def ChecksumValid(self, packet):
    """Returns True if the IP/ICMP/ICMPv6 checksums in packet are correct.

    The stored checksums are removed from the layers (this mutates packet), the
    packet is re-serialized so scapy recomputes them, and the recomputed values
    are compared with the ones that were originally present.
    """
    # Get and clear the checksums.
    def GetAndClearChecksum(layer):
      if not layer:
        return
      try:
        checksum = layer.chksum
        del layer.chksum
      except AttributeError:
        # ICMPv6 layers call the field "cksum" rather than "chksum".
        checksum = layer.cksum
        del layer.cksum
      return checksum

    def GetChecksum(layer):
      try:
        return layer.chksum
      except AttributeError:
        return layer.cksum

    layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
    sums = {}
    for name in layers:
      sums[name] = GetAndClearChecksum(packet.getlayer(name))

    # Serialize the packet, so scapy recalculates the checksums, and compare
    # them with the ones in the packet.
    packet = packet.__class__(str(packet))
    for name in layers:
      layer = packet.getlayer(name)
      if layer and GetChecksum(layer) != sums[name]:
        return False

    return True

  def SendTimeExceeded(self, version, packet):
    """Sends an ICMP / ICMPv6 Time Exceeded error quoting packet.

    Args:
      version: 4 or 6; any other value sends nothing.
      packet: The offending IP or IPv6 packet.
    """
    if version == 4:
      src = packet.getlayer(scapy.IP).src
      self.SendPacket(
          scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
          scapy.ICMP(type=11, code=0) /
          packet)
    elif version == 6:
      src = packet.getlayer(scapy.IPv6).src
      self.SendPacket(
          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
          scapy.ICMPv6TimeExceeded(code=0) /
          packet)

  def SendPacketTooBig(self, packet):
    """Sends an ICMPv6 Packet Too Big error advertising LINK_MTU.

    The quoted copy of packet is truncated so that it, plus the ICMPv6 header,
    is at most IPV6_MIN_MTU bytes.
    """
    src = packet.getlayer(scapy.IPv6).src
    datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
    self.SendPacket(
        scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
        scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
        str(packet)[:datalen])

  def IPv4Packet(self, ip):
    """Handles one received IPv4 packet, replying to valid echo requests."""
    icmp = ip.getlayer(scapy.ICMP)

    # We only support ping for now. The "not icmp" guard mirrors IPv6Packet and
    # avoids dereferencing a missing ICMP layer.
    if (ip.proto != IPPROTO_ICMP or
        not icmp or
        icmp.type != ICMP_ECHO or
        icmp.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ip):
      return

    if ip.ttl < self.MIN_TTL:
      self.SendTimeExceeded(4, ip)
      return

    icmp.type = ICMP_ECHOREPLY
    self.SwapAddresses(ip)
    self.SendPacket(ip)

  def IPv6Packet(self, ipv6):
    """Handles one received IPv6 packet, replying to valid echo requests."""
    icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)

    # We only support ping for now.
    if (ipv6.nh != IPPROTO_ICMPV6 or
        not icmpv6 or
        icmpv6.type != ICMPV6_ECHO_REQUEST or
        icmpv6.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ipv6):
      return

    if ipv6.dst.startswith("ff02::"):
      # Multicast ping: reply to the sender from the router's address.
      ipv6.dst = ipv6.src
      ipv6.src = self._routeraddr
      icmpv6.type = ICMPV6_ECHO_REPLY
      self.SendPacket(ipv6)
    elif ipv6.hlim < self.MIN_TTL:
      self.SendTimeExceeded(6, ipv6)
    elif ipv6.plen > self.LINK_MTU:
      self.SendPacketTooBig(ipv6)
    else:
      # Link-local destinations other than the router itself get no reply.
      if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
        return
      icmpv6.type = ICMPV6_ECHO_REPLY
      self.SwapAddresses(ipv6)
      self.SendPacket(ipv6)

  def SwapAddresses(self, packet):
    """Exchanges the src and dst fields of packet in place."""
    packet.src, packet.dst = packet.dst, packet.src

  def SendPacket(self, packet):
    """Wraps packet in an Ethernet header and writes it to the tun fd.

    Write failures are ignored once Stop() has been called and re-raised
    otherwise.
    """
    packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    try:
      posix.write(self._tun.fileno(), str(packet))
    except Exception:  # pylint: disable=broad-except
      if not self._stopped:
        raise

  def run(self):
    """Reads frames from the tun fd and dispatches them until stopped."""
    self._started = True
    while not self._stopped:
      try:
        packet = posix.read(self._tun.fileno(), 4096)
      except OSError as e:
        if e.errno == errno.EAGAIN:
          continue
        break
      except ValueError:
        # NOTE(review): presumably raised by fileno() once the tun file has
        # been closed during teardown - confirm. There is no packet to process
        # in that case, so leave the loop instead of falling through.
        if not self._stopped:
          raise
        break

      ether = scapy.Ether(packet)
      if ether.type == net_test.ETH_P_IPV6:
        self.IPv6Packet(ether.payload)
      elif ether.type == net_test.ETH_P_IP:
        self.IPv4Packet(ether.payload)


class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
  """Tests the behaviour of IPv4 and IPv6 ping sockets."""

  @classmethod
  def WaitForReplyThreads(cls):
    """Waits up to 2s for all reply threads to report that they have started."""
    # Wait 2s for the reply threads to start. If they don't, don't blow up, as
    # that would cause tearDownClass not to be called and thus not clean up
    # routing configuration, breaking subsequent tests. Instead, just let these
    # tests fail.
    _INTERVAL = 0.1
    _ATTEMPTS = 20
    for _ in range(_ATTEMPTS):
      if all(thread.IsStarted() for thread in cls.reply_threads.values()):
        return
      time.sleep(_INTERVAL)
    msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
        _ATTEMPTS * _INTERVAL)
    sys.stderr.write(msg)

  @classmethod
  def StopReplyThreads(cls):
    """Asks every reply thread to stop."""
    for thread in cls.reply_threads.values():
      thread.Stop()

  @classmethod
  def setUpClass(cls):
    """Starts one reply thread per network and picks a random default one."""
    super(Ping6Test, cls).setUpClass()
    cls.reply_threads = {}
    for netid in cls.NETIDS:
      cls.reply_threads[netid] = PingReplyThread(
          cls.tuns[netid],
          cls.MyMacAddress(netid),
          cls.RouterMacAddress(netid),
          cls._RouterAddress(netid, 6))
      cls.reply_threads[netid].start()
    cls.WaitForReplyThreads()
    cls.netid = random.choice(cls.NETIDS)
    cls.SetDefaultNetwork(cls.netid)

  @classmethod
  def tearDownClass(cls):
    """Stops the reply threads and clears the default network."""
    cls.StopReplyThreads()
    cls.ClearDefaultNetwork()
    super(Ping6Test, cls).tearDownClass()

  def setUp(self):
    """Caches interface name, index and addresses of the default network."""
    self.ifname = self.GetInterfaceName(self.netid)
    self.ifindex = self.ifindices[self.netid]
    self.lladdr = net_test.GetLinkAddress(self.ifname, True)
    self.globaladdr = net_test.GetLinkAddress(self.ifname, False)

  def assertValidPingResponse(self, s, data):
    """Receives one packet on s and checks it is the echo reply for data.

    Args:
      s: The ping socket to receive on.
      data: The echo request that was sent, as a string.
    """
    family = s.family

    # Receive the reply.
    rcvd, src = s.recvfrom(32768)
    self.assertNotEqual(0, len(rcvd), "No data received")

    # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
    # as IPv4.
    if src[0].startswith("::ffff:"):
      family = AF_INET
      src = (src[0].replace("::ffff:", ""), src[1])

    # Check the data being sent is valid.
    self.assertGreater(len(data), 7, "Not enough data for ping packet")
    if family == AF_INET:
      self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
    elif family == AF_INET6:
      self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
    else:
      self.fail("Unknown socket address family %d" % s.family)

    # Check address, ICMP type, and ICMP code.
    if family == AF_INET:
      addr, unused_port = src
      self.assertGreaterEqual(len(addr), len("1.1.1.1"))
      self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
    else:
      addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
      self.assertGreaterEqual(len(addr), len("::"))
      self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
      # Check that the flow label is zero and that the scope ID is sane.
      self.assertEqual(flowlabel, 0)
      if addr.startswith("fe80::"):
        self.assertIn(scope_id, self.ifindices.values())
      else:
        self.assertEqual(0, scope_id)

    # TODO: check the checksum. We can't do this easily now for ICMPv6 because
    # we don't have the IP addresses so we can't construct the pseudoheader.

    # Check the sequence number and the data.
    self.assertEqual(len(data), len(rcvd))
    self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))

  @staticmethod
  def IsAlmostEqual(expected, actual, delta):
    """Returns True if expected and actual differ by strictly less than delta."""
    return abs(expected - actual) < delta

  def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
                        txmem=0, rxmem=0):
    """Fails unless the /proc/net socket file has a row matching the socket.

    Args:
      name: Name passed to ReadProcNetSocket, e.g. "icmp6".
      srcaddr: Expected local address.
      srcport: Expected local port.
      dstaddr: Expected remote address.
      dstport: Expected remote port.
      state: Expected socket state number.
      txmem: Expected transmit queue size.
      rxmem: Expected receive queue size.
    """
    expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
                "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
                "%02X" % state,
                "%08X:%08X" % (txmem, rxmem),
                str(os.getuid()), "2", "0"]
    for actual in self.ReadProcNetSocket(name):
      # Check that rxmem and txmem don't differ too much from each other. The
      # sizes must be read from the row under test, not from the expected row,
      # otherwise the comparison is vacuous.
      # NOTE(review): a row whose queue size is within 25% of a non-zero
      # expected value is accepted without comparing the other fields.
      actual_txmem, actual_rxmem = actual[3].split(":")
      if self.IsAlmostEqual(txmem, int(actual_txmem, 16), txmem / 4):
        return
      if self.IsAlmostEqual(rxmem, int(actual_rxmem, 16), rxmem / 4):
        return

      # Check all the parameters except rxmem and txmem.
      expected[3] = actual[3]
      if expected == actual:
        return

    self.fail("Cound not find socket matching %s" % expected)

  def testIPv4SendWithNoConnection(self):
    """send() on an unconnected IPv4 ping socket fails with EDESTADDRREQ."""
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)

  def testIPv6SendWithNoConnection(self):
    """send() on an unconnected IPv6 ping socket fails with EDESTADDRREQ."""
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)

  def testIPv4LoopbackPingWithConnect(self):
    """A connected IPv4 ping socket can ping loopback."""
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 55))
    data = net_test.IPV4_PING + "foobarbaz"
    s.send(data)
    self.assertValidPingResponse(s, data)

  def testIPv6LoopbackPingWithConnect(self):
    """A connected IPv6 ping socket can ping loopback."""
    s = net_test.IPv6PingSocket()
    s.connect(("::1", 55))
    s.send(net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4PingUsingSendto(self):
    """sendto() on an IPv4 ping socket gets a valid reply."""
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    self.assertValidPingResponse(s, net_test.IPV4_PING)

  def testIPv6PingUsingSendto(self):
    """sendto() on an IPv6 ping socket gets a valid reply."""
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4NoCrash(self):
    """Reading an IPv4 ping reply with read() on the raw fd works."""
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))

  def testIPv6NoCrash(self):
    """Reading an IPv6 ping reply with read() on the raw fd works."""
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, ("::1", 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))

  def testCrossProtocolCrash(self):
    """An ICMP error matching a socket of the other protocol must not crash."""
    # Checks that an ICMP error containing a ping packet that matches the ID
    # of a socket of the wrong protocol (which can happen when using 464xlat)
    # doesn't crash the kernel.

    # We can only test this using IPv6 unreachables and IPv4 ping sockets,
    # because IPv4 packets sent by scapy.send() on loopback are not received by
    # the kernel. So we don't actually use this function yet.
    def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
      return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
              scapy.ICMP(type=3, code=0) /
              scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
              scapy.ICMP(type=8, id=port, seq=1))

    def GetIPv6Unreachable(port):
      return (scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6DestUnreach() /
              scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))

    # An unreachable matching the ID of a socket of the wrong protocol
    # shouldn't crash.
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 12345))
    _, port = s.getsockname()
    scapy.send(GetIPv6Unreachable(port), verbose=False)
    # No crash? Good.

  def testCrossProtocolCalls(self):
    """Tests that passing in the wrong family returns EAFNOSUPPORT.

    Relevant kernel commits:
      upstream net:
        91a0b60 net/ping: handle protocol mismatching scenario
        9145736d net: ping: Return EAFNOSUPPORT when appropriate.

      android-3.10:
        78a6809 net/ping: handle protocol mismatching scenario
        428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
    """

    def CheckEAFNoSupport(function, *args):
      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)

    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))

    # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
    # IPv4 socket address structures, we need to pass down a socket address
    # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
    # will fail immediately with EINVAL because the passed-in socket length is
    # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
    ipv4sockaddr = csocket.SockaddrIn6(
        ipv4sockaddr.Pack() +
        "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))

    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()

    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
    # address family, because the Python implementation will just pass garbage
    # down to the kernel. So call the C functions directly.
    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)

  def testIPv4Bind(self):
    """Checks bind() semantics on IPv4 ping sockets."""
    # Bind to unspecified address.
    s = net_test.IPv4PingSocket()
    s.bind(("0.0.0.0", 544))
    self.assertEqual(("0.0.0.0", 544), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv4PingSocket()
    s.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv4PingSocket()
    s2.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s2.getsockname())
    s3 = net_test.IPv4PingSocket()
    s3.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s3.getsockname())

    # If two sockets bind to the same port, the first one to call read() gets
    # the response.
    s4 = net_test.IPv4PingSocket()
    s5 = net_test.IPv4PingSocket()
    s4.bind(("0.0.0.0", 167))
    s5.bind(("0.0.0.0", 167))
    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    self.assertValidPingResponse(s5, net_test.IPV4_PING)
    csocket.SetSocketTimeout(s4, 100)
    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)

    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    s6 = net_test.IPv4PingSocket()
    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))

    # Can't bind after sendto.
    s = net_test.IPv4PingSocket()
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))

  def testIPv6Bind(self):
    """Checks bind() semantics on IPv6 ping sockets."""
    # Bind to unspecified address.
    s = net_test.IPv6PingSocket()
    s.bind(("::", 769))
    self.assertEqual(("::", 769, 0, 0), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv6PingSocket()
    s2.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s2.getsockname())
    s3 = net_test.IPv6PingSocket()
    s3.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s3.getsockname())

    # Binding both IPv4 and IPv6 to the same socket works.
    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()
    s4.bind(("0.0.0.0", 444))
    s6.bind(("::", 666, 0, 0))

    # Can't bind after sendto.
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))

  def testIPv4InvalidBind(self):
    """Binding an IPv4 ping socket to unusable addresses fails."""
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("255.255.255.255", 1026))
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("224.0.0.1", 651))
    # Binding to an address we don't have only works with IP_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV4_ADDR, 651))
    try:
      s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
      s.bind((net_test.IPV4_ADDR, 651))
    except IOError as e:
      # We're not root. Let it go for now, but don't hide any other failure.
      # NOTE(review): EPERM is tolerated alongside EACCES on the assumption
      # that setsockopt reports missing privileges that way - confirm.
      if e.errno not in (errno.EACCES, errno.EPERM):
        raise

  def testIPv6InvalidBind(self):
    """Binding an IPv6 ping socket to unusable addresses fails."""
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, ("ff02::2", 1026))

    # Binding to an address we don't have only works with IPV6_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV6_ADDR, 651))
    try:
      s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
      s.bind((net_test.IPV6_ADDR, 651))
    except IOError as e:
      # We're not root. Let it go for now, but don't hide any other failure.
      # NOTE(review): EPERM is tolerated alongside EACCES on the assumption
      # that setsockopt reports missing privileges that way - confirm.
      if e.errno not in (errno.EACCES, errno.EPERM):
        raise

  def testAfUnspecBind(self):
    """Checks how bind() treats AF_UNSPEC socket addresses."""
    # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
    s4 = net_test.IPv4PingSocket()
    sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
    sockaddr.family = AF_UNSPEC
    csocket.Bind(s4, sockaddr)
    self.assertEqual(("0.0.0.0", 12996), s4.getsockname())

    # But not if the address is anything else.
    sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)

    # This doesn't work for IPv6.
    s6 = net_test.IPv6PingSocket()
    sockaddr = csocket.Sockaddr(("::1", 58997))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)

  def testIPv6ScopedBind(self):
    """Checks scope ID handling when binding IPv6 ping sockets."""
    # Can't bind to a link-local address without a scope ID.
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, (self.lladdr, 1026, 0, 0))

    # Binding to a link-local address with a scope ID works, and the scope ID is
    # returned by a subsequent getsockname. Interestingly, Python's getsockname
    # returns "fe80:1%foo", even though it does not understand it.
    expected = self.lladdr + "%" + self.ifname
    s.bind((self.lladdr, 4646, 0, self.ifindex))
    self.assertEqual((expected, 4646, 0, self.ifindex), s.getsockname())

    # Of course, for the above to work the address actually has to be configured
    # on the machine.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("fe80::f00", 1026, 0, 1))

    # Scope IDs on non-link-local addresses are silently ignored.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 1234, 0, 1))
    self.assertEqual(("::1", 1234, 0, 0), s.getsockname())

  def testBindAffectsIdentifier(self):
    """The port passed to bind() becomes the ICMP identifier of replies."""
    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xf976))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual("\xf9\x76", s.recv(32768)[4:6])

    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xace))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual("\x0a\xce", s.recv(32768)[4:6])

  def testLinkLocalAddress(self):
    """Sending to a link-local address requires a scope ID."""
    s = net_test.IPv6PingSocket()
    # Sending to a link-local address with no scope fails with EINVAL.
    self.assertRaisesErrno(errno.EINVAL,
                           s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
    # Sending to link-local address with a scope succeeds. Note that Python
    # doesn't understand the "fe80::1%lo" format, even though it returns it.
    s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
    # No exceptions? Good.

  def testLinkLocalOif(self):
    """Checks that ping to link-local addresses works correctly.

    Relevant kernel commits:
      upstream net:
        5e45789 net: ipv6: Fix ping to link-local addresses.
    """
    for mode in ["oif", "ucast_oif", None]:
      s = net_test.IPv6PingSocket()
      for netid in self.NETIDS:
        s2 = net_test.IPv6PingSocket()
        dst = self._RouterAddress(netid, 6)
        self.assertTrue(dst.startswith("fe80:"))

        if mode:
          self.SelectInterface(s, netid, mode)
          self.SelectInterface(s2, netid, mode)
          scopeid = 0
        else:
          scopeid = self.ifindices[netid]

        if mode == "oif":
          # If SO_BINDTODEVICE has been set, any attempt to send on another
          # interface returns EINVAL.
          othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
                                   % len(self.NETIDS)]
          otherscopeid = self.ifindices[othernetid]
          self.assertRaisesErrno(
              errno.EINVAL,
              s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))
          self.assertRaisesErrno(
              errno.EINVAL,
              s.connect, (dst, 55, 0, otherscopeid))

        # Try using both sendto and connect/send.
        # If we get a reply, we sent the packet out on the right interface.
        s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
        self.assertValidPingResponse(s, net_test.IPV6_PING)

        # IPV6_UNICAST_IF doesn't work on connected sockets.
        if mode != "ucast_oif":
          s2.connect((dst, 123, 0, scopeid))
          s2.send(net_test.IPV6_PING)
          self.assertValidPingResponse(s2, net_test.IPV6_PING)

  def testMappedAddressFails(self):
    """Sending to an IPv4-mapped address on an IPv6 ping socket fails."""
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                           ("::ffff:192.0.2.1", 55))

  @unittest.skip("skipping: does not work yet")
  def testFlowLabel(self):
    """Checks flow label handling on IPv6 ping sockets (currently skipped)."""
    s = net_test.IPv6PingSocket()

    # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
    # the flow label in the packet is not set.
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.

    # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
    # that is not registered with the flow manager should return EINVAL...
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
    # ... but this doesn't work yet.
    if False:  # pylint: disable=using-constant-test
      self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                             (net_test.IPV6_ADDR, 93, 0xdead, 0))

    # After registering the flow label, it gets sent properly, appears in the
    # output packet, and is returned in the response.
    net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
    self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
                                     net_test.IPV6_FLOWINFO_SEND))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    _, src = s.recvfrom(32768)
    _, _, flowlabel, _ = src
    self.assertEqual(0xdead, flowlabel & 0xfffff)

  def testIPv4Error(self):
    """A low-TTL IPv4 ping makes recv() fail with EHOSTUNREACH."""
    s = net_test.IPv4PingSocket()
    s.setsockopt(SOL_IP, IP_TTL, 2)
    s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.

  def testIPv6Error(self):
    """A low-hop-limit IPv6 ping makes recv() fail with EHOSTUNREACH."""
    s = net_test.IPv6PingSocket()
    s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.

  def testIPv6MulticastPing(self):
    """A multicast ping receives more than one reply."""
    s = net_test.IPv6PingSocket()
    # Send a multicast ping and check we get at least one duplicate.
    # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4LargePacket(self):
    """A 20000-byte IPv4 ping payload is echoed back intact over loopback."""
    s = net_test.IPv4PingSocket()
    data = net_test.IPV4_PING + 20000 * "a"
    s.sendto(data, ("127.0.0.1", 987))
    self.assertValidPingResponse(s, data)

  def testIPv6LargePacket(self):
    """Sending a large IPv6 ping over loopback does not raise."""
    s = net_test.IPv6PingSocket()
    s.bind(("::", 0xace))
    data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
    s.sendto(data, ("::1", 953))

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testIcmpSocketsNotInIcmp6(self):
    """An IPv4 ping socket adds a row to "icmp" but not to "icmp6"."""
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp")))
    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testIcmp6SocketsNotInIcmp(self):
    """An IPv6 ping socket adds a row to "icmp6" but not to "icmp"."""
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.assertEqual(numrows, len(self.ReadProcNetSocket("icmp")))
    self.assertEqual(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))

  def testProcNetIcmp(self):
    """A bound, connected IPv4 ping socket is reported in "icmp"."""
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testProcNetIcmp6(self):
    """Checks the contents of "icmp6" as an IPv6 ping socket changes state."""
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)

    # Check the row goes away when the socket is closed.
    s.close()
    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))

    # Try send, bind and connect to check the addresses and the state.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
    self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))

    # Can't bind after sendto, apparently.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)

    # Check receive bytes.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.connect(("ff02::1", 0xdead))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
    s.send(net_test.IPV6_PING)
    s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0x300)
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0)

  def testProcNetUdp6(self):
    """A bound, connected IPv6 UDP socket is reported in "udp6"."""
    s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)

  def testProcNetRaw6(self):
    """A bound, connected IPv6 raw socket is reported in "raw6"."""
    s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)

  def testIPv6MTU(self):
    """Tests IPV6_RECVERR and path MTU discovery on ping sockets.

    Relevant kernel commits:
      upstream net-next:
        dcb94b8 ipv6: fix endianness error in icmpv6_err
    """
    s = net_test.IPv6PingSocket()
    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    s.connect((net_test.IPV6_ADDR, 55))
    pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * "a"
    s.send(pkt)
    self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
    data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)

    # Compare the offending packet with the one we sent. To do this we need to
    # calculate the ident of the packet we sent and blank out the checksum of
    # the one we received. The error only quotes a truncated copy of what we
    # sent (see PingReplyThread.SendPacketTooBig), so compare against the
    # matching prefix of our packet.
    ident = struct.pack("!H", s.getsockname()[1])
    pkt = pkt[:4] + ident + pkt[6:]
    data = data[:2] + "\x00\x00" + data[4:]
    self.assertEqual(pkt[:len(data)], data)

    # Check the address that the packet was sent to.
    # ... except in 4.1, where it just returns an AF_UNSPEC, like this:
    # recvmsg(9, {msg_name(0)={sa_family=AF_UNSPEC,
    #     sa_data="\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"},
    #     msg_iov(1)=[{"\x80\x00\x04\x6b\x00\xc4\x00\x03\x61\x61\x61\x61\x61\x61"..., 4096}],
    #     msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...},
    #     msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232
    if net_test.LINUX_VERSION != (4, 1, 0):
      self.assertEqual(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)

    # Check the cmsg data, including the link MTU.
    mtu = PingReplyThread.LINK_MTU
    src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
    msglist = [
        (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
         (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
                                   ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
          csocket.Sockaddr((src, 0))))
    ]

    # IP[V6]_RECVERR in 3.10 appears to return incorrect data for the port.
    # The fix might have been in 676d236, but we don't have that in 3.10 and it
    # touches code all over the tree. Instead, just don't check the port.
    if net_test.LINUX_VERSION <= (3, 14, 0):
      msglist[0][2][1].port = cmsg[0][2][1].port

    self.assertEqual(msglist, cmsg)


if __name__ == "__main__":
  unittest.main()