1#!/usr/bin/python 2# 3# Copyright 2014 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import errno 18import random 19from socket import * # pylint: disable=wildcard-import 20import time 21import unittest 22 23from scapy import all as scapy 24 25import csocket 26import iproute 27import multinetwork_base 28import packets 29import net_test 30 31# Setsockopt values. 32IPV6_ADDR_PREFERENCES = 72 33IPV6_PREFER_SRC_PUBLIC = 0x0002 34 35# The retrans timer is also the DAD timeout. We set this to a value that's not 36# so short that DAD will complete before we attempt to use the network, but 37# short enough that we don't have to wait too long for DAD to complete. 38RETRANS_TIMER = 150 39 40 41class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest): 42 """Test for IPv6 source address selection. 43 44 Relevant kernel commits: 45 upstream net-next: 46 7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates 47 c58da4c net: ipv6: allow explicitly choosing optimistic addresses 48 9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface. 49 c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr(). 50 c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr(). 51 3985e8a ipv6: sysctl to restrict candidate source addresses 52 53 android-3.10: 54 2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates 55 0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic 56 0633924 ipv6: sysctl to restrict candidate source addresses 57 """ 58 59 def SetIPv6Sysctl(self, ifname, sysctl, value): 60 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value) 61 62 def SetDAD(self, ifname, value): 63 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value) 64 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value) 65 66 def SetOptimisticDAD(self, ifname, value): 67 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value) 68 69 def SetUseTempaddrs(self, ifname, value): 70 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value) 71 72 def SetUseOptimistic(self, ifname, value): 73 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value) 74 75 def SetForwarding(self, value): 76 self.SetSysctl("/proc/sys/net/ipv6/conf/all/forwarding", value) 77 78 def GetSourceIP(self, netid, mode="mark"): 79 s = self.BuildSocket(6, net_test.UDPSocket, netid, mode) 80 # Because why not...testing for temporary addresses is a separate thing. 81 s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC) 82 83 s.connect((net_test.IPV6_ADDR, 123)) 84 src_addr = s.getsockname()[0] 85 self.assertTrue(src_addr) 86 return src_addr 87 88 def assertAddressNotPresent(self, address): 89 self.assertRaises(IOError, self.iproute.GetAddress, address) 90 91 def assertAddressHasExpectedAttributes( 92 self, address, expected_ifindex, expected_flags): 93 ifa_msg = self.iproute.GetAddress(address)[0] 94 self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family) 95 self.assertEqual(64, ifa_msg.prefixlen) 96 self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope) 97 self.assertEqual(expected_ifindex, ifa_msg.index) 98 self.assertEqual(expected_flags, ifa_msg.flags & expected_flags) 99 100 def AddressIsTentative(self, address): 101 ifa_msg = self.iproute.GetAddress(address)[0] 102 return ifa_msg.flags & iproute.IFA_F_TENTATIVE 103 104 def BindToAddress(self, address): 105 s = net_test.UDPSocket(AF_INET6) 106 s.bind((address, 0, 0, 0)) 107 108 def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR): 109 pktinfo = multinetwork_base.MakePktInfo(6, address, 0) 110 cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)] 111 s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark") 112 return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0) 113 114 def assertAddressUsable(self, address, netid): 115 self.BindToAddress(address) 116 self.SendWithSourceAddress(address, netid) 117 # No exceptions? Good. 118 119 def assertAddressNotUsable(self, address, netid): 120 self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address) 121 self.assertRaisesErrno(errno.EINVAL, 122 self.SendWithSourceAddress, address, netid) 123 124 def assertAddressSelected(self, address, netid): 125 self.assertEqual(address, self.GetSourceIP(netid)) 126 127 def assertAddressNotSelected(self, address, netid): 128 self.assertNotEqual(address, self.GetSourceIP(netid)) 129 130 def WaitForDad(self, address): 131 for _ in range(20): 132 if not self.AddressIsTentative(address): 133 return 134 time.sleep(0.1) 135 raise AssertionError("%s did not complete DAD after 2 seconds") 136 137 138class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest): 139 140 def setUp(self): 141 # [0] Make sure DAD, optimistic DAD, and the use_optimistic option 142 # are all consistently disabled at the outset. 143 for netid in self.tuns: 144 ifname = self.GetInterfaceName(netid) 145 self.SetDAD(ifname, 0) 146 self.SetOptimisticDAD(ifname, 0) 147 self.SetUseTempaddrs(ifname, 0) 148 self.SetUseOptimistic(ifname, 0) 149 self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0) 150 151 # [1] Pick an interface on which to test. 152 self.test_netid = random.choice(list(self.tuns.keys())) 153 self.test_ip = self.MyAddress(6, self.test_netid) 154 self.test_ifindex = self.ifindices[self.test_netid] 155 self.test_ifname = self.GetInterfaceName(self.test_netid) 156 self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True) 157 158 # [2] Delete the test interface's IPv6 address. 159 self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex) 160 self.assertAddressNotPresent(self.test_ip) 161 162 self.assertAddressNotUsable(self.test_ip, self.test_netid) 163 # Verify that the link-local address is not tentative. 164 # Even though we disable DAD above, without this change occasionally the 165 # test fails. This might be due to us disabling DAD only after the 166 # link-local address is generated. 167 self.WaitForDad(self.test_lladdr) 168 169 # Disable forwarding, because optimistic addresses don't work when 170 # forwarding is on. Forwarding will be re-enabled when the sysctls are 171 # restored by MultiNetworkBaseTest.tearDownClass. 172 # TODO: Fix this and remove this hack. 173 self.SetForwarding("0") 174 175 176class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest): 177 178 def testRfc6724Behaviour(self): 179 # [3] Get an IPv6 address back, in DAD start-up. 180 self.SetDAD(self.test_ifname, 1) # Enable DAD 181 # Send a RA to start SLAAC and subsequent DAD. 182 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 183 # Get flags and prove tentative-ness. 184 self.assertAddressHasExpectedAttributes( 185 self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE) 186 187 # Even though the interface has an IPv6 address, its tentative nature 188 # prevents it from being selected. 189 self.assertAddressNotUsable(self.test_ip, self.test_netid) 190 self.assertAddressNotSelected(self.test_ip, self.test_netid) 191 192 # Busy wait for DAD to complete (should be less than 1 second). 193 self.WaitForDad(self.test_ip) 194 195 # The test_ip should have completed DAD by now, and should be the 196 # chosen source address, eligible to bind to, etc. 197 self.assertAddressUsable(self.test_ip, self.test_netid) 198 self.assertAddressSelected(self.test_ip, self.test_netid) 199 200 201class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest): 202 203 def testRfc6724Behaviour(self): 204 # [3] Get an IPv6 address back, in optimistic DAD start-up. 205 self.SetDAD(self.test_ifname, 1) # Enable DAD 206 self.SetOptimisticDAD(self.test_ifname, 1) 207 # Send a RA to start SLAAC and subsequent DAD. 208 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 209 # Get flags and prove optimism. 210 self.assertAddressHasExpectedAttributes( 211 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 212 213 # Optimistic addresses are usable but are not selected. 214 if net_test.LINUX_VERSION >= (3, 18, 0): 215 # The version checked in to android kernels <= 3.10 requires the 216 # use_optimistic sysctl to be turned on. 217 self.assertAddressUsable(self.test_ip, self.test_netid) 218 self.assertAddressNotSelected(self.test_ip, self.test_netid) 219 220 # Busy wait for DAD to complete (should be less than 1 second). 221 self.WaitForDad(self.test_ip) 222 223 # The test_ip should have completed DAD by now, and should be the 224 # chosen source address. 225 self.assertAddressUsable(self.test_ip, self.test_netid) 226 self.assertAddressSelected(self.test_ip, self.test_netid) 227 228 229class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest): 230 231 def testModifiedRfc6724Behaviour(self): 232 # [3] Get an IPv6 address back, in optimistic DAD start-up. 233 self.SetDAD(self.test_ifname, 1) # Enable DAD 234 self.SetOptimisticDAD(self.test_ifname, 1) 235 self.SetUseOptimistic(self.test_ifname, 1) 236 # Send a RA to start SLAAC and subsequent DAD. 237 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 238 # Get flags and prove optimistism. 239 self.assertAddressHasExpectedAttributes( 240 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 241 242 # The interface has an IPv6 address and, despite its optimistic nature, 243 # the use_optimistic option allows it to be selected. 244 self.assertAddressUsable(self.test_ip, self.test_netid) 245 self.assertAddressSelected(self.test_ip, self.test_netid) 246 247 248class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest): 249 250 def testModifiedRfc6724Behaviour(self): 251 # [3] Add a valid IPv6 address to this interface and verify it is 252 # selected as the source address. 253 preferred_ip = self.OnlinkPrefix(6, self.test_netid) + "cafe" 254 self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex) 255 self.assertAddressHasExpectedAttributes( 256 preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT) 257 self.assertEqual(preferred_ip, self.GetSourceIP(self.test_netid)) 258 259 # [4] Get another IPv6 address, in optimistic DAD start-up. 260 self.SetDAD(self.test_ifname, 1) # Enable DAD 261 self.SetOptimisticDAD(self.test_ifname, 1) 262 self.SetUseOptimistic(self.test_ifname, 1) 263 # Send a RA to start SLAAC and subsequent DAD. 264 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 265 # Get flags and prove optimism. 266 self.assertAddressHasExpectedAttributes( 267 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 268 269 # Since the interface has another IPv6 address, the optimistic address 270 # is not selected--the other, valid address is chosen. 271 self.assertAddressUsable(self.test_ip, self.test_netid) 272 self.assertAddressNotSelected(self.test_ip, self.test_netid) 273 self.assertAddressSelected(preferred_ip, self.test_netid) 274 275 276class DadFailureTest(MultiInterfaceSourceAddressSelectionTest): 277 278 def testDadFailure(self): 279 # [3] Get an IPv6 address back, in optimistic DAD start-up. 280 self.SetDAD(self.test_ifname, 1) # Enable DAD 281 self.SetOptimisticDAD(self.test_ifname, 1) 282 self.SetUseOptimistic(self.test_ifname, 1) 283 # Send a RA to start SLAAC and subsequent DAD. 284 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 285 # Prove optimism and usability. 286 self.assertAddressHasExpectedAttributes( 287 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 288 self.assertAddressUsable(self.test_ip, self.test_netid) 289 self.assertAddressSelected(self.test_ip, self.test_netid) 290 291 # Send a NA for the optimistic address, indicating address conflict 292 # ("DAD defense"). 293 conflict_macaddr = "02:00:0b:ad:d0:0d" 294 dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") / 295 scapy.IPv6(src=self.test_ip, dst="ff02::1") / 296 scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) / 297 scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)) 298 self.ReceiveEtherPacketOn(self.test_netid, dad_defense) 299 self.WaitForDad(self.test_lladdr) 300 301 # The address should have failed DAD, and therefore no longer be usable. 302 self.assertAddressNotUsable(self.test_ip, self.test_netid) 303 self.assertAddressNotSelected(self.test_ip, self.test_netid) 304 305 # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address. 306 307 308class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest): 309 310 def testSendToOnlinkDestination(self): 311 # [3] Get an IPv6 address back, in optimistic DAD start-up. 312 self.SetDAD(self.test_ifname, 1) # Enable DAD 313 self.SetOptimisticDAD(self.test_ifname, 1) 314 self.SetUseOptimistic(self.test_ifname, 1) 315 # Send a RA to start SLAAC and subsequent DAD. 316 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 317 # Prove optimism and usability. 318 self.assertAddressHasExpectedAttributes( 319 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 320 self.assertAddressUsable(self.test_ip, self.test_netid) 321 self.assertAddressSelected(self.test_ip, self.test_netid) 322 323 # [4] Send to an on-link destination and observe a Neighbor Solicitation 324 # packet with a source address that is NOT the optimistic address. 325 # In this setup, the only usable address is the link-local address. 326 onlink_dest = self.GetRandomDestination( 327 self.OnlinkPrefix(6, self.test_netid)) 328 self.SendWithSourceAddress(self.test_ip, self.test_netid, onlink_dest) 329 330 if net_test.LINUX_VERSION >= (3, 18, 0): 331 # Older versions will actually choose the optimistic address to 332 # originate Neighbor Solications (RFC violation). 333 expected_ns = packets.NS( 334 self.test_lladdr, 335 onlink_dest, 336 self.MyMacAddress(self.test_netid))[1] 337 self.ExpectPacketOn(self.test_netid, "link-local NS", expected_ns) 338 339 340# TODO(ek): add tests listening for netlink events. 341 342 343class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest): 344 345 def testChoosesNonInterfaceSourceAddress(self): 346 self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0) 347 src_ip = self.GetSourceIP(self.test_netid) 348 self.assertFalse(src_ip in [self.test_ip, self.test_lladdr]) 349 self.assertTrue(src_ip in 350 [self.MyAddress(6, netid) 351 for netid in self.tuns if netid != self.test_netid]) 352 353 354class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest): 355 356 def testChoosesOnlyInterfaceSourceAddress(self): 357 self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 1) 358 # self.test_ifname does not have a global IPv6 address, so the only 359 # candidate is the existing link-local address. 360 self.assertAddressSelected(self.test_lladdr, self.test_netid) 361 362 363if __name__ == "__main__": 364 unittest.main() 365