1#!/usr/bin/python3 2# 3# Copyright 2014 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import errno 18import random 19from socket import * # pylint: disable=wildcard-import 20import time 21import unittest 22 23from scapy import all as scapy 24 25import csocket 26import iproute 27import multinetwork_base 28import packets 29import net_test 30 31# Setsockopt values. 32IPV6_ADDR_PREFERENCES = 72 33IPV6_PREFER_SRC_PUBLIC = 0x0002 34 35# The retrans timer is also the DAD timeout. We set this to a value that's not 36# so short that DAD will complete before we attempt to use the network, but 37# short enough that we don't have to wait too long for DAD to complete. 38RETRANS_TIMER = 150 39 40 41class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest): 42 """Test for IPv6 source address selection. 43 44 Relevant kernel commits: 45 upstream net-next: 46 7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates 47 c58da4c net: ipv6: allow explicitly choosing optimistic addresses 48 9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface. 49 c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr(). 50 c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr(). 51 3985e8a ipv6: sysctl to restrict candidate source addresses 52 53 android-3.10: 54 2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates 55 0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic 56 0633924 ipv6: sysctl to restrict candidate source addresses 57 """ 58 59 def SetIPv6Sysctl(self, ifname, sysctl, value): 60 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value) 61 62 def SetDAD(self, ifname, value): 63 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value) 64 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value) 65 66 def SetOptimisticDAD(self, ifname, value): 67 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value) 68 69 def SetUseTempaddrs(self, ifname, value): 70 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value) 71 72 def SetUseOptimistic(self, ifname, value): 73 self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value) 74 75 def SetForwarding(self, value): 76 self.SetSysctl("/proc/sys/net/ipv6/conf/all/forwarding", value) 77 78 def GetSourceIP(self, netid, mode="mark"): 79 s = self.BuildSocket(6, net_test.UDPSocket, netid, mode) 80 # Because why not...testing for temporary addresses is a separate thing. 81 s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC) 82 83 s.connect((net_test.IPV6_ADDR, 123)) 84 src_addr = s.getsockname()[0] 85 self.assertTrue(src_addr) 86 return src_addr 87 88 def assertAddressNotPresent(self, address): 89 self.assertRaises(IOError, self.iproute.GetAddress, address) 90 91 def assertAddressHasExpectedAttributes( 92 self, address, expected_ifindex, expected_flags): 93 ifa_msg = self.iproute.GetAddress(address)[0] 94 self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family) 95 self.assertEqual(64, ifa_msg.prefixlen) 96 self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope) 97 self.assertEqual(expected_ifindex, ifa_msg.index) 98 self.assertEqual(expected_flags, ifa_msg.flags & expected_flags) 99 100 def AddressIsTentative(self, address): 101 ifa_msg = self.iproute.GetAddress(address)[0] 102 return ifa_msg.flags & iproute.IFA_F_TENTATIVE 103 104 def BindToAddress(self, address): 105 s = net_test.UDPSocket(AF_INET6) 106 s.bind((address, 0, 0, 0)) 107 108 def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR): 109 pktinfo = multinetwork_base.MakePktInfo(6, address, 0) 110 cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)] 111 s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark") 112 return csocket.Sendmsg(s, (dest, 53), b"Hello", cmsgs, 0) 113 114 def assertAddressUsable(self, address, netid): 115 self.BindToAddress(address) 116 self.SendWithSourceAddress(address, netid) 117 # No exceptions? Good. 118 119 def assertAddressNotUsable(self, address, netid): 120 self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address) 121 self.assertRaisesErrno(errno.EINVAL, 122 self.SendWithSourceAddress, address, netid) 123 124 def assertAddressSelected(self, address, netid): 125 self.assertEqual(address, self.GetSourceIP(netid)) 126 127 def assertAddressNotSelected(self, address, netid): 128 self.assertNotEqual(address, self.GetSourceIP(netid)) 129 130 def WaitForDad(self, address): 131 for _ in range(20): 132 if not self.AddressIsTentative(address): 133 return 134 time.sleep(0.1) 135 raise AssertionError("%s did not complete DAD after 2 seconds") 136 137 138class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest): 139 140 def setUp(self): 141 # [0] Make sure DAD, optimistic DAD, and the use_optimistic option 142 # are all consistently disabled at the outset. 143 for netid in self.tuns: 144 ifname = self.GetInterfaceName(netid) 145 self.SetDAD(ifname, 0) 146 self.SetOptimisticDAD(ifname, 0) 147 self.SetUseTempaddrs(ifname, 0) 148 self.SetUseOptimistic(ifname, 0) 149 self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0) 150 151 # [1] Pick an interface on which to test. 152 self.test_netid = random.choice(list(self.tuns.keys())) 153 self.test_ip = self.MyAddress(6, self.test_netid) 154 self.test_ifindex = self.ifindices[self.test_netid] 155 self.test_ifname = self.GetInterfaceName(self.test_netid) 156 self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True) 157 158 # [2] Delete the test interface's IPv6 address. 159 self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex) 160 self.assertAddressNotPresent(self.test_ip) 161 162 self.assertAddressNotUsable(self.test_ip, self.test_netid) 163 # Verify that the link-local address is not tentative. 164 # Even though we disable DAD above, without this change occasionally the 165 # test fails. This might be due to us disabling DAD only after the 166 # link-local address is generated. 167 self.WaitForDad(self.test_lladdr) 168 169 # Disable forwarding, because optimistic addresses don't work when 170 # forwarding is on. Forwarding will be re-enabled when the sysctls are 171 # restored by MultiNetworkBaseTest.tearDownClass. 172 # TODO: Fix this and remove this hack. 173 self.SetForwarding("0") 174 175 176class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest): 177 178 def testRfc6724Behaviour(self): 179 # [3] Get an IPv6 address back, in DAD start-up. 180 self.SetDAD(self.test_ifname, 1) # Enable DAD 181 # Send a RA to start SLAAC and subsequent DAD. 182 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 183 # Get flags and prove tentative-ness. 184 self.assertAddressHasExpectedAttributes( 185 self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE) 186 187 # Even though the interface has an IPv6 address, its tentative nature 188 # prevents it from being selected. 189 self.assertAddressNotUsable(self.test_ip, self.test_netid) 190 self.assertAddressNotSelected(self.test_ip, self.test_netid) 191 192 # Busy wait for DAD to complete (should be less than 1 second). 193 self.WaitForDad(self.test_ip) 194 195 # The test_ip should have completed DAD by now, and should be the 196 # chosen source address, eligible to bind to, etc. 197 self.assertAddressUsable(self.test_ip, self.test_netid) 198 self.assertAddressSelected(self.test_ip, self.test_netid) 199 200 201class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest): 202 203 def testRfc6724Behaviour(self): 204 # [3] Get an IPv6 address back, in optimistic DAD start-up. 205 self.SetDAD(self.test_ifname, 1) # Enable DAD 206 self.SetOptimisticDAD(self.test_ifname, 1) 207 # Send a RA to start SLAAC and subsequent DAD. 208 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 209 # Get flags and prove optimism. 210 self.assertAddressHasExpectedAttributes( 211 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 212 213 # Optimistic addresses are usable but are not selected. 214 self.assertAddressUsable(self.test_ip, self.test_netid) 215 self.assertAddressNotSelected(self.test_ip, self.test_netid) 216 217 # Busy wait for DAD to complete (should be less than 1 second). 218 self.WaitForDad(self.test_ip) 219 220 # The test_ip should have completed DAD by now, and should be the 221 # chosen source address. 222 self.assertAddressUsable(self.test_ip, self.test_netid) 223 self.assertAddressSelected(self.test_ip, self.test_netid) 224 225 226class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest): 227 228 def testModifiedRfc6724Behaviour(self): 229 # [3] Get an IPv6 address back, in optimistic DAD start-up. 230 self.SetDAD(self.test_ifname, 1) # Enable DAD 231 self.SetOptimisticDAD(self.test_ifname, 1) 232 self.SetUseOptimistic(self.test_ifname, 1) 233 # Send a RA to start SLAAC and subsequent DAD. 234 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 235 # Get flags and prove optimistism. 236 self.assertAddressHasExpectedAttributes( 237 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 238 239 # The interface has an IPv6 address and, despite its optimistic nature, 240 # the use_optimistic option allows it to be selected. 241 self.assertAddressUsable(self.test_ip, self.test_netid) 242 self.assertAddressSelected(self.test_ip, self.test_netid) 243 244 245class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest): 246 247 def testModifiedRfc6724Behaviour(self): 248 # [3] Add a valid IPv6 address to this interface and verify it is 249 # selected as the source address. 250 preferred_ip = self.OnlinkPrefix(6, self.test_netid) + "cafe" 251 self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex) 252 self.assertAddressHasExpectedAttributes( 253 preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT) 254 self.assertEqual(preferred_ip, self.GetSourceIP(self.test_netid)) 255 256 # [4] Get another IPv6 address, in optimistic DAD start-up. 257 self.SetDAD(self.test_ifname, 1) # Enable DAD 258 self.SetOptimisticDAD(self.test_ifname, 1) 259 self.SetUseOptimistic(self.test_ifname, 1) 260 # Send a RA to start SLAAC and subsequent DAD. 261 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 262 # Get flags and prove optimism. 263 self.assertAddressHasExpectedAttributes( 264 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 265 266 # Since the interface has another IPv6 address, the optimistic address 267 # is not selected--the other, valid address is chosen. 268 self.assertAddressUsable(self.test_ip, self.test_netid) 269 self.assertAddressNotSelected(self.test_ip, self.test_netid) 270 self.assertAddressSelected(preferred_ip, self.test_netid) 271 272 273class DadFailureTest(MultiInterfaceSourceAddressSelectionTest): 274 275 def testDadFailure(self): 276 # [3] Get an IPv6 address back, in optimistic DAD start-up. 277 self.SetDAD(self.test_ifname, 1) # Enable DAD 278 self.SetOptimisticDAD(self.test_ifname, 1) 279 self.SetUseOptimistic(self.test_ifname, 1) 280 # Send a RA to start SLAAC and subsequent DAD. 281 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 282 # Prove optimism and usability. 283 self.assertAddressHasExpectedAttributes( 284 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 285 self.assertAddressUsable(self.test_ip, self.test_netid) 286 self.assertAddressSelected(self.test_ip, self.test_netid) 287 288 # Send a NA for the optimistic address, indicating address conflict 289 # ("DAD defense"). 290 conflict_macaddr = "02:00:0b:ad:d0:0d" 291 dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") / 292 scapy.IPv6(src=self.test_ip, dst="ff02::1") / 293 scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) / 294 scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)) 295 self.ReceiveEtherPacketOn(self.test_netid, dad_defense) 296 self.WaitForDad(self.test_lladdr) 297 298 # The address should have failed DAD, and therefore no longer be usable. 299 self.assertAddressNotUsable(self.test_ip, self.test_netid) 300 self.assertAddressNotSelected(self.test_ip, self.test_netid) 301 302 # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address. 303 304 305class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest): 306 307 def testSendToOnlinkDestination(self): 308 # [3] Get an IPv6 address back, in optimistic DAD start-up. 309 self.SetDAD(self.test_ifname, 1) # Enable DAD 310 self.SetOptimisticDAD(self.test_ifname, 1) 311 self.SetUseOptimistic(self.test_ifname, 1) 312 # Send a RA to start SLAAC and subsequent DAD. 313 self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER) 314 # Prove optimism and usability. 315 self.assertAddressHasExpectedAttributes( 316 self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) 317 self.assertAddressUsable(self.test_ip, self.test_netid) 318 self.assertAddressSelected(self.test_ip, self.test_netid) 319 320 # [4] Send to an on-link destination and observe a Neighbor Solicitation 321 # packet with a source address that is NOT the optimistic address. 322 # In this setup, the only usable address is the link-local address. 323 onlink_dest = self.GetRandomDestination( 324 self.OnlinkPrefix(6, self.test_netid)) 325 self.SendWithSourceAddress(self.test_ip, self.test_netid, onlink_dest) 326 327 expected_ns = packets.NS( 328 self.test_lladdr, 329 onlink_dest, 330 self.MyMacAddress(self.test_netid))[1] 331 self.ExpectPacketOn(self.test_netid, "link-local NS", expected_ns) 332 333 334# TODO(ek): add tests listening for netlink events. 335 336 337class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest): 338 339 def testChoosesNonInterfaceSourceAddress(self): 340 self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0) 341 src_ip = self.GetSourceIP(self.test_netid) 342 self.assertFalse(src_ip in [self.test_ip, self.test_lladdr]) 343 self.assertTrue(src_ip in 344 [self.MyAddress(6, netid) 345 for netid in self.tuns if netid != self.test_netid]) 346 347 348class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest): 349 350 def testChoosesOnlyInterfaceSourceAddress(self): 351 self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 1) 352 # self.test_ifname does not have a global IPv6 address, so the only 353 # candidate is the existing link-local address. 354 self.assertAddressSelected(self.test_lladdr, self.test_netid) 355 356 357if __name__ == "__main__": 358 unittest.main() 359