1#!/usr/bin/python3 2# 3# Copyright 2017 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import 18from errno import * # pylint: disable=wildcard-import 19from socket import * # pylint: disable=wildcard-import 20 21import random 22import itertools 23import struct 24import unittest 25 26from net_test import LINUX_VERSION 27from scapy import all as scapy 28from tun_twister import TunTwister 29import csocket 30import iproute 31import multinetwork_base 32import net_test 33import packets 34import util 35import xfrm 36import xfrm_base 37 38_LOOPBACK_IFINDEX = 1 39_TEST_XFRM_IFNAME = "ipsec42" 40_TEST_XFRM_IF_ID = 42 41_TEST_SPI = 0x1234 42 43# Does the kernel support CONFIG_XFRM_INTERFACE? 44def HaveXfrmInterfaces(): 45 # 4.19+ must have CONFIG_XFRM_INTERFACE enabled 46 if LINUX_VERSION >= (4, 19, 0): 47 return True 48 49 try: 50 i = iproute.IPRoute() 51 i.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID, 52 _LOOPBACK_IFINDEX) 53 i.DeleteLink(_TEST_XFRM_IFNAME) 54 try: 55 i.GetIfIndex(_TEST_XFRM_IFNAME) 56 assert "Deleted interface %s still exists!" % _TEST_XFRM_IFNAME 57 except IOError: 58 pass 59 return True 60 except IOError: 61 return False 62 63HAVE_XFRM_INTERFACES = HaveXfrmInterfaces() 64 65# Two kernel fixes have been added in 5.17 to allow XFRM_MIGRATE to work correctly 66# when (1) there are multiple tunnels with the same selectors; and (2) addresses 67# are updated to a different IP family. These two fixes were pulled into upstream 68# LTS releases 4.14.273, 4.19.236, 5.4.186, 5.10.107 and 5.15.30, from whence they 69# flowed into the Android Common Kernel (via standard LTS merges). 70# As such we require 4.14.273+, 4.19.236+, 5.4.186+, 5.10.107+, 5.15.30+ or 5.17+ 71# to have these fixes. 72def HasXfrmMigrateFixes(): 73 return ( 74 ((LINUX_VERSION >= (4, 14, 273)) and (LINUX_VERSION < (4, 19, 0))) or 75 ((LINUX_VERSION >= (4, 19, 236)) and (LINUX_VERSION < (5, 4, 0))) or 76 ((LINUX_VERSION >= (5, 4, 186)) and (LINUX_VERSION < (5, 10, 0))) or 77 ((LINUX_VERSION >= (5, 10, 107)) and (LINUX_VERSION < (5, 15, 0))) or 78 (LINUX_VERSION >= (5, 15, 30)) 79 ) 80 81 82# Does the kernel support CONFIG_XFRM_MIGRATE and include the kernel fixes? 83def SupportsXfrmMigrate(): 84 if not HasXfrmMigrateFixes(): 85 return False 86 87 # 5.10+ must have CONFIG_XFRM_MIGRATE enabled 88 if LINUX_VERSION >= (5, 10, 0): 89 return True 90 91 # XFRM_MIGRATE depends on xfrmi interfaces 92 if not HAVE_XFRM_INTERFACES: 93 return False 94 95 try: 96 x = xfrm.Xfrm() 97 wildcard_addr = net_test.GetWildcardAddress(6) 98 selector = xfrm.EmptySelector(AF_INET6) 99 100 # Expect migration to fail with EINVAL because it is trying to migrate a 101 # non-existent SA. 102 x.MigrateTunnel(xfrm.XFRM_POLICY_OUT, selector, wildcard_addr, wildcard_addr, 103 wildcard_addr, wildcard_addr, _TEST_SPI, 104 None, None, None, None, None, None) 105 print("Migration succeeded unexpectedly, assuming XFRM_MIGRATE is enabled") 106 return True 107 except IOError as err: 108 if err.errno == ENOPROTOOPT: 109 return False 110 elif err.errno == EINVAL: 111 return True 112 else: 113 print("Unexpected error, assuming XFRM_MIGRATE is enabled:", err.errno) 114 return True 115 116SUPPORTS_XFRM_MIGRATE = SupportsXfrmMigrate() 117 118# Parameters to setup tunnels as special networks 119_TUNNEL_NETID_OFFSET = 0xFC00 # Matches reserved netid range for IpSecService 120_BASE_TUNNEL_NETID = {4: 40, 6: 60} 121_BASE_VTI_OKEY = 2000000100 122_BASE_VTI_IKEY = 2000000200 123 124_TEST_OUT_SPI = _TEST_SPI 125_TEST_IN_SPI = _TEST_OUT_SPI 126 127_TEST_OKEY = 2000000100 128_TEST_IKEY = 2000000200 129 130_TEST_REMOTE_PORT = 1234 131 132_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6} 133 134 135def _GetLocalInnerAddress(version): 136 return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version] 137 138 139def _GetRemoteInnerAddress(version): 140 return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version] 141 142 143def _GetRemoteOuterAddress(version): 144 return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version] 145 146 147def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer, 148 src_port, dst_inner, dst_outer, 149 dst_port, spi, seq_num, ip_hdr_options=None): 150 if ip_hdr_options is None: 151 ip_hdr_options = {} 152 153 ip_hdr_options.update({'src': src_inner, 'dst': dst_inner}) 154 155 # Build and receive an ESP packet destined for the inner socket 156 IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version] 157 input_pkt = ( 158 IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) / 159 net_test.UDP_PAYLOAD) 160 input_pkt = IpType(bytes(input_pkt)) # Compute length, checksum. 161 input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num, 162 (src_outer, dst_outer)) 163 164 return input_pkt 165 166 167def _CreateReceiveSock(version, port=0): 168 # Create a socket to receive packets. 169 read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) 170 read_sock.bind((net_test.GetWildcardAddress(version), port)) 171 # The second parameter of the tuple is the port number regardless of AF. 172 local_port = read_sock.getsockname()[1] 173 # Guard against the eventuality of the receive failing. 174 csocket.SetSocketTimeout(read_sock, 500) 175 176 return read_sock, local_port 177 178 179def _SendPacket(testInstance, netid, version, remote, remote_port): 180 # Send a packet out via the tunnel-backed network, bound for the port number 181 # of the input socket. 182 write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) 183 testInstance.SelectInterface(write_sock, netid, "mark") 184 write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port)) 185 local_port = write_sock.getsockname()[1] 186 187 return local_port 188 189 190def InjectTests(): 191 InjectParameterizedTests(XfrmTunnelTest) 192 InjectParameterizedTests(XfrmInterfaceTest) 193 InjectParameterizedTests(XfrmVtiTest) 194 InjectParameterizedMigrateTests(XfrmInterfaceMigrateTest) 195 196 197def InjectParameterizedTests(cls): 198 VERSIONS = (4, 6) 199 param_list = itertools.product(VERSIONS, VERSIONS) 200 201 def NameGenerator(*args): 202 return "IPv%d_in_IPv%d" % tuple(args) 203 204 util.InjectParameterizedTest(cls, param_list, NameGenerator) 205 206def InjectParameterizedMigrateTests(cls): 207 VERSIONS = (4, 6) 208 param_list = itertools.product(VERSIONS, VERSIONS, VERSIONS) 209 210 def NameGenerator(*args): 211 return "IPv%d_in_IPv%d_to_outer_IPv%d" % tuple(args) 212 213 util.InjectParameterizedTest(cls, param_list, NameGenerator) 214 215 216class XfrmTunnelTest(xfrm_base.XfrmLazyTest): 217 218 def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid, 219 netid, local_inner, remote_inner, local_outer, 220 remote_outer, write_sock): 221 222 write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53)) 223 self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None, 224 local_outer, remote_outer) 225 226 def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid, 227 netid, local_inner, remote_inner, local_outer, 228 remote_outer, read_sock): 229 230 # The second parameter of the tuple is the port number regardless of AF. 231 local_port = read_sock.getsockname()[1] 232 233 # Build and receive an ESP packet destined for the inner socket 234 input_pkt = _GetNullAuthCryptTunnelModePkt( 235 inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT, 236 local_inner, local_outer, local_port, _TEST_IN_SPI, 1) 237 self.ReceivePacketOn(underlying_netid, input_pkt) 238 239 # Verify that the packet data and src are correct 240 data, src = read_sock.recvfrom(4096) 241 self.assertEqual(net_test.UDP_PAYLOAD, data) 242 self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2]) 243 244 def _TestTunnel(self, inner_version, outer_version, func, direction, 245 test_output_mark_unset): 246 """Test a unidirectional XFRM Tunnel with explicit selectors""" 247 # Select the underlying netid, which represents the external 248 # interface from/to which to route ESP packets. 249 u_netid = self.RandomNetid() 250 # Select a random netid that will originate traffic locally and 251 # which represents the netid on which the plaintext is sent 252 netid = self.RandomNetid(exclude=u_netid) 253 254 local_inner = self.MyAddress(inner_version, netid) 255 remote_inner = _GetRemoteInnerAddress(inner_version) 256 local_outer = self.MyAddress(outer_version, u_netid) 257 remote_outer = _GetRemoteOuterAddress(outer_version) 258 259 output_mark = u_netid 260 if test_output_mark_unset: 261 output_mark = None 262 self.SetDefaultNetwork(u_netid) 263 264 try: 265 # Create input/ouput SPs, SAs and sockets to simulate a more realistic 266 # environment. 267 self.xfrm.CreateTunnel( 268 xfrm.XFRM_POLICY_IN, xfrm.SrcDstSelector(remote_inner, local_inner), 269 remote_outer, local_outer, _TEST_IN_SPI, xfrm_base._ALGO_CRYPT_NULL, 270 xfrm_base._ALGO_AUTH_NULL, None, None, None, xfrm.MATCH_METHOD_ALL) 271 272 self.xfrm.CreateTunnel( 273 xfrm.XFRM_POLICY_OUT, xfrm.SrcDstSelector(local_inner, remote_inner), 274 local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256, 275 xfrm_base._ALGO_HMAC_SHA1, None, output_mark, None, xfrm.MATCH_METHOD_ALL) 276 277 write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) 278 self.SelectInterface(write_sock, netid, "mark") 279 read_sock, _ = _CreateReceiveSock(inner_version) 280 281 sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock 282 func(inner_version, outer_version, u_netid, netid, local_inner, 283 remote_inner, local_outer, remote_outer, sock) 284 finally: 285 if test_output_mark_unset: 286 self.ClearDefaultNetwork() 287 288 def ParamTestTunnelInput(self, inner_version, outer_version): 289 self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, 290 xfrm.XFRM_POLICY_IN, False) 291 292 def ParamTestTunnelOutput(self, inner_version, outer_version): 293 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 294 xfrm.XFRM_POLICY_OUT, False) 295 296 def ParamTestTunnelOutputNoSetMark(self, inner_version, outer_version): 297 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 298 xfrm.XFRM_POLICY_OUT, True) 299 300 301class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): 302 def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, 303 ikey, okey): 304 self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey) 305 self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey) 306 307 family = AF_INET if version == 4 else AF_INET6 308 self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), 309 local_addr) 310 self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), 311 remote_addr) 312 313 def testAddVti(self): 314 """Test the creation of a Virtual Tunnel Interface.""" 315 for version in [4, 6]: 316 netid = self.RandomNetid() 317 local_addr = self.MyAddress(version, netid) 318 self.iproute.CreateVirtualTunnelInterface( 319 dev_name=_TEST_XFRM_IFNAME, 320 local_addr=local_addr, 321 remote_addr=_GetRemoteOuterAddress(version), 322 o_key=_TEST_OKEY, 323 i_key=_TEST_IKEY) 324 self._VerifyVtiInfoData( 325 self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, 326 _GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY) 327 328 new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2} 329 new_okey = _TEST_OKEY + _TEST_XFRM_IF_ID 330 new_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID 331 self.iproute.CreateVirtualTunnelInterface( 332 dev_name=_TEST_XFRM_IFNAME, 333 local_addr=local_addr, 334 remote_addr=new_remote_addr[version], 335 o_key=new_okey, 336 i_key=new_ikey, 337 is_update=True) 338 339 self._VerifyVtiInfoData( 340 self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, 341 new_remote_addr[version], new_ikey, new_okey) 342 343 if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) 344 345 # Validate that the netlink interface matches the ioctl interface. 346 self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) 347 self.iproute.DeleteLink(_TEST_XFRM_IFNAME) 348 with self.assertRaises(IOError): 349 self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) 350 351 def _QuietDeleteLink(self, ifname): 352 try: 353 self.iproute.DeleteLink(ifname) 354 except IOError: 355 # The link was not present. 356 pass 357 358 def tearDown(self): 359 super(XfrmAddDeleteVtiTest, self).tearDown() 360 self._QuietDeleteLink(_TEST_XFRM_IFNAME) 361 362 363class SaInfo(object): 364 365 def __init__(self, spi): 366 self.spi = spi 367 self.seq_num = 1 368 369 370class IpSecBaseInterface(object): 371 372 def __init__(self, iface, netid, underlying_netid, local, remote, version): 373 self.iface = iface 374 self.netid = netid 375 self.underlying_netid = underlying_netid 376 self.local, self.remote = local, remote 377 378 # XFRM interfaces technically do not have a version. This keeps track of 379 # the IP version of the local and remote addresses. 380 self.version = version 381 self.rx = self.tx = 0 382 self.addrs = {} 383 384 self.iproute = iproute.IPRoute() 385 self.xfrm = xfrm.Xfrm() 386 387 def Teardown(self): 388 self.TeardownXfrm() 389 self.TeardownInterface() 390 391 def TeardownInterface(self): 392 self.iproute.DeleteLink(self.iface) 393 394 def SetupXfrm(self, use_null_crypt): 395 rand_spi = random.randint(0, 0x7fffffff) 396 self.in_sa = SaInfo(rand_spi) 397 self.out_sa = SaInfo(rand_spi) 398 399 # Select algorithms: 400 if use_null_crypt: 401 auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL 402 else: 403 auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256 404 405 self.auth = auth 406 self.crypt = crypt 407 408 self._SetupXfrmByType(auth, crypt) 409 410 def Rekey(self, outer_family, new_out_sa, new_in_sa): 411 """Rekeys the Tunnel Interface 412 413 Creates new SAs and updates the outbound security policy to use new SAs. 414 415 Args: 416 outer_family: AF_INET or AF_INET6 417 new_out_sa: An SaInfo struct representing the new outbound SA's info 418 new_in_sa: An SaInfo struct representing the new inbound SA's info 419 """ 420 self._Rekey(outer_family, new_out_sa, new_in_sa) 421 422 # Update Interface object 423 self.out_sa = new_out_sa 424 self.in_sa = new_in_sa 425 426 def TeardownXfrm(self): 427 raise NotImplementedError("Subclasses should implement this") 428 429 def _SetupXfrmByType(self, auth_algo, crypt_algo): 430 raise NotImplementedError("Subclasses should implement this") 431 432 def _Rekey(self, outer_family, new_out_sa, new_in_sa): 433 raise NotImplementedError("Subclasses should implement this") 434 435 436class VtiInterface(IpSecBaseInterface): 437 438 def __init__(self, iface, netid, underlying_netid, _, local, remote, version): 439 super(VtiInterface, self).__init__(iface, netid, underlying_netid, local, 440 remote, version) 441 442 self.ikey = _TEST_IKEY + netid 443 self.okey = _TEST_OKEY + netid 444 445 self.SetupInterface() 446 self.SetupXfrm(False) 447 448 def SetupInterface(self): 449 return self.iproute.CreateVirtualTunnelInterface( 450 self.iface, self.local, self.remote, self.ikey, self.okey) 451 452 def _SetupXfrmByType(self, auth_algo, crypt_algo): 453 # For the VTI, the selectors are wildcard since packets will only 454 # be selected if they have the appropriate mark, hence the inner 455 # addresses are wildcard. 456 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, 457 self.out_sa.spi, crypt_algo, auth_algo, 458 xfrm.ExactMatchMark(self.okey), 459 self.underlying_netid, None, xfrm.MATCH_METHOD_ALL) 460 461 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, 462 self.in_sa.spi, crypt_algo, auth_algo, 463 xfrm.ExactMatchMark(self.ikey), None, None, 464 xfrm.MATCH_METHOD_MARK) 465 466 def TeardownXfrm(self): 467 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, 468 self.out_sa.spi, self.okey, None) 469 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, 470 self.in_sa.spi, self.ikey, None) 471 472 def _Rekey(self, outer_family, new_out_sa, new_in_sa): 473 # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly 474 # the same, but rekeys are asymmetric, and only update the outbound 475 # policy. 476 self.xfrm.AddSaInfo(self.local, self.remote, new_out_sa.spi, 477 xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL, 478 xfrm_base._ALGO_AUTH_NULL, None, None, 479 xfrm.ExactMatchMark(self.okey), self.underlying_netid) 480 481 self.xfrm.AddSaInfo(self.remote, self.local, new_in_sa.spi, 482 xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL, 483 xfrm_base._ALGO_AUTH_NULL, None, None, 484 xfrm.ExactMatchMark(self.ikey), None) 485 486 # Create new policies for IPv4 and IPv6. 487 for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]: 488 # Add SPI-specific output policy to enforce using new outbound SPI 489 policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel) 490 tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0, 491 (self.local, self.remote)) 492 self.xfrm.UpdatePolicyInfo(policy, tmpl, xfrm.ExactMatchMark(self.okey), 493 0) 494 495 def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi): 496 self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, 497 xfrm.ExactMatchMark(self.ikey)) 498 self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, 499 xfrm.ExactMatchMark(self.okey)) 500 501 502@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") 503class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest): 504 """Test the creation of an XFRM Interface.""" 505 506 def testAddXfrmInterface(self): 507 self.iproute.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID, 508 _LOOPBACK_IFINDEX) 509 if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) 510 net_test.SetInterfaceUp(_TEST_XFRM_IFNAME) 511 512 # Validate that the netlink interface matches the ioctl interface. 513 self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) 514 self.iproute.DeleteLink(_TEST_XFRM_IFNAME) 515 with self.assertRaises(IOError): 516 self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) 517 518 519class XfrmInterface(IpSecBaseInterface): 520 521 def __init__(self, iface, netid, underlying_netid, ifindex, local, remote, 522 version, use_null_crypt=False): 523 super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local, 524 remote, version) 525 526 self.ifindex = ifindex 527 self.xfrm_if_id = netid 528 529 self.SetupInterface() 530 self.SetupXfrm(use_null_crypt) 531 532 def SetupInterface(self): 533 """Create an XFRM interface.""" 534 return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex) 535 536 def _SetupXfrmByType(self, auth_algo, crypt_algo): 537 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, 538 self.out_sa.spi, crypt_algo, auth_algo, None, 539 self.underlying_netid, self.xfrm_if_id, 540 xfrm.MATCH_METHOD_ALL) 541 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, 542 self.in_sa.spi, crypt_algo, auth_algo, None, None, 543 self.xfrm_if_id, xfrm.MATCH_METHOD_IFID) 544 545 def TeardownXfrm(self): 546 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, 547 self.out_sa.spi, None, self.xfrm_if_id) 548 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, 549 self.in_sa.spi, None, self.xfrm_if_id) 550 551 def _Rekey(self, outer_family, new_out_sa, new_in_sa): 552 # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly 553 # the same, but rekeys are asymmetric, and only update the outbound 554 # policy. 555 self.xfrm.AddSaInfo( 556 self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0, 557 xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None, 558 None, self.underlying_netid, xfrm_if_id=self.xfrm_if_id) 559 560 self.xfrm.AddSaInfo( 561 self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0, 562 xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None, 563 None, None, xfrm_if_id=self.xfrm_if_id) 564 565 # Create new policies for IPv4 and IPv6. 566 for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]: 567 # Add SPI-specific output policy to enforce using new outbound SPI 568 policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel) 569 tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0, 570 (self.local, self.remote)) 571 self.xfrm.UpdatePolicyInfo(policy, tmpl, None, self.xfrm_if_id) 572 573 def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi): 574 self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, None, 575 self.xfrm_if_id) 576 self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, None, 577 self.xfrm_if_id) 578 579 def Migrate(self, new_underlying_netid, new_local, new_remote): 580 self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, 581 new_remote, new_local, self.in_sa.spi, 582 self.crypt, self.auth, None, None, 583 new_underlying_netid, self.xfrm_if_id) 584 585 self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, 586 new_local, new_remote, self.out_sa.spi, 587 self.crypt, self.auth, None, None, 588 new_underlying_netid, self.xfrm_if_id) 589 590 self.local = new_local 591 self.remote = new_remote 592 self.version = net_test.GetAddressVersion(new_local) 593 self.underlying_netid = new_underlying_netid 594 595 596class XfrmTunnelBase(xfrm_base.XfrmBaseTest): 597 598 @classmethod 599 def setUpClass(cls): 600 xfrm_base.XfrmBaseTest.setUpClass() 601 # Tunnel interfaces use marks extensively, so configure realistic packet 602 # marking rules to make the test representative, make PMTUD work, etc. 603 cls.SetInboundMarks(True) 604 cls.SetMarkReflectSysctls(1) 605 606 # Group by tunnel version to ensure that we test at least one IPv4 and one 607 # IPv6 tunnel 608 cls.tunnelsV4 = {} 609 cls.tunnelsV6 = {} 610 611 for i, underlying_netid in enumerate(cls.tuns): 612 for version in 4, 6: 613 netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i 614 iface = "ipsec%s" % netid 615 local = cls.MyAddress(version, underlying_netid) 616 if version == 4: 617 remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2) 618 else: 619 remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2) 620 621 ifindex = cls.ifindices[underlying_netid] 622 tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex, 623 local, remote, version) 624 cls._SetInboundMarking(netid, iface, True) 625 cls._SetupTunnelNetwork(tunnel, True) 626 627 if version == 4: 628 cls.tunnelsV4[netid] = tunnel 629 else: 630 cls.tunnelsV6[netid] = tunnel 631 632 @classmethod 633 def tearDownClass(cls): 634 # The sysctls are restored by MultinetworkBaseTest.tearDownClass. 635 cls.SetInboundMarks(False) 636 for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()): 637 cls._SetInboundMarking(tunnel.netid, tunnel.iface, False) 638 cls._SetupTunnelNetwork(tunnel, False) 639 tunnel.Teardown() 640 xfrm_base.XfrmBaseTest.tearDownClass() 641 642 def randomTunnel(self, outer_version): 643 version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6 644 return random.choice(list(version_dict.values())) 645 646 def setUp(self): 647 multinetwork_base.MultiNetworkBaseTest.setUp(self) 648 self.iproute = iproute.IPRoute() 649 self.xfrm = xfrm.Xfrm() 650 651 def tearDown(self): 652 multinetwork_base.MultiNetworkBaseTest.tearDown(self) 653 654 def _SwapInterfaceAddress(self, ifname, old_addr, new_addr): 655 """Exchange two addresses on a given interface. 656 657 Args: 658 ifname: Name of the interface 659 old_addr: An address to be removed from the interface 660 new_addr: An address to be added to an interface 661 """ 662 version = 6 if ":" in new_addr else 4 663 ifindex = net_test.GetInterfaceIndex(ifname) 664 self.iproute.AddAddress(new_addr, 665 net_test.AddressLengthBits(version), ifindex) 666 self.iproute.DelAddress(old_addr, 667 net_test.AddressLengthBits(version), ifindex) 668 669 @classmethod 670 def _GetLocalAddress(cls, version, netid): 671 if version == 4: 672 return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET) 673 else: 674 return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1" 675 676 @classmethod 677 def _SetupTunnelNetwork(cls, tunnel, is_add): 678 """Setup rules and routes for a tunnel Network. 679 680 Takes an interface and depending on the boolean 681 value of is_add, either adds or removes the rules 682 and routes for a tunnel interface to behave like an 683 Android Network for purposes of testing. 684 685 Args: 686 tunnel: A VtiInterface or XfrmInterface, the tunnel to set up. 687 is_add: Boolean that causes this method to perform setup if True or 688 teardown if False 689 """ 690 if is_add: 691 # Disable router solicitations to avoid occasional spurious packets 692 # arriving on the underlying network; there are two possible behaviors 693 # when that occurred: either only the RA packet is read, and when it 694 # is echoed back to the tunnel, it causes the test to fail by not 695 # receiving # the UDP_PAYLOAD; or, two packets may arrive on the 696 # underlying # network which fails the assertion that only one ESP packet 697 # is received. 698 cls.SetSysctl( 699 "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0) 700 net_test.SetInterfaceUp(tunnel.iface) 701 702 for version in [4, 6]: 703 ifindex = net_test.GetInterfaceIndex(tunnel.iface) 704 table = tunnel.netid 705 706 # Set up routing rules. 707 start, end = cls.UidRangeForNetid(tunnel.netid) 708 cls.iproute.UidRangeRule(version, is_add, start, end, table, 709 cls.PRIORITY_UID) 710 cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF) 711 cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK, 712 table, cls.PRIORITY_FWMARK) 713 714 # Configure IP addresses. 715 addr = cls._GetLocalAddress(version, tunnel.netid) 716 prefixlen = net_test.AddressLengthBits(version) 717 tunnel.addrs[version] = addr 718 if is_add: 719 cls.iproute.AddAddress(addr, prefixlen, ifindex) 720 cls.iproute.AddRoute(version, table, "default", 0, None, ifindex) 721 else: 722 cls.iproute.DelRoute(version, table, "default", 0, None, ifindex) 723 cls.iproute.DelAddress(addr, prefixlen, ifindex) 724 725 def assertReceivedPacket(self, tunnel, sa_info): 726 tunnel.rx += 1 727 self.assertEqual((tunnel.rx, tunnel.tx), 728 self.iproute.GetRxTxPackets(tunnel.iface)) 729 sa_info.seq_num += 1 730 731 def assertSentPacket(self, tunnel, sa_info): 732 tunnel.tx += 1 733 self.assertEqual((tunnel.rx, tunnel.tx), 734 self.iproute.GetRxTxPackets(tunnel.iface)) 735 sa_info.seq_num += 1 736 737 def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner, 738 sa_info=None, expect_fail=False): 739 """Test null-crypt input path over an IPsec interface.""" 740 if sa_info is None: 741 sa_info = tunnel.in_sa 742 read_sock, local_port = _CreateReceiveSock(inner_version) 743 744 input_pkt = _GetNullAuthCryptTunnelModePkt( 745 inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT, 746 local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num) 747 self.ReceivePacketOn(tunnel.underlying_netid, input_pkt) 748 749 if expect_fail: 750 self.assertRaisesErrno(EAGAIN, read_sock.recv, 4096) 751 else: 752 # Verify that the packet data and src are correct 753 data, src = read_sock.recvfrom(4096) 754 self.assertReceivedPacket(tunnel, sa_info) 755 self.assertEqual(net_test.UDP_PAYLOAD, data) 756 self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2]) 757 758 def _CheckTunnelOutput(self, tunnel, inner_version, local_inner, 759 remote_inner, sa_info=None): 760 """Test null-crypt output path over an IPsec interface.""" 761 if sa_info is None: 762 sa_info = tunnel.out_sa 763 local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, 764 _TEST_REMOTE_PORT) 765 766 # Read a tunneled IP packet on the underlying (outbound) network 767 # verifying that it is an ESP packet. 768 pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, 769 sa_info.seq_num, None, tunnel.local, 770 tunnel.remote) 771 772 # Get and update the IP headers on the inner payload so that we can do a simple 773 # comparison of byte data. Unfortunately, due to the scapy version this runs on, 774 # we cannot parse past the ESP header to the inner IP header, and thus have to 775 # workaround in this manner 776 if inner_version == 4: 777 ip_hdr_options = { 778 'id': scapy.IP(bytes(pkt.payload)[8:]).id, 779 'flags': scapy.IP(bytes(pkt.payload)[8:]).flags 780 } 781 else: 782 ip_hdr_options = {'fl': scapy.IPv6(bytes(pkt.payload)[8:]).fl} 783 784 expected = _GetNullAuthCryptTunnelModePkt( 785 inner_version, local_inner, tunnel.local, local_port, remote_inner, 786 tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num, 787 ip_hdr_options) 788 789 # Check outer header manually (Avoids having to overwrite outer header's 790 # id, flags or flow label) 791 self.assertSentPacket(tunnel, sa_info) 792 self.assertEqual(expected.src, pkt.src) 793 self.assertEqual(expected.dst, pkt.dst) 794 self.assertEqual(len(expected), len(pkt)) 795 796 # Check everything else 797 self.assertEqual(bytes(expected.payload), bytes(pkt.payload)) 798 799 def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner, 800 remote_inner): 801 """Test both input and output paths over an encrypted IPsec interface. 802 803 This tests specifically makes sure that the both encryption and decryption 804 work together, as opposed to the _CheckTunnel(Input|Output) where the 805 input and output paths are tested separately, and using null encryption. 806 """ 807 src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, 808 _TEST_REMOTE_PORT) 809 810 # Make sure it appeared on the underlying interface 811 pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi, 812 tunnel.out_sa.seq_num, None, tunnel.local, 813 tunnel.remote) 814 815 # Check that packet is not sent in plaintext 816 self.assertTrue(bytes(net_test.UDP_PAYLOAD) not in bytes(pkt)) 817 818 # Check src/dst 819 self.assertEqual(tunnel.local, pkt.src) 820 self.assertEqual(tunnel.remote, pkt.dst) 821 822 # Check that the interface statistics recorded the outbound packet 823 self.assertSentPacket(tunnel, tunnel.out_sa) 824 825 try: 826 # Swap the interface addresses to pretend we are the remote 827 self._SwapInterfaceAddress( 828 tunnel.iface, new_addr=remote_inner, old_addr=local_inner) 829 830 # Swap the packet's IP headers and write it back to the underlying 831 # network. 832 pkt = TunTwister.TwistPacket(pkt) 833 read_sock, local_port = _CreateReceiveSock(inner_version, 834 _TEST_REMOTE_PORT) 835 self.ReceivePacketOn(tunnel.underlying_netid, pkt) 836 837 # Verify that the packet data and src are correct 838 data, src = read_sock.recvfrom(4096) 839 self.assertEqual(net_test.UDP_PAYLOAD, data) 840 self.assertEqual((local_inner, src_port), src[:2]) 841 842 # Check that the interface statistics recorded the inbound packet 843 self.assertReceivedPacket(tunnel, tunnel.in_sa) 844 finally: 845 # Swap the interface addresses to pretend we are the remote 846 self._SwapInterfaceAddress( 847 tunnel.iface, new_addr=local_inner, old_addr=remote_inner) 848 849 def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner, 850 sa_info=None): 851 """Test ICMP error path over an IPsec interface.""" 852 if sa_info is None: 853 sa_info = tunnel.out_sa 854 # Now attempt to provoke an ICMP error. 855 # TODO: deduplicate with multinetwork_test.py. 856 dst_prefix, intermediate = { 857 4: ("172.19.", "172.16.9.12"), 858 6: ("2001:db8::", "2001:db8::1") 859 }[tunnel.version] 860 861 local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, 862 _TEST_REMOTE_PORT) 863 pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, 864 sa_info.seq_num, None, tunnel.local, 865 tunnel.remote) 866 self.assertSentPacket(tunnel, sa_info) 867 868 myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid) 869 _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr, 870 pkt) 871 self.ReceivePacketOn(tunnel.underlying_netid, toobig) 872 873 # Check that the packet too big reduced the MTU. 874 routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None) 875 self.assertEqual(1, len(routes)) 876 rtmsg, attributes = routes[0] 877 self.assertEqual(iproute.RTN_UNICAST, rtmsg.type) 878 self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) 879 880 # Clear PMTU information so that future tests don't have to worry about it. 881 self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid) 882 883 def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner, 884 remote_inner): 885 """Test combined encryption path with ICMP errors over an IPsec tunnel""" 886 self._CheckTunnelEncryption(tunnel, inner_version, local_inner, 887 remote_inner) 888 self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner) 889 self._CheckTunnelEncryption(tunnel, inner_version, local_inner, 890 remote_inner) 891 892 def _RebuildTunnel(self, tunnel, use_null_crypt): 893 # Some tests require that the out_seq_num and in_seq_num are the same 894 # (Specifically encrypted tests), rebuild SAs to ensure seq_num is 1 895 # 896 # Until we get better scapy support, the only way we can build an 897 # encrypted packet is to send it out, and read the packet from the wire. 898 # We then generally use this as the "inbound" encrypted packet, injecting 899 # it into the interface for which it is expected on. 900 # 901 # As such, this is required to ensure that encrypted packets (which we 902 # currently have no way to easily modify) are not considered replay 903 # attacks by the inbound SA. (eg: received 3 packets, seq_num_in = 3, 904 # sent only 1, # seq_num_out = 1, inbound SA would consider this a replay 905 # attack) 906 tunnel.TeardownXfrm() 907 tunnel.SetupXfrm(use_null_crypt) 908 909 def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt): 910 """Bootstrap method to setup and run tests for the given parameters.""" 911 tunnel = self.randomTunnel(outer_version) 912 913 try: 914 self._RebuildTunnel(tunnel, use_null_crypt) 915 916 local_inner = tunnel.addrs[inner_version] 917 remote_inner = _GetRemoteInnerAddress(inner_version) 918 919 for i in range(2): 920 func(tunnel, inner_version, local_inner, remote_inner) 921 finally: 922 if use_null_crypt: 923 tunnel.TeardownXfrm() 924 tunnel.SetupXfrm(False) 925 926 def _CheckTunnelRekey(self, tunnel, inner_version, local_inner, remote_inner): 927 old_out_sa = tunnel.out_sa 928 old_in_sa = tunnel.in_sa 929 930 # Check to make sure that both directions work before rekey 931 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 932 old_in_sa) 933 self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner, 934 old_out_sa) 935 936 # Rekey 937 outer_family = net_test.GetAddressFamily(tunnel.version) 938 939 # Create new SA 940 # Distinguish the new SAs with new SPIs. 941 new_out_sa = SaInfo(old_out_sa.spi + 1) 942 new_in_sa = SaInfo(old_in_sa.spi + 1) 943 944 # Perform Rekey 945 tunnel.Rekey(outer_family, new_out_sa, new_in_sa) 946 947 # Expect that the old SPI still works for inbound packets 948 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 949 old_in_sa) 950 951 # Test both paths with new SPIs, expect outbound to use new SPI 952 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 953 new_in_sa) 954 self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner, 955 new_out_sa) 956 957 # Delete old SAs 958 tunnel.DeleteOldSaInfo(outer_family, old_in_sa.spi, old_out_sa.spi) 959 960 # Test both paths with new SPIs; should still work 961 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 962 new_in_sa) 963 self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner, 964 new_out_sa) 965 966 # Expect failure upon trying to receive a packet with the deleted SPI 967 self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, 968 old_in_sa, True) 969 970 def _TestTunnelRekey(self, inner_version, outer_version): 971 """Test packet input and output over a Virtual Tunnel Interface.""" 972 tunnel = self.randomTunnel(outer_version) 973 974 try: 975 # Always use null_crypt, so we can check input and output separately 976 tunnel.TeardownXfrm() 977 tunnel.SetupXfrm(True) 978 979 local_inner = tunnel.addrs[inner_version] 980 remote_inner = _GetRemoteInnerAddress(inner_version) 981 982 self._CheckTunnelRekey(tunnel, inner_version, local_inner, remote_inner) 983 finally: 984 tunnel.TeardownXfrm() 985 tunnel.SetupXfrm(False) 986 987 988class XfrmVtiTest(XfrmTunnelBase): 989 990 INTERFACE_CLASS = VtiInterface 991 992 def ParamTestVtiInput(self, inner_version, outer_version): 993 self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) 994 995 def ParamTestVtiOutput(self, inner_version, outer_version): 996 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 997 True) 998 999 def ParamTestVtiInOutEncrypted(self, inner_version, outer_version): 1000 self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, 1001 False) 1002 1003 def ParamTestVtiIcmp(self, inner_version, outer_version): 1004 self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) 1005 1006 def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version): 1007 self._TestTunnel(inner_version, outer_version, 1008 self._CheckTunnelEncryptionWithIcmp, False) 1009 1010 def ParamTestVtiRekey(self, inner_version, outer_version): 1011 self._TestTunnelRekey(inner_version, outer_version) 1012 1013 1014@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") 1015class XfrmInterfaceTest(XfrmTunnelBase): 1016 1017 INTERFACE_CLASS = XfrmInterface 1018 1019 def ParamTestXfrmIntfInput(self, inner_version, outer_version): 1020 self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) 1021 1022 def ParamTestXfrmIntfOutput(self, inner_version, outer_version): 1023 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 1024 True) 1025 1026 def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version): 1027 self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, 1028 False) 1029 1030 def ParamTestXfrmIntfIcmp(self, inner_version, outer_version): 1031 self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) 1032 1033 def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version): 1034 self._TestTunnel(inner_version, outer_version, 1035 self._CheckTunnelEncryptionWithIcmp, False) 1036 1037 def ParamTestXfrmIntfRekey(self, inner_version, outer_version): 1038 self._TestTunnelRekey(inner_version, outer_version) 1039 1040############################################################################## 1041# 1042# Test for presence of CONFIG_XFRM_MIGRATE and kernel patches 1043# 1044# xfrm: Check if_id in xfrm_migrate 1045# Upstream commit: c1aca3080e382886e2e58e809787441984a2f89b 1046# 1047# xfrm: Fix xfrm migrate issues when address family changes 1048# Upstream commit: e03c3bba351f99ad932e8f06baa9da1afc418e02 1049# 1050# Those two upstream 5.17 fixes above were pulled in to LTS in kernel versions 1051# 4.14.273, 4.19.236, 5.4.186, 5.10.107, 5.15.30. 1052# 1053@unittest.skipUnless(SUPPORTS_XFRM_MIGRATE, 1054 "XFRM migration unsupported or fixes not included") 1055class XfrmInterfaceMigrateTest(XfrmTunnelBase): 1056 INTERFACE_CLASS = XfrmInterface 1057 1058 def setUpTunnel(self, outer_version, use_null_crypt): 1059 underlying_netid = self.RandomNetid() 1060 netid = _BASE_TUNNEL_NETID[outer_version] + _TUNNEL_NETID_OFFSET 1061 iface = "ipsec%s" % netid 1062 ifindex = self.ifindices[underlying_netid] 1063 1064 local = self.MyAddress(outer_version, underlying_netid) 1065 remote = net_test.IPV4_ADDR if outer_version == 4 else net_test.IPV6_ADDR 1066 1067 tunnel = XfrmInterface(iface, netid, underlying_netid, ifindex, 1068 local, remote, outer_version, use_null_crypt) 1069 self._SetInboundMarking(netid, iface, True) 1070 self._SetupTunnelNetwork(tunnel, True) 1071 1072 return tunnel 1073 1074 def tearDownTunnel(self, tunnel): 1075 self._SetInboundMarking(tunnel.netid, tunnel.iface, False) 1076 self._SetupTunnelNetwork(tunnel, False) 1077 tunnel.Teardown() 1078 1079 def _TestTunnel(self, inner_version, outer_version, new_outer_version, func, 1080 use_null_crypt): 1081 tunnel = self.randomTunnel(outer_version) 1082 1083 old_underlying_netid = tunnel.underlying_netid 1084 old_local = tunnel.local 1085 old_remote = tunnel.remote 1086 1087 1088 try: 1089 self._RebuildTunnel(tunnel, use_null_crypt) 1090 1091 # Verify functionality before migration 1092 local_inner = tunnel.addrs[inner_version] 1093 remote_inner = _GetRemoteInnerAddress(inner_version) 1094 func(tunnel, inner_version, local_inner, remote_inner) 1095 1096 # Migrate tunnel 1097 new_underlying_netid = self.RandomNetid(exclude=tunnel.underlying_netid) 1098 new_version = new_outer_version 1099 new_local = self.MyAddress(new_version, new_underlying_netid) 1100 new_remote = net_test.IPV4_ADDR2 if new_version == 4 else net_test.IPV6_ADDR2 1101 1102 tunnel.Migrate(new_underlying_netid, new_local, new_remote) 1103 1104 # Verify functionality after migration 1105 func(tunnel, inner_version, local_inner, remote_inner) 1106 finally: 1107 # Reset the tunnel to the original configuration 1108 tunnel.TeardownXfrm() 1109 1110 self.local = old_local 1111 self.remote = old_remote 1112 self.underlying_netid = old_underlying_netid 1113 tunnel.SetupXfrm(False) 1114 1115 1116 def ParamTestMigrateXfrmIntfInput(self, inner_version, outer_version, 1117 new_outer_version): 1118 self._TestTunnel(inner_version, outer_version, new_outer_version, 1119 self._CheckTunnelInput, True) 1120 1121 def ParamTestMigrateXfrmIntfOutput(self, inner_version, outer_version, 1122 new_outer_version): 1123 self._TestTunnel(inner_version, outer_version, new_outer_version, 1124 self._CheckTunnelOutput, True) 1125 1126 def ParamTestMigrateXfrmIntfInOutEncrypted(self, inner_version, outer_version, 1127 new_outer_version): 1128 self._TestTunnel(inner_version, outer_version, new_outer_version, 1129 self._CheckTunnelEncryption, False) 1130 1131 def ParamTestMigrateXfrmIntfIcmp(self, inner_version, outer_version, 1132 new_outer_version): 1133 self._TestTunnel(inner_version, outer_version, new_outer_version, 1134 self._CheckTunnelIcmp, False) 1135 1136 def ParamTestMigrateXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version, 1137 new_outer_version): 1138 self._TestTunnel(inner_version, outer_version, new_outer_version, 1139 self._CheckTunnelEncryptionWithIcmp, False) 1140 1141 def ParamTestMigrateXfrmIntfRekey(self, inner_version, outer_version, 1142 new_outer_version): 1143 self._TestTunnel(inner_version, outer_version, new_outer_version, 1144 self._CheckTunnelRekey, True) 1145 1146if __name__ == "__main__": 1147 InjectTests() 1148 unittest.main() 1149