#!/usr/bin/python
#
# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=g-bad-todo

import errno
import os
import posix
import random
import re
from socket import *  # pylint: disable=wildcard-import
import threading
import time
import unittest

from scapy import all as scapy

import csocket
import multinetwork_base
import net_test


HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")

ICMP_ECHO = 8
ICMP_ECHOREPLY = 0
ICMPV6_ECHO_REQUEST = 128
ICMPV6_ECHO_REPLY = 129


class PingReplyThread(threading.Thread):
  """Thread that reads frames from a tun fd and answers echo requests.

  Frames read from the tun are parsed as Ethernet. IPv4 and IPv6 echo requests
  with valid checksums are answered with echo replies written back to the same
  fd. Requests whose TTL / hop limit is below MIN_TTL are answered with a Time
  Exceeded error sourced from INTERMEDIATE_IPV4 / INTERMEDIATE_IPV6 instead.
  IPv6 requests sent to an address starting with "ff02::" are answered once per
  entry in NEIGHBOURS.
  """

  MIN_TTL = 10
  INTERMEDIATE_IPV4 = "192.0.2.2"
  INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
  NEIGHBOURS = ["fe80::1"]

  def __init__(self, tun, mymac, routermac):
    """Initializes the thread.

    Args:
      tun: An object with a fileno() method; packets are read from and written
        to that file descriptor.
      mymac: Destination MAC address used on frames written to the tun.
      routermac: Source MAC address used on frames written to the tun.
    """
    super(PingReplyThread, self).__init__()
    self._tun = tun
    self._stopped = False
    self._mymac = mymac
    self._routermac = routermac

  def Stop(self):
    """Asks run() to exit the next time it checks the stop flag."""
    self._stopped = True

  def ChecksumValid(self, packet):
    """Returns True if the checksums scapy recomputes match those in packet.

    The IP, ICMP and ICMPv6 echo request layers are checked; layers that are
    absent are ignored. As a side effect, the checksum fields of the layers of
    the passed-in packet are deleted.

    Args:
      packet: A scapy packet.

    Returns:
      False if any present layer's recomputed checksum differs from the one the
      packet carried, True otherwise.
    """
    # Get and clear the checksums.
    def GetAndClearChecksum(layer):
      if not layer:
        return None
      try:
        checksum = layer.chksum
        del layer.chksum
      except AttributeError:
        # Some layers name the field cksum instead of chksum.
        checksum = layer.cksum
        del layer.cksum
      return checksum

    def GetChecksum(layer):
      try:
        return layer.chksum
      except AttributeError:
        return layer.cksum

    layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
    sums = {}
    for name in layers:
      sums[name] = GetAndClearChecksum(packet.getlayer(name))

    # Serialize the packet, so scapy recalculates the checksums, and compare
    # them with the ones in the packet.
    packet = packet.__class__(str(packet))
    for name in layers:
      layer = packet.getlayer(name)
      if layer and GetChecksum(layer) != sums[name]:
        return False

    return True

  def SendTimeExceeded(self, version, packet):
    """Sends a Time Exceeded error quoting packet back to the packet's source.

    Args:
      version: 4 or 6. Any other value sends nothing.
      packet: The scapy IP or IPv6 packet that triggered the error.
    """
    if version == 4:
      src = packet.getlayer(scapy.IP).src
      self.SendPacket(
          scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
          scapy.ICMP(type=11, code=0) /
          packet)
    elif version == 6:
      src = packet.getlayer(scapy.IPv6).src
      self.SendPacket(
          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
          scapy.ICMPv6TimeExceeded(code=0) /
          packet)

  def IPv4Packet(self, ip):
    """Replies to an IPv4 echo request; silently drops anything else.

    Args:
      ip: A scapy IP packet.
    """
    icmp = ip.getlayer(scapy.ICMP)

    # We only support ping for now.
    if (ip.proto != IPPROTO_ICMP or
        icmp.type != ICMP_ECHO or
        icmp.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ip):
      return

    if ip.ttl < self.MIN_TTL:
      self.SendTimeExceeded(4, ip)
      return

    # Turn the request into a reply in place and send it back.
    icmp.type = ICMP_ECHOREPLY
    self.SwapAddresses(ip)
    self.SendPacket(ip)

  def IPv6Packet(self, ipv6):
    """Replies to an IPv6 echo request; silently drops anything else.

    Args:
      ipv6: A scapy IPv6 packet.
    """
    icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)

    # We only support ping for now.
    if (ipv6.nh != IPPROTO_ICMPV6 or
        not icmpv6 or
        icmpv6.type != ICMPV6_ECHO_REQUEST or
        icmpv6.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ipv6):
      return

    if ipv6.dst.startswith("ff02::"):
      # Multicast request: send one reply from each configured neighbour.
      ipv6.dst = ipv6.src
      for src in self.NEIGHBOURS:
        ipv6.src = src
        icmpv6.type = ICMPV6_ECHO_REPLY
        self.SendPacket(ipv6)
    elif ipv6.hlim < self.MIN_TTL:
      self.SendTimeExceeded(6, ipv6)
    else:
      icmpv6.type = ICMPV6_ECHO_REPLY
      self.SwapAddresses(ipv6)
      self.SendPacket(ipv6)

  def SwapAddresses(self, packet):
    """Exchanges the src and dst fields of packet in place."""
    packet.src, packet.dst = packet.dst, packet.src

  def SendPacket(self, packet):
    """Wraps packet in an Ethernet header and writes it to the tun fd.

    A ValueError raised while writing is deliberately ignored.

    Args:
      packet: The scapy packet to send.
    """
    packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    try:
      posix.write(self._tun.fileno(), str(packet))
    except ValueError:
      pass

  def run(self):
    """Reads frames from the tun and dispatches them until stopped.

    EAGAIN from read() is retried; any other OSError ends the loop.
    """
    while not self._stopped:

      try:
        packet = posix.read(self._tun.fileno(), 4096)
      except OSError as e:
        if e.errno == errno.EAGAIN:
          continue
        else:
          break

      ether = scapy.Ether(packet)
      if ether.type == net_test.ETH_P_IPV6:
        self.IPv6Packet(ether.payload)
      elif ether.type == net_test.ETH_P_IP:
        self.IPv4Packet(ether.payload)


class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
  """Tests for IPv4 and IPv6 ping sockets.

  A PingReplyThread attached to the tun of a randomly chosen network answers
  the pings that the tests send via that network.
  """

  @classmethod
  def setUpClass(cls):
    """Picks a random netid, makes it the default and starts the responder."""
    super(Ping6Test, cls).setUpClass()
    cls.netid = random.choice(cls.NETIDS)
    cls.reply_thread = PingReplyThread(
        cls.tuns[cls.netid],
        cls.MyMacAddress(cls.netid),
        cls.RouterMacAddress(cls.netid))
    cls.SetDefaultNetwork(cls.netid)
    cls.reply_thread.start()

  @classmethod
  def tearDownClass(cls):
    """Signals the responder to stop and clears the default network."""
    cls.reply_thread.Stop()
    cls.ClearDefaultNetwork()
    super(Ping6Test, cls).tearDownClass()

  def setUp(self):
    """Caches the interface name, index and addresses of the chosen network."""
    self.ifname = self.GetInterfaceName(self.netid)
    self.ifindex = self.ifindices[self.netid]
    self.lladdr = net_test.GetLinkAddress(self.ifname, True)
    self.globaladdr = net_test.GetLinkAddress(self.ifname, False)

  def assertValidPingResponse(self, s, data):
    """Receives one datagram on s and checks it is an echo reply to data.

    Args:
      s: The ping socket the request was sent on.
      data: The echo request that was sent.
    """
    family = s.family

    # Receive the reply.
    rcvd, src = s.recvfrom(32768)
    self.assertNotEqual(0, len(rcvd), "No data received")

    # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
    # as IPv4.
    if src[0].startswith("::ffff:"):
      family = AF_INET
      # Reduce to an (address, port) pair so it unpacks like an IPv4 sockaddr.
      src = (src[0].replace("::ffff:", ""), src[1])

    # Check the data being sent is valid.
    self.assertGreater(len(data), 7, "Not enough data for ping packet")
    if family == AF_INET:
      self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
    elif family == AF_INET6:
      self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
    else:
      self.fail("Unknown socket address family %d" % s.family)

    # Check address, ICMP type, and ICMP code.
    if family == AF_INET:
      addr, unused_port = src
      self.assertGreaterEqual(len(addr), len("1.1.1.1"))
      self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
    else:
      addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
      self.assertGreaterEqual(len(addr), len("::"))
      self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
      # Check that the flow label is zero and that the scope ID is sane.
      self.assertEqual(flowlabel, 0)
      if addr.startswith("fe80::"):
        self.assertIn(scope_id, self.ifindices.values())
      else:
        self.assertEqual(0, scope_id)

    # TODO: check the checksum. We can't do this easily now for ICMPv6 because
    # we don't have the IP addresses so we can't construct the pseudoheader.

    # Check the sequence number and the data.
    self.assertEqual(len(data), len(rcvd))
    self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))

  def ReadProcNetSocket(self, protocol):
    """Parses /proc/net/<protocol> into a list of socket rows.

    Args:
      protocol: Name of the file under /proc/net, e.g. "icmp" or "udp6".

    Returns:
      A list with one [src, dst, state, mem, uid, refcnt, drops] list of
      strings per socket line in the file.
    """
    # Read file.
    with open("/proc/net/%s" % protocol) as f:
      lines = f.readlines()

    # Possibly check, and strip, header.
    if protocol in ["icmp6", "raw6", "udp6"]:
      self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0])
    lines = lines[1:]

    # Check contents.
    if protocol.endswith("6"):
      addrlen = 32
    else:
      addrlen = 8
    regexp = re.compile(r" *(\d+): "                    # bucket
                        "([0-9A-F]{%d}:[0-9A-F]{4}) "   # srcaddr, port
                        "([0-9A-F]{%d}:[0-9A-F]{4}) "   # dstaddr, port
                        "([0-9A-F][0-9A-F]) "           # state
                        "([0-9A-F]{8}:[0-9A-F]{8}) "    # mem
                        "([0-9A-F]{2}:[0-9A-F]{8}) "    # ?
                        "([0-9A-F]{8}) +"               # ?
                        "([0-9]+) +"                    # uid
                        "([0-9]+) +"                    # ?
                        "([0-9]+) +"                    # inode
                        "([0-9]+) +"                    # refcnt
                        "([0-9a-f]+) +"                 # sp
                        "([0-9]+) *$"                   # drops, icmp has spaces
                        % (addrlen, addrlen))
    # Return a list of lists with only source / dest addresses for now.
    out = []
    for line in lines:
      (_, src, dst, state, mem,
       _, _, uid, _, _, refcnt, _, drops) = regexp.match(line).groups()
      out.append([src, dst, state, mem, uid, refcnt, drops])
    return out

  def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
                        txmem=0, rxmem=0):
    """Asserts that the last row of /proc/net/<name> matches the arguments.

    The row is also expected to carry the current uid, a refcnt of 2 and a
    drop count of 0.

    Args:
      name: Name of the file under /proc/net.
      srcaddr: Expected source address, as a string.
      srcport: Expected source port.
      dstaddr: Expected destination address, as a string.
      dstport: Expected destination port.
      state: Expected socket state, as an integer.
      txmem: Expected first half of the mem column.
      rxmem: Expected second half of the mem column.
    """
    expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
                "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
                "%02X" % state,
                "%08X:%08X" % (txmem, rxmem),
                str(os.getuid()), "2", "0"]
    actual = self.ReadProcNetSocket(name)[-1]
    self.assertListEqual(expected, actual)

  def testIPv4SendWithNoConnection(self):
    """send() on an unconnected IPv4 ping socket fails with EDESTADDRREQ."""
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)

  def testIPv6SendWithNoConnection(self):
    """send() on an unconnected IPv6 ping socket fails with EDESTADDRREQ."""
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)

  def testIPv4LoopbackPingWithConnect(self):
    """A connected IPv4 ping socket can ping loopback with extra payload."""
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 55))
    data = net_test.IPV4_PING + "foobarbaz"
    s.send(data)
    self.assertValidPingResponse(s, data)

  def testIPv6LoopbackPingWithConnect(self):
    """A connected IPv6 ping socket can ping loopback."""
    s = net_test.IPv6PingSocket()
    s.connect(("::1", 55))
    s.send(net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4PingUsingSendto(self):
    """sendto() on an IPv4 ping socket sends the whole packet and is answered."""
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    self.assertValidPingResponse(s, net_test.IPV4_PING)

  def testIPv6PingUsingSendto(self):
    """sendto() on an IPv6 ping socket sends the whole packet and is answered."""
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4NoCrash(self):
    """Reading an IPv4 reply with read() on the raw fd works."""
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))

  def testIPv6NoCrash(self):
    """Reading an IPv6 reply with read() on the raw fd works."""
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, ("::1", 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))

  def testCrossProtocolCrash(self):
    """An ICMPv6 error matching an IPv4 ping socket's ID must not crash."""
    # Checks that an ICMP error containing a ping packet that matches the ID
    # of a socket of the wrong protocol (which can happen when using 464xlat)
    # doesn't crash the kernel.

    # We can only test this using IPv6 unreachables and IPv4 ping sockets,
    # because IPv4 packets sent by scapy.send() on loopback are not received by
    # the kernel. So we don't actually use this function yet.
    def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
      return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
              scapy.ICMP(type=3, code=0) /
              scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
              scapy.ICMP(type=8, id=port, seq=1))

    def GetIPv6Unreachable(port):
      return (scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6DestUnreach() /
              scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))

    # An unreachable matching the ID of a socket of the wrong protocol
    # shouldn't crash.
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 12345))
    _, port = s.getsockname()
    scapy.send(GetIPv6Unreachable(port))
    # No crash? Good.

  def testCrossProtocolCalls(self):
    """Tests that passing in the wrong family returns EAFNOSUPPORT."""

    def CheckEAFNoSupport(function, *args):
      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)

    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))

    # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
    # IPv4 socket address structures, we need to pass down a socket address
    # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
    # will fail immediately with EINVAL because the passed-in socket length is
    # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
    ipv4sockaddr = csocket.SockaddrIn6(
        ipv4sockaddr.Pack() +
        "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))

    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()

    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
    # address family, because the Python implementation will just pass garbage
    # down to the kernel. So call the C functions directly.
    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)

  def testIPv4Bind(self):
    """Exercises bind() semantics on IPv4 ping sockets."""
    # Bind to unspecified address.
    s = net_test.IPv4PingSocket()
    s.bind(("0.0.0.0", 544))
    self.assertEqual(("0.0.0.0", 544), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv4PingSocket()
    s.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv4PingSocket()
    s2.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s2.getsockname())
    s3 = net_test.IPv4PingSocket()
    s3.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s3.getsockname())

    # If two sockets bind to the same port, the first one to call read() gets
    # the response.
    s4 = net_test.IPv4PingSocket()
    s5 = net_test.IPv4PingSocket()
    s4.bind(("0.0.0.0", 167))
    s5.bind(("0.0.0.0", 167))
    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    self.assertValidPingResponse(s5, net_test.IPV4_PING)
    net_test.SetSocketTimeout(s4, 100)
    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)

    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    s6 = net_test.IPv4PingSocket()
    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))

    # Can't bind after sendto.
    s = net_test.IPv4PingSocket()
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))

  def testIPv6Bind(self):
    """Exercises bind() semantics on IPv6 ping sockets."""
    # Bind to unspecified address.
    s = net_test.IPv6PingSocket()
    s.bind(("::", 769))
    self.assertEqual(("::", 769, 0, 0), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv6PingSocket()
    s2.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s2.getsockname())
    s3 = net_test.IPv6PingSocket()
    s3.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s3.getsockname())

    # Binding both IPv4 and IPv6 to the same socket works.
    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()
    s4.bind(("0.0.0.0", 444))
    s6.bind(("::", 666, 0, 0))

    # Can't bind after sendto.
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))

  def testIPv4InvalidBind(self):
    """bind() to broadcast, multicast or non-local IPv4 addresses fails."""
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("255.255.255.255", 1026))
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("224.0.0.1", 651))
    # Binding to an address we don't have only works with IP_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV4_ADDR, 651))
    try:
      s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
      s.bind((net_test.IPV4_ADDR, 651))
    except IOError as e:
      # NOTE(review): every IOError is swallowed here, not only EACCES.
      # Confirm whether other errnos should fail the test.
      if e.errno == errno.EACCES:
        pass  # We're not root. let it go for now.
506 507 def testIPv6InvalidBind(self): 508 s = net_test.IPv6PingSocket() 509 self.assertRaisesErrno(errno.EINVAL, 510 s.bind, ("ff02::2", 1026)) 511 512 # Binding to an address we don't have only works with IPV6_TRANSPARENT. 513 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 514 s.bind, (net_test.IPV6_ADDR, 651)) 515 try: 516 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1) 517 s.bind((net_test.IPV6_ADDR, 651)) 518 except IOError, e: 519 if e.errno == errno.EACCES: 520 pass # We're not root. let it go for now. 521 522 def testAfUnspecBind(self): 523 # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0. 524 s4 = net_test.IPv4PingSocket() 525 sockaddr = csocket.Sockaddr(("0.0.0.0", 12996)) 526 sockaddr.family = AF_UNSPEC 527 csocket.Bind(s4, sockaddr) 528 self.assertEquals(("0.0.0.0", 12996), s4.getsockname()) 529 530 # But not if the address is anything else. 531 sockaddr = csocket.Sockaddr(("127.0.0.1", 58234)) 532 sockaddr.family = AF_UNSPEC 533 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr) 534 535 # This doesn't work for IPv6. 536 s6 = net_test.IPv6PingSocket() 537 sockaddr = csocket.Sockaddr(("::1", 58997)) 538 sockaddr.family = AF_UNSPEC 539 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr) 540 541 def testIPv6ScopedBind(self): 542 # Can't bind to a link-local address without a scope ID. 543 s = net_test.IPv6PingSocket() 544 self.assertRaisesErrno(errno.EINVAL, 545 s.bind, (self.lladdr, 1026, 0, 0)) 546 547 # Binding to a link-local address with a scope ID works, and the scope ID is 548 # returned by a subsequent getsockname. Interestingly, Python's getsockname 549 # returns "fe80:1%foo", even though it does not understand it. 
550 expected = self.lladdr + "%" + self.ifname 551 s.bind((self.lladdr, 4646, 0, self.ifindex)) 552 self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname()) 553 554 # Of course, for the above to work the address actually has to be configured 555 # on the machine. 556 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 557 s.bind, ("fe80::f00", 1026, 0, 1)) 558 559 # Scope IDs on non-link-local addresses are silently ignored. 560 s = net_test.IPv6PingSocket() 561 s.bind(("::1", 1234, 0, 1)) 562 self.assertEquals(("::1", 1234, 0, 0), s.getsockname()) 563 564 def testBindAffectsIdentifier(self): 565 s = net_test.IPv6PingSocket() 566 s.bind((self.globaladdr, 0xf976)) 567 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 568 self.assertEquals("\xf9\x76", s.recv(32768)[4:6]) 569 570 s = net_test.IPv6PingSocket() 571 s.bind((self.globaladdr, 0xace)) 572 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 573 self.assertEquals("\x0a\xce", s.recv(32768)[4:6]) 574 575 def testLinkLocalAddress(self): 576 s = net_test.IPv6PingSocket() 577 # Sending to a link-local address with no scope fails with EINVAL. 578 self.assertRaisesErrno(errno.EINVAL, 579 s.sendto, net_test.IPV6_PING, ("fe80::1", 55)) 580 # Sending to link-local address with a scope succeeds. Note that Python 581 # doesn't understand the "fe80::1%lo" format, even though it returns it. 582 s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex)) 583 # No exceptions? Good. 
584 585 def testMappedAddressFails(self): 586 s = net_test.IPv6PingSocket() 587 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 588 self.assertValidPingResponse(s, net_test.IPV6_PING) 589 s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55)) 590 self.assertValidPingResponse(s, net_test.IPV6_PING) 591 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, 592 ("::ffff:192.0.2.1", 55)) 593 594 @unittest.skipUnless(False, "skipping: does not work yet") 595 def testFlowLabel(self): 596 s = net_test.IPv6PingSocket() 597 598 # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but 599 # the flow label in the packet is not set. 600 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) 601 self.assertValidPingResponse(s, net_test.IPV6_PING) # Checks flow label==0. 602 603 # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label 604 # that is not registered with the flow manager should return EINVAL... 605 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1) 606 # ... but this doesn't work yet. 607 if False: 608 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, 609 (net_test.IPV6_ADDR, 93, 0xdead, 0)) 610 611 # After registering the flow label, it gets sent properly, appears in the 612 # output packet, and is returned in the response. 
613 net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead) 614 self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6, 615 net_test.IPV6_FLOWINFO_SEND)) 616 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) 617 _, src = s.recvfrom(32768) 618 _, _, flowlabel, _ = src 619 self.assertEqual(0xdead, flowlabel & 0xfffff) 620 621 def testIPv4Error(self): 622 s = net_test.IPv4PingSocket() 623 s.setsockopt(SOL_IP, IP_TTL, 2) 624 s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1) 625 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) 626 # We can't check the actual error because Python 2.7 doesn't implement 627 # recvmsg, but we can at least check that the socket returns an error. 628 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. 629 630 def testIPv6Error(self): 631 s = net_test.IPv6PingSocket() 632 s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2) 633 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1) 634 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 635 # We can't check the actual error because Python 2.7 doesn't implement 636 # recvmsg, but we can at least check that the socket returns an error. 637 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. 638 639 def testIPv6MulticastPing(self): 640 s = net_test.IPv6PingSocket() 641 # Send a multicast ping and check we get at least one duplicate. 642 # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug. 
643 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex) 644 s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex)) 645 self.assertValidPingResponse(s, net_test.IPV6_PING) 646 self.assertValidPingResponse(s, net_test.IPV6_PING) 647 648 def testIPv4LargePacket(self): 649 s = net_test.IPv4PingSocket() 650 data = net_test.IPV4_PING + 20000 * "a" 651 s.sendto(data, ("127.0.0.1", 987)) 652 self.assertValidPingResponse(s, data) 653 654 def testIPv6LargePacket(self): 655 s = net_test.IPv6PingSocket() 656 s.bind(("::", 0xace)) 657 data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa" 658 s.sendto(data, ("::1", 953)) 659 660 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") 661 def testIcmpSocketsNotInIcmp6(self): 662 numrows = len(self.ReadProcNetSocket("icmp")) 663 numrows6 = len(self.ReadProcNetSocket("icmp6")) 664 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) 665 s.bind(("127.0.0.1", 0xace)) 666 s.connect(("127.0.0.1", 0xbeef)) 667 self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp"))) 668 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) 669 670 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") 671 def testIcmp6SocketsNotInIcmp(self): 672 numrows = len(self.ReadProcNetSocket("icmp")) 673 numrows6 = len(self.ReadProcNetSocket("icmp6")) 674 s = net_test.IPv6PingSocket() 675 s.bind(("::1", 0xace)) 676 s.connect(("::1", 0xbeef)) 677 self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp"))) 678 self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6"))) 679 680 def testProcNetIcmp(self): 681 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) 682 s.bind(("127.0.0.1", 0xace)) 683 s.connect(("127.0.0.1", 0xbeef)) 684 self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1) 685 686 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") 687 def testProcNetIcmp6(self): 688 numrows6 = 
len(self.ReadProcNetSocket("icmp6")) 689 s = net_test.IPv6PingSocket() 690 s.bind(("::1", 0xace)) 691 s.connect(("::1", 0xbeef)) 692 self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1) 693 694 # Check the row goes away when the socket is closed. 695 s.close() 696 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) 697 698 # Try send, bind and connect to check the addresses and the state. 699 s = net_test.IPv6PingSocket() 700 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) 701 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345)) 702 self.assertEqual(1, len(self.ReadProcNetSocket("icmp6"))) 703 704 # Can't bind after sendto, apparently. 705 s = net_test.IPv6PingSocket() 706 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) 707 s.bind((self.lladdr, 0xd00d, 0, self.ifindex)) 708 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7) 709 710 # Check receive bytes. 711 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex) 712 s.connect(("ff02::1", 0xdead)) 713 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1) 714 s.send(net_test.IPV6_PING) 715 time.sleep(0.01) # Give the other thread time to reply. 716 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, 717 txmem=0, rxmem=0x300) 718 self.assertValidPingResponse(s, net_test.IPV6_PING) 719 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, 720 txmem=0, rxmem=0) 721 722 def testProcNetUdp6(self): 723 s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP) 724 s.bind(("::1", 0xace)) 725 s.connect(("::1", 0xbeef)) 726 self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1) 727 728 def testProcNetRaw6(self): 729 s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW) 730 s.bind(("::1", 0xace)) 731 s.connect(("::1", 0xbeef)) 732 self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1) 733 734 735if __name__ == "__main__": 736 unittest.main() 737