#!/usr/bin/python
#
# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import random
from socket import *  # pylint: disable=wildcard-import
import time
import unittest

from scapy import all as scapy

import csocket
import iproute
import multinetwork_base
import packets
import net_test

# Setsockopt values.
IPV6_ADDR_PREFERENCES = 72
IPV6_PREFER_SRC_PUBLIC = 0x0002

# The retrans timer is also the DAD timeout. We set this to a value that's not
# so short that DAD will complete before we attempt to use the network, but
# short enough that we don't have to wait too long for DAD to complete.
RETRANS_TIMER = 150


class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
  """Test for IPv6 source address selection.

  Relevant kernel commits:
    upstream net-next:
      7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      c58da4c net: ipv6: allow explicitly choosing optimistic addresses
      9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface.
      c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr().
      c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr().
      3985e8a ipv6: sysctl to restrict candidate source addresses

    android-3.10:
      2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic
      0633924 ipv6: sysctl to restrict candidate source addresses
  """

  def SetIPv6Sysctl(self, ifname, sysctl, value):
    """Sets /proc/sys/net/ipv6/conf/<ifname>/<sysctl> to value."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value)

  def SetDAD(self, ifname, value):
    """Sets both accept_dad and dad_transmits on ifname to value."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value)
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value)

  def SetOptimisticDAD(self, ifname, value):
    """Sets the optimistic_dad sysctl on ifname to value."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value)

  def SetUseTempaddrs(self, ifname, value):
    """Sets the use_tempaddr sysctl on ifname to value."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value)

  def SetUseOptimistic(self, ifname, value):
    """Sets the use_optimistic sysctl on ifname to value."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value)

  def SetForwarding(self, value):
    """Sets the IPv6 forwarding sysctl for all interfaces to value."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/all/forwarding", value)

  def GetSourceIP(self, netid, mode="mark"):
    """Returns the source address picked for a UDP socket on netid.

    The socket is connected to net_test.IPV6_ADDR and its local address is
    read back with getsockname(). The socket is closed before returning.

    Args:
      netid: The network to build the socket on.
      mode: Network selection mode passed through to BuildSocket.

    Returns:
      The (non-empty) local address string of the connected socket.
    """
    s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
    try:
      # Because why not...testing for temporary addresses is a separate thing.
      s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)

      s.connect((net_test.IPV6_ADDR, 123))
      src_addr = s.getsockname()[0]
    finally:
      # Don't leak one descriptor per source address lookup.
      s.close()
    self.assertTrue(src_addr)
    return src_addr

  def assertAddressNotPresent(self, address):
    """Asserts that looking up address via iproute raises IOError."""
    self.assertRaises(IOError, self.iproute.GetAddress, address)

  def assertAddressHasExpectedAttributes(
      self, address, expected_ifindex, expected_flags):
    """Asserts family, /64 prefix, universe scope, ifindex and flags.

    Only the bits in expected_flags are checked; other flags set on the
    address are ignored.
    """
    ifa_msg = self.iproute.GetAddress(address)[0]
    self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
    self.assertEqual(64, ifa_msg.prefixlen)
    self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
    self.assertEqual(expected_ifindex, ifa_msg.index)
    self.assertEqual(expected_flags, ifa_msg.flags & expected_flags)

  def AddressIsTentative(self, address):
    """Returns a non-zero value iff address has IFA_F_TENTATIVE set."""
    ifa_msg = self.iproute.GetAddress(address)[0]
    return ifa_msg.flags & iproute.IFA_F_TENTATIVE

  def BindToAddress(self, address):
    """Binds a throwaway UDP socket to address; raises if bind fails."""
    s = net_test.UDPSocket(AF_INET6)
    try:
      s.bind((address, 0, 0, 0))
    finally:
      # The socket is only used to probe bindability, so always release it.
      s.close()

  def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
    """Sends a UDP datagram to dest:53 using address via IPV6_PKTINFO.

    Args:
      address: The source address to request in the pktinfo cmsg.
      netid: The network to build the (marked) socket on.
      dest: The destination address.

    Returns:
      Whatever csocket.Sendmsg returns.
    """
    pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
    cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
    s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
    try:
      return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
    finally:
      s.close()

  def assertAddressUsable(self, address, netid):
    """Asserts address can be bound to and used as an explicit source."""
    self.BindToAddress(address)
    self.SendWithSourceAddress(address, netid)
    # No exceptions? Good.

  def assertAddressNotUsable(self, address, netid):
    """Asserts bind fails with EADDRNOTAVAIL and sendmsg with EINVAL."""
    self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
    self.assertRaisesErrno(errno.EINVAL,
                           self.SendWithSourceAddress, address, netid)

  def assertAddressSelected(self, address, netid):
    """Asserts address is the source address chosen on netid."""
    self.assertEqual(address, self.GetSourceIP(netid))

  def assertAddressNotSelected(self, address, netid):
    """Asserts address is not the source address chosen on netid."""
    self.assertNotEqual(address, self.GetSourceIP(netid))

  def WaitForDad(self, address):
    """Polls until address is no longer tentative.

    Checks up to 20 times, sleeping 0.1 seconds after each failed check.

    Raises:
      AssertionError: if the address is still tentative after all checks.
    """
    for _ in range(20):
      if not self.AddressIsTentative(address):
        return
      time.sleep(0.1)
    # Interpolate the address so the failure says which one got stuck.
    raise AssertionError("%s did not complete DAD after 2 seconds" % address)


class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
  """Fixture that picks a random interface and removes its IPv6 address."""

  def setUp(self):
    # [0] Make sure DAD, optimistic DAD, and the use_optimistic option
    # are all consistently disabled at the outset.
    for netid in self.tuns:
      ifname = self.GetInterfaceName(netid)
      self.SetDAD(ifname, 0)
      self.SetOptimisticDAD(ifname, 0)
      self.SetUseTempaddrs(ifname, 0)
      self.SetUseOptimistic(ifname, 0)
      self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0)

    # [1] Pick an interface on which to test.
    self.test_netid = random.choice(list(self.tuns.keys()))
    self.test_ip = self.MyAddress(6, self.test_netid)
    self.test_ifindex = self.ifindices[self.test_netid]
    self.test_ifname = self.GetInterfaceName(self.test_netid)
    self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True)

    # [2] Delete the test interface's IPv6 address.
    self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
    self.assertAddressNotPresent(self.test_ip)

    self.assertAddressNotUsable(self.test_ip, self.test_netid)
    # Verify that the link-local address is not tentative.
    # Even though we disable DAD above, without this change occasionally the
    # test fails. This might be due to us disabling DAD only after the
    # link-local address is generated.
    self.WaitForDad(self.test_lladdr)

    # Disable forwarding, because optimistic addresses don't work when
    # forwarding is on. Forwarding will be re-enabled when the sysctls are
    # restored by MultiNetworkBaseTest.tearDownClass.
    # TODO: Fix this and remove this hack.
    self.SetForwarding("0")


class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """A tentative address is not usable or selected until DAD completes."""

  def testRfc6724Behaviour(self):
    # [3] Get an IPv6 address back, in DAD start-up.
    self.SetDAD(self.test_ifname, 1)  # Enable DAD
    # Send a RA to start SLAAC and subsequent DAD.
    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    # Get flags and prove tentative-ness.
    self.assertAddressHasExpectedAttributes(
        self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE)

    # Even though the interface has an IPv6 address, its tentative nature
    # prevents it from being selected.
    self.assertAddressNotUsable(self.test_ip, self.test_netid)
    self.assertAddressNotSelected(self.test_ip, self.test_netid)

    # Busy wait for DAD to complete (should be less than 1 second).
    self.WaitForDad(self.test_ip)

    # The test_ip should have completed DAD by now, and should be the
    # chosen source address, eligible to bind to, etc.
    self.assertAddressUsable(self.test_ip, self.test_netid)
    self.assertAddressSelected(self.test_ip, self.test_netid)


class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """Without use_optimistic, an optimistic address is not selected."""

  def testRfc6724Behaviour(self):
    # [3] Get an IPv6 address back, in optimistic DAD start-up.
    self.SetDAD(self.test_ifname, 1)  # Enable DAD
    self.SetOptimisticDAD(self.test_ifname, 1)
    # Send a RA to start SLAAC and subsequent DAD.
    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    # Get flags and prove optimism.
    self.assertAddressHasExpectedAttributes(
        self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # Optimistic addresses are usable but are not selected.
    if net_test.LINUX_VERSION >= (3, 18, 0):
      # The version checked in to android kernels <= 3.10 requires the
      # use_optimistic sysctl to be turned on.
      self.assertAddressUsable(self.test_ip, self.test_netid)
    self.assertAddressNotSelected(self.test_ip, self.test_netid)

    # Busy wait for DAD to complete (should be less than 1 second).
    self.WaitForDad(self.test_ip)

    # The test_ip should have completed DAD by now, and should be the
    # chosen source address.
    self.assertAddressUsable(self.test_ip, self.test_netid)
    self.assertAddressSelected(self.test_ip, self.test_netid)


class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
  """With use_optimistic, an optimistic address is usable and selected."""

  def testModifiedRfc6724Behaviour(self):
    # [3] Get an IPv6 address back, in optimistic DAD start-up.
    self.SetDAD(self.test_ifname, 1)  # Enable DAD
    self.SetOptimisticDAD(self.test_ifname, 1)
    self.SetUseOptimistic(self.test_ifname, 1)
    # Send a RA to start SLAAC and subsequent DAD.
    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    # Get flags and prove optimism.
    self.assertAddressHasExpectedAttributes(
        self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # The interface has an IPv6 address and, despite its optimistic nature,
    # the use_optimistic option allows it to be selected.
    self.assertAddressUsable(self.test_ip, self.test_netid)
    self.assertAddressSelected(self.test_ip, self.test_netid)


class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """A valid address is preferred over an optimistic one."""

  def testModifiedRfc6724Behaviour(self):
    # [3] Add a valid IPv6 address to this interface and verify it is
    # selected as the source address.
    preferred_ip = self.OnlinkPrefix(6, self.test_netid) + "cafe"
    self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex)
    self.assertAddressHasExpectedAttributes(
        preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT)
    self.assertEqual(preferred_ip, self.GetSourceIP(self.test_netid))

    # [4] Get another IPv6 address, in optimistic DAD start-up.
    self.SetDAD(self.test_ifname, 1)  # Enable DAD
    self.SetOptimisticDAD(self.test_ifname, 1)
    self.SetUseOptimistic(self.test_ifname, 1)
    # Send a RA to start SLAAC and subsequent DAD.
    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    # Get flags and prove optimism.
    self.assertAddressHasExpectedAttributes(
        self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # Since the interface has another IPv6 address, the optimistic address
    # is not selected--the other, valid address is chosen.
    self.assertAddressUsable(self.test_ip, self.test_netid)
    self.assertAddressNotSelected(self.test_ip, self.test_netid)
    self.assertAddressSelected(preferred_ip, self.test_netid)


class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
  """An optimistic address that fails DAD stops being usable."""

  def testDadFailure(self):
    # [3] Get an IPv6 address back, in optimistic DAD start-up.
    self.SetDAD(self.test_ifname, 1)  # Enable DAD
    self.SetOptimisticDAD(self.test_ifname, 1)
    self.SetUseOptimistic(self.test_ifname, 1)
    # Send a RA to start SLAAC and subsequent DAD.
    self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    # Prove optimism and usability.
    self.assertAddressHasExpectedAttributes(
        self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(self.test_ip, self.test_netid)
    self.assertAddressSelected(self.test_ip, self.test_netid)

    # Send a NA for the optimistic address, indicating address conflict
    # ("DAD defense").
    conflict_macaddr = "02:00:0b:ad:d0:0d"
    dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") /
                   scapy.IPv6(src=self.test_ip, dst="ff02::1") /
                   scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) /
                   scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr))
    self.ReceiveEtherPacketOn(self.test_netid, dad_defense)
    self.WaitForDad(self.test_lladdr)

    # The address should have failed DAD, and therefore no longer be usable.
    self.assertAddressNotUsable(self.test_ip, self.test_netid)
    self.assertAddressNotSelected(self.test_ip, self.test_netid)

    # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address.
class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks the source of Neighbor Solicitations while DAD is optimistic."""

  def testSendToOnlinkDestination(self):
    netid = self.test_netid
    ifname = self.test_ifname
    optimistic_ip = self.test_ip

    # [3] Bring the address back in optimistic DAD start-up: enable DAD,
    # optimistic DAD and use_optimistic, then send an RA so that SLAAC (and
    # with it DAD) starts.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The address must be flagged optimistic, yet be usable and selected.
    self.assertAddressHasExpectedAttributes(
        optimistic_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(optimistic_ip, netid)
    self.assertAddressSelected(optimistic_ip, netid)

    # [4] Send to an on-link destination. The resulting Neighbor Solicitation
    # must NOT be sourced from the optimistic address; in this setup the
    # only other usable address is the link-local one.
    onlink_prefix = self.OnlinkPrefix(6, netid)
    onlink_dest = self.GetRandomDestination(onlink_prefix)
    self.SendWithSourceAddress(optimistic_ip, netid, onlink_dest)

    if not (net_test.LINUX_VERSION >= (3, 18, 0)):
      # Older versions will actually choose the optimistic address to
      # originate Neighbor Solicitations (RFC violation), so there is
      # nothing to check.
      return

    ns = packets.NS(self.test_lladdr, onlink_dest, self.MyMacAddress(netid))
    expected_ns = ns[1]
    self.ExpectPacketOn(netid, "link-local NS", expected_ns)


# TODO(ek): add tests listening for netlink events.
class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """Source selection with use_oif_addrs_only disabled on the test interface."""

  def testChoosesNonInterfaceSourceAddress(self):
    self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0)
    src_ip = self.GetSourceIP(self.test_netid)
    # assertNotIn/assertIn make the same membership checks as
    # assertFalse/assertTrue(x in ...), but on failure they report the
    # selected address and the candidates instead of "True is not false".
    self.assertNotIn(src_ip, [self.test_ip, self.test_lladdr])
    self.assertIn(src_ip,
                  [self.MyAddress(6, netid)
                   for netid in self.tuns if netid != self.test_netid])


class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """Source selection with use_oif_addrs_only enabled on the test interface."""

  def testChoosesOnlyInterfaceSourceAddress(self):
    self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 1)
    # self.test_ifname does not have a global IPv6 address, so the only
    # candidate is the existing link-local address.
    self.assertAddressSelected(self.test_lladdr, self.test_netid)


if __name__ == "__main__":
  unittest.main()