#!/usr/bin/python
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import random
from socket import *  # pylint: disable=wildcard-import
import time
import unittest

from scapy import all as scapy

import csocket
import multinetwork_base
import net_test


# rtnetlink multicast group used to receive neighbour notifications.
RTMGRP_NEIGH = 4

# Neighbour Unreachability Detection (NUD) state bits.
NUD_INCOMPLETE = 0x01
NUD_REACHABLE = 0x02
NUD_STALE = 0x04
NUD_DELAY = 0x08
NUD_PROBE = 0x10
NUD_FAILED = 0x20
NUD_PERMANENT = 0x80


# TODO: Support IPv4.
class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
  """Tests neighbour cache state transitions and their netlink notifications.

  Each test listens on a NETLINK_ROUTE socket bound to RTMGRP_NEIGH and checks
  both the state reported by a neighbour dump and the notifications received
  on that socket.
  """

  # Set a 500-ms retrans timer so we can test for ND retransmits without
  # waiting too long. Apparently this cannot go below 500ms.
  RETRANS_TIME_MS = 500

  # This can only be in seconds, so 1000 is the minimum.
  DELAY_TIME_MS = 1000

  # Unfortunately, this must be above the delay timer or the kernel ND code will
  # not behave correctly (e.g., go straight from REACHABLE into DELAY). This
  # is fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a value
  # that's 2x the delay timer.
  BASE_REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
  MAX_REACHABLE_TIME_MS = 1.5 * BASE_REACHABLE_TIME_MS

  # Kernel default unicast solicit is 3, but it needs to be raised when
  # testing reconfiguration during probing.
  UCAST_SOLICIT_DEFAULT = 3
  UCAST_SOLICIT_LARGE = 10

  @classmethod
  def setUpClass(cls):
    """Sets the per-interface delay and retransmit timers for both protocols."""
    super(NeighbourTest, cls).setUpClass()
    for netid in cls.tuns:
      iface = cls.GetInterfaceName(netid)
      # This can't be set in an RA.
      for proto in ["ipv4", "ipv6"]:
        # Floor division: the value is in whole seconds, and true division
        # would produce a float ("1.0") on Python 3.
        cls.SetSysctl(
            "/proc/sys/net/%s/neigh/%s/delay_first_probe_time" % (proto, iface),
            cls.DELAY_TIME_MS // 1000)
        cls.SetSysctl(
            "/proc/sys/net/%s/neigh/%s/retrans_time_ms" % (proto, iface),
            cls.RETRANS_TIME_MS)

  def setUp(self):
    """Resets router neighbour entries and opens the notification socket."""
    super(NeighbourTest, self).setUp()

    for netid in self.tuns:
      # Clear the ND cache entries for all routers, so each test starts with
      # the IPv6 default router in state STALE.
      addr = self._RouterAddress(netid, 6)
      ifindex = self.ifindices[netid]
      self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)

      # Configure IPv6 by sending an RA.
      self.SendRA(netid,
                  retranstimer=self.RETRANS_TIME_MS,
                  reachabletime=self.BASE_REACHABLE_TIME_MS)

    self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
    self.sock.bind((0, RTMGRP_NEIGH))
    net_test.SetNonBlocking(self.sock)

    self.netid = random.choice(list(self.tuns.keys()))
    self.ifindex = self.ifindices[self.netid]

    # MultinetworkBaseTest always uses NUD_PERMANENT for router ARP entries.
    # Temporarily change those entries to NUD_STALE so we can test them.
    if net_test.LINUX_VERSION < (4, 9, 0):
      # Cannot change state from NUD_PERMANENT to NUD_STALE directly,
      # so delete it to make it NUD_FAILED then change it to NUD_STALE.
      router = self._RouterAddress(self.netid, 4)
      macaddr = self.RouterMacAddress(self.netid)
      self.iproute.DelNeighbour(4, router, macaddr, self.ifindex)
      self.ExpectNeighbourNotification(router, NUD_FAILED)
      self.assertNeighbourState(NUD_FAILED, router)
    self.ChangeRouterNudState(4, NUD_STALE)

  def SetUnicastSolicit(self, proto, iface, value):
    """Writes value to the ucast_solicit sysctl of iface for proto.

    Args:
      proto: "ipv4" or "ipv6", used as a path component.
      iface: Interface name, used as a path component.
      value: The value to write.
    """
    self.SetSysctl(
        "/proc/sys/net/%s/neigh/%s/ucast_solicit" % (proto, iface), value)

  def tearDown(self):
    """Restores ucast_solicit and the router ARP entry, then closes the socket."""
    super(NeighbourTest, self).tearDown()
    try:
      # It is already reset to default by TearDownClass,
      # but here we need to set it to default after each testcase.
      iface = self.GetInterfaceName(self.netid)
      for proto in ["ipv4", "ipv6"]:
        self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_DEFAULT)

      # Change router ARP entries back to NUD_PERMANENT,
      # so as not to affect other tests.
      self.ChangeRouterNudState(4, NUD_PERMANENT)
    finally:
      # setUp opens a fresh netlink socket for every test; release it even if
      # the cleanup above fails.
      self.sock.close()

  def ChangeRouterNudState(self, version, state):
    """Sets the router's neighbour entry to state and verifies the change.

    Args:
      version: IP version (4 or 6) of the router address to update.
      state: One of the NUD_* constants.
    """
    router = self._RouterAddress(self.netid, version)
    macaddr = self.RouterMacAddress(self.netid)
    self.iproute.UpdateNeighbour(version, router, macaddr, self.ifindex, state)
    self.ExpectNeighbourNotification(router, state)
    self.assertNeighbourState(state, router)

  def GetNeighbour(self, addr, ifindex):
    """Returns the (msg, attrs) dump entry whose NDA_DST is addr, or None."""
    version = csocket.AddressVersion(addr)
    for msg, args in self.iproute.DumpNeighbours(version, ifindex):
      if args["NDA_DST"] == addr:
        return msg, args
    return None

  def GetNdEntry(self, addr):
    """Returns the neighbour entry for addr on the interface under test."""
    return self.GetNeighbour(addr, self.ifindex)

  def CheckNoNdEvents(self):
    """Asserts that no notification is pending on the netlink socket."""
    self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)

  def assertNeighbourState(self, state, addr):
    """Asserts that the neighbour entry for addr exists and is in state."""
    entry = self.GetNdEntry(addr)
    # Fail with a readable message rather than a TypeError on None.
    self.assertIsNotNone(entry, "No neighbour entry for %s" % addr)
    self.assertEqual(state, entry[0].state)

  def assertNeighbourAttr(self, addr, name, value):
    """Asserts that attribute name of addr's neighbour entry equals value."""
    entry = self.GetNdEntry(addr)
    # Fail with a readable message rather than a TypeError on None.
    self.assertIsNotNone(entry, "No neighbour entry for %s" % addr)
    self.assertEqual(value, entry[1][name])

  def ExpectNeighbourNotification(self, addr, state, attrs=None):
    """Reads one notification and checks its address, state and attributes.

    Args:
      addr: Expected NDA_DST of the notification.
      state: Expected NUD_* state of the notification.
      attrs: Optional dict of additional attribute names to expected values.
    """
    msg = self.sock.recv(4096)
    msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg)
    self.assertEqual(addr, actual_attrs["NDA_DST"])
    self.assertEqual(state, msg.state)
    if attrs:
      for name in attrs:
        self.assertEqual(attrs[name], actual_attrs[name])

  def ExpectProbe(self, is_unicast, addr):
    """Expects a neighbour probe for addr on the network under test.

    For IPv6 this is a neighbour solicitation, sent either to addr itself or to
    its solicited-node multicast address. For IPv4 only unicast ARP requests
    are supported.

    Args:
      is_unicast: Whether the probe is expected to be unicast.
      addr: The address being probed.

    Raises:
      NotImplementedError: if a non-unicast IPv4 probe is requested.
    """
    version = csocket.AddressVersion(addr)
    llsrc = self.MyMacAddress(self.netid)
    if version == 6:
      if is_unicast:
        src = self.MyLinkLocalAddress(self.netid)
        dst = addr
      else:
        solicited = inet_pton(AF_INET6, addr)
        # bytearray yields integers on both Python 2 (str) and Python 3
        # (bytes); calling ord() on each element fails on Python 3.
        last3bytes = tuple(bytearray(solicited[-3:]))
        dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
        src = self.MyAddress(6, self.netid)
      expected = (
          scapy.IPv6(src=src, dst=dst) /
          scapy.ICMPv6ND_NS(tgt=addr) /
          scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
      )
      msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
      self.ExpectPacketOn(self.netid, msg, expected)
    else:  # version == 4
      if is_unicast:
        src = self._MyIPv4Address(self.netid)
        dst = addr
      else:
        raise NotImplementedError("This test does not support broadcast ARP")
      expected = scapy.ARP(psrc=src, pdst=dst, hwsrc=llsrc, op=1)
      msg = "Unicast ARP probe"
      self.ExpectPacketOn(self.netid, msg, expected)

  def ExpectUnicastProbe(self, addr):
    """Expects a unicast probe for addr."""
    self.ExpectProbe(True, addr)

  def ExpectMulticastNS(self, addr):
    """Expects a multicast neighbour solicitation for addr."""
    self.ExpectProbe(False, addr)

  def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
                                  S=1, O=0, R=1):
    """Injects a unicast neighbour advertisement for addr.

    Args:
      addr: Target address of the advertisement.
      mac: Source MAC address, also used as the target link-layer address.
      srcaddr: IPv6 source address; defaults to addr.
      dstaddr: IPv6 destination address; defaults to our link-local address.
      S: Value of the NA's S flag.
      O: Value of the NA's O flag.
      R: Value of the NA's R flag.

    Raises:
      NotImplementedError: if addr is not an IPv6 address.
    """
    version = csocket.AddressVersion(addr)
    if srcaddr is None:
      srcaddr = addr
    if dstaddr is None:
      dstaddr = self.MyLinkLocalAddress(self.netid)
    if version == 6:
      packet = (
          scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
          scapy.IPv6(src=srcaddr, dst=dstaddr) /
          scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
          scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
      )
      self.ReceiveEtherPacketOn(self.netid, packet)
    else:
      raise NotImplementedError

  def SendDnsRequest(self, addr):
    """Sends a UDP payload to port 53 of addr and returns the socket used.

    The socket is built with a randomly chosen routing mode for the network
    under test.
    """
    version = csocket.AddressVersion(addr)
    routing_mode = random.choice(["mark", "oif", "uid"])
    s = self.BuildSocket(version, net_test.UDPSocket, self.netid, routing_mode)
    s.connect((addr, 53))
    s.send(net_test.UDP_PAYLOAD)
    return s

  def MonitorSleepMs(self, interval, addr):
    """Sleeps for interval ms, printing addr's neighbour entry every <=100ms."""
    slept = 0
    while slept < interval:
      sleep_ms = min(100, interval - slept)
      time.sleep(sleep_ms / 1000.0)
      slept += sleep_ms
      print(self.GetNdEntry(addr))

  def MonitorSleep(self, intervalseconds, addr):
    """Like MonitorSleepMs, but with the interval given in seconds."""
    self.MonitorSleepMs(intervalseconds * 1000, addr)

  def SleepMs(self, ms):
    """Sleeps for ms milliseconds."""
    time.sleep(ms / 1000.0)

  def testNotifications(self):
    """Tests neighbour notifications.

    Relevant kernel commits:
      upstream net-next:
        765c9c6 neigh: Better handling of transition to NUD_PROBE state
        53385d2 neigh: Netlink notification for administrative NUD state change
          (only checked on kernel v3.13+, not on v3.10)

      android-3.10:
        e4a6d6b neigh: Better handling of transition to NUD_PROBE state

      android-3.18:
        2011e72 neigh: Better handling of transition to NUD_PROBE state
    """
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_STALE, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    # Send a packet and check that we go into DELAY.
    s = self.SendDnsRequest(net_test.IPV6_ADDR)
    self.assertNeighbourState(NUD_DELAY, router6)

    # Wait for the probe interval, then check that we're in PROBE, and that the
    # kernel has notified us.
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectUnicastProbe(router6)

    # Respond to the NS and verify we're in REACHABLE again.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
    self.assertNeighbourState(NUD_REACHABLE, router6)
    if net_test.LINUX_VERSION >= (3, 13, 0):
      # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative
      # NUD state change" produces notifications for NUD_REACHABLE, but these
      # are not generated on earlier kernels.
      self.ExpectNeighbourNotification(router6, NUD_REACHABLE)

    # Wait until the reachable time has passed, and verify we're in STALE.
    self.SleepMs(self.MAX_REACHABLE_TIME_MS * 1.2)
    self.assertNeighbourState(NUD_STALE, router6)
    self.ExpectNeighbourNotification(router6, NUD_STALE)

    # Send a packet, and verify we go into DELAY and then to PROBE.
    s.send(net_test.UDP_PAYLOAD)
    self.assertNeighbourState(NUD_DELAY, router6)
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)

    # Wait for the probes to time out, and expect a FAILED notification.
    self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
    self.ExpectUnicastProbe(router6)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 2)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 3)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.assertNeighbourState(NUD_FAILED, router6)
    self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})

  def testRepeatedProbes(self):
    """Forces the IPv6 router entry through PROBE -> REACHABLE five times."""
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    routermac = self.RouterMacAddress(self.netid)
    self.assertNeighbourState(NUD_STALE, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    def ForceProbe(addr, mac):
      self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
      self.assertNeighbourState(NUD_PROBE, addr)
      self.ExpectNeighbourNotification(addr, NUD_PROBE)
      self.SleepMs(1)  # TODO: Why is this necessary?
      self.assertNeighbourState(NUD_PROBE, addr)
      self.ExpectUnicastProbe(addr)
      self.ReceiveUnicastAdvertisement(addr, mac)
      self.assertNeighbourState(NUD_REACHABLE, addr)
      self.ExpectNeighbourNotification(addr, NUD_REACHABLE)

    for _ in range(5):
      ForceProbe(router6, routermac)

  def testIsRouterFlag(self):
    """Checks that an NA with R=0 still takes a FAILED router to REACHABLE."""
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_STALE, router6)

    # Get into FAILED.
    self.iproute.UpdateNeighbour(6, router6, None, self.ifindex, NUD_FAILED)
    self.ExpectNeighbourNotification(router6, NUD_FAILED)
    self.assertNeighbourState(NUD_FAILED, router6)

    time.sleep(1)

    # Send another packet and expect a multicast NS.
    self.SendDnsRequest(net_test.IPV6_ADDR)
    self.ExpectMulticastNS(router6)

    # Receive a unicast NA with the R flag set to 0.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
                                     srcaddr=self._RouterAddress(self.netid, 6),
                                     dstaddr=self.MyAddress(6, self.netid),
                                     S=1, O=0, R=0)

    # Expect that this takes us to REACHABLE.
    self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
    self.assertNeighbourState(NUD_REACHABLE, router6)

  def DoReconfigureDuringProbing(self, version):
    """Lowers ucast_solicit while probing and expects the entry to go FAILED.

    Args:
      version: IP version (4 or 6) of the router entry to exercise.
    """
    if version == 6:
      proto = "ipv6"
      ip_addr = net_test.IPV6_ADDR
    else:
      proto = "ipv4"
      ip_addr = net_test.IPV4_ADDR
    router = self._RouterAddress(self.netid, version)
    self.assertNeighbourState(NUD_STALE, router)

    iface = self.GetInterfaceName(self.netid)
    # Raise the unicast solicit count.
    self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_LARGE)

    # Send a packet and check that we go into DELAY.
    self.SendDnsRequest(ip_addr)
    self.assertNeighbourState(NUD_DELAY, router)

    # Probe 4 times with no response.
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.ExpectNeighbourNotification(router, NUD_PROBE)
    self.assertNeighbourState(NUD_PROBE, router)
    self.ExpectUnicastProbe(router)

    for _ in range(3):
      self.SleepMs(self.RETRANS_TIME_MS)
      self.ExpectUnicastProbe(router)

    # Reconfigure to 3 while probing; the state should change to NUD_FAILED.
    self.SetUnicastSolicit(proto, iface, self.UCAST_SOLICIT_DEFAULT)
    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectNeighbourNotification(router, NUD_FAILED)
    self.assertNeighbourState(NUD_FAILED, router)

  # Check neighbor state after re-config ARP probe times.
  def testReconfigureDuringProbing(self):
    """Runs the reconfigure-during-probing scenario for IPv4 and then IPv6."""
    self.DoReconfigureDuringProbing(4)
    self.DoReconfigureDuringProbing(6)


if __name__ == "__main__":
  unittest.main()