• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2#
3# Copyright 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import errno
18import random
19from socket import *  # pylint: disable=wildcard-import
20import time
21import unittest
22
23from scapy import all as scapy
24
25import csocket
26import iproute
27import multinetwork_base
28import packets
29import net_test
30
# Socket option name and flag handed to setsockopt() at the IPPROTO_IPV6
# level by the tests in this file.
IPV6_ADDR_PREFERENCES = 72
IPV6_PREFER_SRC_PUBLIC = 0x0002

# Retransmit timer advertised in the RAs these tests send; it doubles as the
# DAD timeout. It must be long enough that DAD is still in progress when a
# test first tries to use the network, yet short enough that waiting for DAD
# to finish does not slow the tests down much.
RETRANS_TIMER = 150
39
40
class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
  """Test for IPv6 source address selection.

  Base class holding the sysctl setters, socket helpers and address
  assertions shared by the concrete source address selection test cases.

  Relevant kernel commits:
    upstream net-next:
      7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      c58da4c net: ipv6: allow explicitly choosing optimistic addresses
      9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface.
      c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr().
      c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr().
      3985e8a ipv6: sysctl to restrict candidate source addresses

    android-3.10:
      2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic
      0633924 ipv6: sysctl to restrict candidate source addresses
  """

  def SetIPv6Sysctl(self, ifname, sysctl, value):
    """Sets /proc/sys/net/ipv6/conf/<ifname>/<sysctl> to value."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value)

  def SetDAD(self, ifname, value):
    """Sets both accept_dad and dad_transmits on ifname to value."""
    self.SetIPv6Sysctl(ifname, "accept_dad", value)
    self.SetIPv6Sysctl(ifname, "dad_transmits", value)

  def SetOptimisticDAD(self, ifname, value):
    """Sets the optimistic_dad sysctl on ifname."""
    self.SetIPv6Sysctl(ifname, "optimistic_dad", value)

  def SetUseTempaddrs(self, ifname, value):
    """Sets the use_tempaddr sysctl on ifname."""
    self.SetIPv6Sysctl(ifname, "use_tempaddr", value)

  def SetUseOptimistic(self, ifname, value):
    """Sets the use_optimistic sysctl on ifname."""
    self.SetIPv6Sysctl(ifname, "use_optimistic", value)

  def SetForwarding(self, value):
    """Sets the "all" IPv6 forwarding sysctl."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/all/forwarding", value)

  def GetSourceIP(self, netid, mode="mark"):
    """Returns the local address of a UDP socket connected on netid.

    Args:
      netid: The network to build the socket on.
      mode: Passed through to BuildSocket.

    Returns:
      The address reported by getsockname() after connecting to
      net_test.IPV6_ADDR. Asserts that it is not empty.
    """
    s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
    try:
      # Because why not...testing for temporary addresses is a separate thing.
      s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)

      s.connect((net_test.IPV6_ADDR, 123))
      src_addr = s.getsockname()[0]
    finally:
      # Close explicitly rather than relying on garbage collection.
      s.close()
    self.assertTrue(src_addr)
    return src_addr

  def assertAddressNotPresent(self, address):
    """Asserts that looking up address via iproute raises IOError."""
    self.assertRaises(IOError, self.iproute.GetAddress, address)

  def assertAddressHasExpectedAttributes(
      self, address, expected_ifindex, expected_flags):
    """Asserts that address exists with the expected attributes.

    Checks the address family, a prefix length of 64, universe scope, the
    interface index, and that every bit in expected_flags is set (other flags
    may be set as well).
    """
    ifa_msg = self.iproute.GetAddress(address)[0]
    self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
    self.assertEqual(64, ifa_msg.prefixlen)
    self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
    self.assertEqual(expected_ifindex, ifa_msg.index)
    self.assertEqual(expected_flags, ifa_msg.flags & expected_flags)

  def AddressIsTentative(self, address):
    """Returns a non-zero value iff address has IFA_F_TENTATIVE set."""
    ifa_msg = self.iproute.GetAddress(address)[0]
    return ifa_msg.flags & iproute.IFA_F_TENTATIVE

  def BindToAddress(self, address):
    """Binds a throwaway IPv6 UDP socket to address, raising on failure."""
    s = net_test.UDPSocket(AF_INET6)
    try:
      s.bind((address, 0, 0, 0))
    finally:
      # Only the success or failure of bind() matters; release the socket.
      s.close()

  def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
    """Sends a UDP packet to dest port 53 with address set via IPV6_PKTINFO.

    Args:
      address: Source address to request in the IPV6_PKTINFO control message.
      netid: The network to build the socket on.
      dest: Destination address.

    Returns:
      Whatever csocket.Sendmsg returns.
    """
    pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
    cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
    s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
    try:
      return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
    finally:
      s.close()

  def assertAddressUsable(self, address, netid):
    """Asserts that address can be bound to and sent from on netid."""
    self.BindToAddress(address)
    self.SendWithSourceAddress(address, netid)
    # No exceptions? Good.

  def assertAddressNotUsable(self, address, netid):
    """Asserts bind fails with EADDRNOTAVAIL and send fails with EINVAL."""
    self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
    self.assertRaisesErrno(errno.EINVAL,
                           self.SendWithSourceAddress, address, netid)

  def assertAddressSelected(self, address, netid):
    """Asserts that address is the source address chosen on netid."""
    self.assertEqual(address, self.GetSourceIP(netid))

  def assertAddressNotSelected(self, address, netid):
    """Asserts that address is not the source address chosen on netid."""
    self.assertNotEqual(address, self.GetSourceIP(netid))

  def WaitForDad(self, address):
    """Polls until address is no longer tentative.

    Checks up to 20 times, sleeping 0.1 seconds between checks.

    Raises:
      AssertionError: if address is still tentative after the last check.
    """
    for _ in range(20):
      if not self.AddressIsTentative(address):
        return
      time.sleep(0.1)
    raise AssertionError(
        "%s did not complete DAD after 2 seconds" % address)
136
137
class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
  """Fixture that strips the global IPv6 address from one random interface."""

  def setUp(self):
    """Resets the DAD-related sysctls and prepares the interface under test."""
    # [0]  Start from a known state: DAD, optimistic DAD, temporary addresses,
    # use_optimistic and use_oif_addrs_only are switched off everywhere.
    for netid in self.tuns:
      iface = self.GetInterfaceName(netid)
      self.SetDAD(iface, 0)
      for disable in (self.SetOptimisticDAD,
                      self.SetUseTempaddrs,
                      self.SetUseOptimistic):
        disable(iface, 0)
      self.SetIPv6Sysctl(iface, "use_oif_addrs_only", 0)

    # [1]  Choose the interface the test will run on.
    chosen = random.choice(list(self.tuns.keys()))
    self.test_netid = chosen
    self.test_ip = self.MyAddress(6, chosen)
    self.test_ifindex = self.ifindices[chosen]
    self.test_ifname = self.GetInterfaceName(chosen)
    self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True)

    # [2]  Remove that interface's IPv6 address and check it is really gone.
    self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
    self.assertAddressNotPresent(self.test_ip)
    self.assertAddressNotUsable(self.test_ip, self.test_netid)

    # Make sure the link-local address is not tentative. DAD is disabled
    # above, but the test still fails occasionally without this wait, possibly
    # because DAD only gets disabled after the link-local address has been
    # generated.
    self.WaitForDad(self.test_lladdr)

    # Optimistic addresses don't work while forwarding is on, so turn it off.
    # MultiNetworkBaseTest.tearDownClass re-enables it when it restores the
    # sysctls.
    # TODO: Fix this and remove this hack.
    self.SetForwarding("0")
174
175
class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that a tentative address only becomes usable once DAD is done."""

  def testRfc6724Behaviour(self):
    netid = self.test_netid
    addr = self.test_ip

    # [3]  Bring the IPv6 address back while DAD is still running: enable DAD,
    # then send an RA to kick off SLAAC and the DAD that follows it.
    self.SetDAD(self.test_ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)
    # The flags must show the address as tentative.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_TENTATIVE)

    # The interface has an IPv6 address again, but because it is tentative it
    # can be neither used nor selected.
    self.assertAddressNotUsable(addr, netid)
    self.assertAddressNotSelected(addr, netid)

    # Poll until DAD completes (should be less than 1 second).
    self.WaitForDad(addr)

    # DAD is over, so the address must now be bindable, sendable-from and the
    # selected source address.
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)
199
200
class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks an optimistic address without the use_optimistic sysctl."""

  def testRfc6724Behaviour(self):
    netid = self.test_netid
    addr = self.test_ip

    # [3]  Bring the IPv6 address back with optimistic DAD in progress: enable
    # DAD and optimistic DAD, then send an RA to start SLAAC and DAD.
    self.SetDAD(self.test_ifname, 1)
    self.SetOptimisticDAD(self.test_ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)
    # The flags must show the address as optimistic.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # An optimistic address can be used, but it is not selected.
    if net_test.LINUX_VERSION >= (3, 18, 0):
      # The version checked in to android kernels <= 3.10 requires the
      # use_optimistic sysctl to be turned on.
      self.assertAddressUsable(addr, netid)
    self.assertAddressNotSelected(addr, netid)

    # Poll until DAD completes (should be less than 1 second).
    self.WaitForDad(addr)

    # DAD is over, so the address must now be usable and the selected source
    # address.
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)
227
228
class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that use_optimistic lets an optimistic address be selected."""

  def testModifiedRfc6724Behaviour(self):
    netid = self.test_netid
    addr = self.test_ip
    ifname = self.test_ifname

    # [3]  Bring the IPv6 address back with optimistic DAD in progress and
    # use_optimistic switched on, then send an RA to start SLAAC and DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)
    # The flags must show the address as optimistic.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # Although the address is only optimistic, use_optimistic makes it both
    # usable and the selected source address.
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)
246
247
class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that a valid address is preferred over an optimistic one."""

  def testModifiedRfc6724Behaviour(self):
    netid = self.test_netid
    ifname = self.test_ifname
    ifindex = self.test_ifindex

    # [3]  Give the interface a valid IPv6 address and check that it becomes
    # the source address.
    preferred_ip = self.OnlinkPrefix(6, netid) + "cafe"
    self.iproute.AddAddress(preferred_ip, 64, ifindex)
    self.assertAddressHasExpectedAttributes(
        preferred_ip, ifindex, iproute.IFA_F_PERMANENT)
    self.assertEqual(preferred_ip, self.GetSourceIP(netid))

    # [4]  Obtain a second IPv6 address with optimistic DAD in progress:
    # enable DAD, optimistic DAD and use_optimistic, then send an RA to start
    # SLAAC and DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)
    # The flags must show the new address as optimistic.
    self.assertAddressHasExpectedAttributes(
        self.test_ip, ifindex, iproute.IFA_F_OPTIMISTIC)

    # The optimistic address is usable, but with a valid address also on the
    # interface it is the valid one that gets selected.
    self.assertAddressUsable(self.test_ip, netid)
    self.assertAddressNotSelected(self.test_ip, netid)
    self.assertAddressSelected(preferred_ip, netid)
274
275
class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that an optimistic address stops being usable after DAD fails."""

  def testDadFailure(self):
    netid = self.test_netid
    addr = self.test_ip
    ifname = self.test_ifname

    # [3]  Bring the IPv6 address back with optimistic DAD in progress and
    # use_optimistic switched on, then send an RA to start SLAAC and DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)
    # The address must be optimistic, usable and selected.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)

    # Inject an NA for the optimistic address to signal an address conflict
    # ("DAD defense").
    conflict_macaddr = "02:00:0b:ad:d0:0d"
    ether = scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01")
    ip6 = scapy.IPv6(src=addr, dst="ff02::1")
    na = scapy.ICMPv6ND_NA(tgt=addr, R=0, S=0, O=1)
    lladdr_opt = scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)
    dad_defense = ether / ip6 / na / lladdr_opt
    self.ReceiveEtherPacketOn(netid, dad_defense)
    self.WaitForDad(self.test_lladdr)

    # Having failed DAD, the address must no longer be usable or selected.
    self.assertAddressNotUsable(addr, netid)
    self.assertAddressNotSelected(addr, netid)

    # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address.
306
307
class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that Neighbor Solicitations do not use the optimistic address."""

  def testSendToOnlinkDestination(self):
    netid = self.test_netid
    addr = self.test_ip
    ifname = self.test_ifname

    # [3]  Bring the IPv6 address back with optimistic DAD in progress and
    # use_optimistic switched on, then send an RA to start SLAAC and DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)
    # The address must be optimistic, usable and selected.
    self.assertAddressHasExpectedAttributes(
        addr, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(addr, netid)
    self.assertAddressSelected(addr, netid)

    # [4]  Send to an on-link destination. The resulting Neighbor Solicitation
    # must NOT be sourced from the optimistic address; in this setup the only
    # other usable address is the link-local one.
    onlink_prefix = self.OnlinkPrefix(6, netid)
    onlink_dest = self.GetRandomDestination(onlink_prefix)
    self.SendWithSourceAddress(addr, netid, onlink_dest)

    if net_test.LINUX_VERSION >= (3, 18, 0):
      # Older versions will actually choose the optimistic address to
      # originate Neighbor Solicitations (RFC violation).
      expected_ns = packets.NS(
          self.test_lladdr, onlink_dest, self.MyMacAddress(netid))[1]
      self.ExpectPacketOn(netid, "link-local NS", expected_ns)
338
339
340# TODO(ek): add tests listening for netlink events.
341
342
class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks source selection with use_oif_addrs_only switched off."""

  def testChoosesNonInterfaceSourceAddress(self):
    self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0)
    chosen = self.GetSourceIP(self.test_netid)

    # The source must not be one of the test interface's own addresses...
    own_addrs = [self.test_ip, self.test_lladdr]
    self.assertFalse(chosen in own_addrs)

    # ...but the address of one of the other interfaces.
    other_addrs = []
    for netid in self.tuns:
      if netid != self.test_netid:
        other_addrs.append(self.MyAddress(6, netid))
    self.assertTrue(chosen in other_addrs)
352
353
class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks source selection with use_oif_addrs_only switched on."""

  def testChoosesOnlyInterfaceSourceAddress(self):
    ifname = self.test_ifname
    self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 1)
    # The test interface has no global IPv6 address, which leaves its existing
    # link-local address as the only candidate.
    self.assertAddressSelected(self.test_lladdr, self.test_netid)
361
362
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
  unittest.main()
365