#!/usr/bin/python
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import
from errno import *  # pylint: disable=wildcard-import
from socket import *  # pylint: disable=wildcard-import

import random
import itertools
import struct
import unittest

from scapy import all as scapy
from tun_twister import TunTwister
import csocket
import iproute
import multinetwork_base
import net_test
import packets
import util
import xfrm
import xfrm_base

_LOOPBACK_IFINDEX = 1
_TEST_XFRM_IFNAME = "ipsec42"
_TEST_XFRM_IF_ID = 42
_TEST_SPI = 0x1234

# Does the kernel support xfrmi interfaces?
def HaveXfrmInterfaces():
  """Returns True if an XFRM interface can be created on this kernel.

  Kernels at or above 4.19 are assumed to support XFRM interfaces and are not
  probed. On older kernels the function creates and then deletes a test
  interface; any IOError raised by the create or delete is taken to mean that
  XFRM interfaces are unsupported.

  Returns:
    True if XFRM interfaces are supported, False otherwise.

  Raises:
    AssertionError: if the test interface is still present after deletion.
  """
  if net_test.LINUX_VERSION >= (4, 19, 0):
    return True

  try:
    i = iproute.IPRoute()
    i.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID,
                          _LOOPBACK_IFINDEX)
    i.DeleteLink(_TEST_XFRM_IFNAME)
    try:
      i.GetIfIndex(_TEST_XFRM_IFNAME)
    except IOError:
      # Expected: the interface is gone, so the lookup fails.
      pass
    else:
      # Raise explicitly: asserting on a non-empty message string is always
      # true and would never report the failure.
      raise AssertionError(
          "Deleted interface %s still exists!" % _TEST_XFRM_IFNAME)
    return True
  except IOError:
    return False

HAVE_XFRM_INTERFACES = HaveXfrmInterfaces()

# Does the kernel support CONFIG_XFRM_MIGRATE?
def SupportsXfrmMigrate():
  """Returns True if the kernel appears to support XFRM migration.

  Kernels at or above 5.10 are assumed to support it and are not probed. On
  older kernels without XFRM interfaces the answer is False. Otherwise the
  function attempts to migrate a non-existent SA and inspects the result:
  ENOPROTOOPT means unsupported; EINVAL, any other errno, or an unexpected
  success are all treated as supported.
  """
  if net_test.LINUX_VERSION >= (5, 10, 0):
    return True

  # XFRM_MIGRATE depends on xfrmi interfaces
  if not HAVE_XFRM_INTERFACES:
    return False

  try:
    x = xfrm.Xfrm()
    wildcard_addr = net_test.GetWildcardAddress(6)
    selector = xfrm.EmptySelector(AF_INET6)

    # Expect migration to fail with EINVAL because it is trying to migrate a
    # non-existent SA.
    x.MigrateTunnel(xfrm.XFRM_POLICY_OUT, selector, wildcard_addr, wildcard_addr,
                    wildcard_addr, wildcard_addr, _TEST_SPI,
                    None, None, None, None, None, None)
    print("Migration succeeded unexpectedly, assuming XFRM_MIGRATE is enabled")
    return True
  except IOError as err:
    if err.errno == ENOPROTOOPT:
      return False
    elif err.errno == EINVAL:
      return True
    else:
      print("Unexpected error, assuming XFRM_MIGRATE is enabled:", err.errno)
      return True

SUPPORTS_XFRM_MIGRATE = SupportsXfrmMigrate()

# Parameters to setup tunnels as special networks
_TUNNEL_NETID_OFFSET = 0xFC00  # Matches reserved netid range for IpSecService
_BASE_TUNNEL_NETID = {4: 40, 6: 60}
_BASE_VTI_OKEY = 2000000100
_BASE_VTI_IKEY = 2000000200

_TEST_OUT_SPI = _TEST_SPI
_TEST_IN_SPI = _TEST_OUT_SPI

_TEST_OKEY = 2000000100
_TEST_IKEY = 2000000200

_TEST_REMOTE_PORT = 1234

_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6}


def _GetLocalInnerAddress(version):
  """Returns the fixed local inner test address for the given IP version."""
  return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version]


def _GetRemoteInnerAddress(version):
  """Returns the fixed remote inner test address for the given IP version."""
  return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version]


def _GetRemoteOuterAddress(version):
  """Returns the remote outer (tunnel endpoint) address for the IP version."""
  return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version]


def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer,
                                   src_port, dst_inner, dst_outer,
                                   dst_port, spi, seq_num, ip_hdr_options=None):
  """Builds a UDP packet wrapped by xfrm_base.EncryptPacketWithNull.

  Args:
    inner_version: 4 or 6, the IP version of the inner packet.
    src_inner: Source address of the inner packet.
    src_outer: Source address of the outer packet.
    src_port: UDP source port of the inner packet.
    dst_inner: Destination address of the inner packet.
    dst_outer: Destination address of the outer packet.
    dst_port: UDP destination port of the inner packet.
    spi: SPI passed to EncryptPacketWithNull.
    seq_num: Sequence number passed to EncryptPacketWithNull.
    ip_hdr_options: Optional dict of extra keyword arguments for the inner IP
      header constructor. It is not modified; 'src' and 'dst' are always set
      from src_inner and dst_inner.

  Returns:
    The packet returned by xfrm_base.EncryptPacketWithNull.
  """
  # Work on a copy so that the caller's dict is never modified.
  ip_hdr_options = dict(ip_hdr_options or {})
  ip_hdr_options.update({'src': src_inner, 'dst': dst_inner})

  # Build the plaintext inner packet, then wrap it in ESP.
  IpType = _SCAPY_IP_TYPE[inner_version]
  input_pkt = (
      IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) /
      net_test.UDP_PAYLOAD)
  input_pkt = IpType(str(input_pkt))  # Compute length, checksum.
  input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num,
                                              (src_outer, dst_outer))

  return input_pkt


def _CreateReceiveSock(version, port=0):
  """Creates a UDP socket bound to the wildcard address.

  Args:
    version: 4 or 6, the IP version of the socket.
    port: Port to bind to; the default of 0 is passed to bind() unchanged.

  Returns:
    A (socket, local_port) tuple, where local_port is read back from the
    bound socket.
  """
  # Create a socket to receive packets.
  read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
  read_sock.bind((net_test.GetWildcardAddress(version), port))
  # The second parameter of the tuple is the port number regardless of AF.
  local_port = read_sock.getsockname()[1]
  # Guard against the eventuality of the receive failing.
  csocket.SetSocketTimeout(read_sock, 500)

  return read_sock, local_port


def _SendPacket(testInstance, netid, version, remote, remote_port):
  """Sends net_test.UDP_PAYLOAD to (remote, remote_port) via the given netid.

  Args:
    testInstance: Test object providing SelectInterface().
    netid: Netid used to select the outgoing network (in "mark" mode).
    version: 4 or 6, the IP version of the socket.
    remote: Destination address.
    remote_port: Destination UDP port.

  Returns:
    The local port the sending socket was using.
  """
  # Send a packet out via the tunnel-backed network, bound for the port number
  # of the input socket.
  write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
  testInstance.SelectInterface(write_sock, netid, "mark")
  write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port))
  local_port = write_sock.getsockname()[1]

  return local_port


def InjectTests():
  """Injects the parameterized tests into every tunnel test class."""
  InjectParameterizedTests(XfrmTunnelTest)
  InjectParameterizedTests(XfrmInterfaceTest)
  InjectParameterizedTests(XfrmVtiTest)
  InjectParameterizedTests(XfrmInterfaceMigrateTest)


def InjectParameterizedTests(cls):
  """Injects tests into cls for every (inner, outer) IP version pair."""
  VERSIONS = (4, 6)
  param_list = itertools.product(VERSIONS, VERSIONS)

  def NameGenerator(*args):
    return "IPv%d_in_IPv%d" % tuple(args)

  util.InjectParameterizedTest(cls, param_list, NameGenerator)


class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
  """Tests unidirectional XFRM tunnels that use explicit selectors."""

  def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid,
                         netid, local_inner, remote_inner, local_outer,
                         remote_outer, write_sock):
    """Sends a packet on write_sock and expects ESP on the underlying netid."""

    write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53))
    self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
                            local_outer, remote_outer)

  def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid,
                        netid, local_inner, remote_inner, local_outer,
                        remote_outer, read_sock):
    """Injects an ESP packet and expects its payload on read_sock."""

    # The second parameter of the tuple is the port number regardless of AF.
    local_port = read_sock.getsockname()[1]

    # Build and receive an ESP packet destined for the inner socket
    input_pkt = _GetNullAuthCryptTunnelModePkt(
        inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT,
        local_inner, local_outer, local_port, _TEST_IN_SPI, 1)
    self.ReceivePacketOn(underlying_netid, input_pkt)

    # Verify that the packet data and src are correct
    data, src = read_sock.recvfrom(4096)
    self.assertEqual(net_test.UDP_PAYLOAD, data)
    self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])

  def _TestTunnel(self, inner_version, outer_version, func, direction,
                  test_output_mark_unset):
    """Test a unidirectional XFRM Tunnel with explicit selectors"""
    # Select the underlying netid, which represents the external
    # interface from/to which to route ESP packets.
    u_netid = self.RandomNetid()
    # Select a random netid that will originate traffic locally and
    # which represents the netid on which the plaintext is sent
    netid = self.RandomNetid(exclude=u_netid)

    local_inner = self.MyAddress(inner_version, netid)
    remote_inner = _GetRemoteInnerAddress(inner_version)
    local_outer = self.MyAddress(outer_version, u_netid)
    remote_outer = _GetRemoteOuterAddress(outer_version)

    output_mark = u_netid
    if test_output_mark_unset:
      # Without an output mark, route via the default network instead.
      output_mark = None
      self.SetDefaultNetwork(u_netid)

    try:
      # Create input/output SPs, SAs and sockets to simulate a more realistic
      # environment.
      self.xfrm.CreateTunnel(
          xfrm.XFRM_POLICY_IN, xfrm.SrcDstSelector(remote_inner, local_inner),
          remote_outer, local_outer, _TEST_IN_SPI, xfrm_base._ALGO_CRYPT_NULL,
          xfrm_base._ALGO_AUTH_NULL, None, None, None, xfrm.MATCH_METHOD_ALL)

      self.xfrm.CreateTunnel(
          xfrm.XFRM_POLICY_OUT, xfrm.SrcDstSelector(local_inner, remote_inner),
          local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256,
          xfrm_base._ALGO_HMAC_SHA1, None, output_mark, None, xfrm.MATCH_METHOD_ALL)

      write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
      self.SelectInterface(write_sock, netid, "mark")
      read_sock, _ = _CreateReceiveSock(inner_version)

      sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock
      func(inner_version, outer_version, u_netid, netid, local_inner,
           remote_inner, local_outer, remote_outer, sock)
    finally:
      if test_output_mark_unset:
        self.ClearDefaultNetwork()

  def ParamTestTunnelInput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput,
                     xfrm.XFRM_POLICY_IN, False)

  def ParamTestTunnelOutput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     xfrm.XFRM_POLICY_OUT, False)

  def ParamTestTunnelOutputNoSetMark(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     xfrm.XFRM_POLICY_OUT, True)


@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
  """Tests creating, updating and deleting a Virtual Tunnel Interface."""

  def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr,
                         ikey, okey):
    """Asserts that the VTI link info matches the expected keys and addresses."""
    self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey)
    self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey)

    family = AF_INET if version == 4 else AF_INET6
    self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]),
                     local_addr)
    self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]),
                     remote_addr)

  def testAddVti(self):
    """Test the creation of a Virtual Tunnel Interface."""
    for version in [4, 6]:
      netid = self.RandomNetid()
      local_addr = self.MyAddress(version, netid)
      self.iproute.CreateVirtualTunnelInterface(
          dev_name=_TEST_XFRM_IFNAME,
          local_addr=local_addr,
          remote_addr=_GetRemoteOuterAddress(version),
          o_key=_TEST_OKEY,
          i_key=_TEST_IKEY)
      self._VerifyVtiInfoData(
          self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr,
          _GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY)

      # Update the existing interface with a new remote address and new keys.
      new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2}
      new_okey = _TEST_OKEY + _TEST_XFRM_IF_ID
      new_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID
      self.iproute.CreateVirtualTunnelInterface(
          dev_name=_TEST_XFRM_IFNAME,
          local_addr=local_addr,
          remote_addr=new_remote_addr[version],
          o_key=new_okey,
          i_key=new_ikey,
          is_update=True)

      self._VerifyVtiInfoData(
          self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr,
          new_remote_addr[version], new_ikey, new_okey)

      if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)

      # Validate that the netlink interface matches the ioctl interface.
      self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
      self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
      with self.assertRaises(IOError):
        self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)

  def _QuietDeleteLink(self, ifname):
    """Deletes the named link, ignoring the IOError raised if it is absent."""
    try:
      self.iproute.DeleteLink(ifname)
    except IOError:
      # The link was not present.
      pass

  def tearDown(self):
    super(XfrmAddDeleteVtiTest, self).tearDown()
    # Make sure a failed test does not leave the test interface behind.
    self._QuietDeleteLink(_TEST_XFRM_IFNAME)


class SaInfo(object):
  """Tracks the SPI and the next expected sequence number of one SA."""

  def __init__(self, spi):
    self.spi = spi
    self.seq_num = 1


class IpSecBaseInterface(object):
  """Common state and behaviour shared by the tunnel interface wrappers.

  Subclasses must implement TeardownXfrm(), _SetupXfrmByType() and _Rekey().
  """

  def __init__(self, iface, netid, underlying_netid, local, remote, version):
    self.iface = iface
    self.netid = netid
    self.underlying_netid = underlying_netid
    self.local, self.remote = local, remote

    # XFRM interfaces technically do not have a version. This keeps track of
    # the IP version of the local and remote addresses.
    self.version = version
    # Packet counters maintained by the tests (see assertReceivedPacket and
    # assertSentPacket in XfrmTunnelBase).
    self.rx = self.tx = 0
    # Inner addresses keyed by IP version; filled in by _SetupTunnelNetwork.
    self.addrs = {}

    self.iproute = iproute.IPRoute()
    self.xfrm = xfrm.Xfrm()

  def Teardown(self):
    """Removes the XFRM state and then deletes the interface."""
    self.TeardownXfrm()
    self.TeardownInterface()

  def TeardownInterface(self):
    """Deletes the underlying network interface."""
    self.iproute.DeleteLink(self.iface)

  def SetupXfrm(self, use_null_crypt):
    """Creates fresh SAs and policies, resetting the sequence numbers to 1.

    Args:
      use_null_crypt: If True, use null authentication and encryption;
        otherwise use HMAC-SHA1 and AES-256-CBC.
    """
    # Both directions share one randomly chosen SPI.
    rand_spi = random.randint(0, 0x7fffffff)
    self.in_sa = SaInfo(rand_spi)
    self.out_sa = SaInfo(rand_spi)

    # Select algorithms:
    if use_null_crypt:
      auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL
    else:
      auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256

    self.auth = auth
    self.crypt = crypt

    self._SetupXfrmByType(auth, crypt)

  def Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Rekeys the Tunnel Interface

    Creates new SAs and updates the outbound security policy to use new SAs.

    Args:
      outer_family: AF_INET or AF_INET6
      new_out_sa: An SaInfo struct representing the new outbound SA's info
      new_in_sa: An SaInfo struct representing the new inbound SA's info
    """
    self._Rekey(outer_family, new_out_sa, new_in_sa)

    # Update Interface object
    self.out_sa = new_out_sa
    self.in_sa = new_in_sa

  def TeardownXfrm(self):
    raise NotImplementedError("Subclasses should implement this")

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    raise NotImplementedError("Subclasses should implement this")

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    raise NotImplementedError("Subclasses should implement this")


class VtiInterface(IpSecBaseInterface):
  """A Virtual Tunnel Interface whose SAs and policies are matched by mark."""

  def __init__(self, iface, netid, underlying_netid, _, local, remote, version):
    # The fourth argument is accepted and ignored so that the constructor can
    # be called with the same arguments as XfrmInterface.
    super(VtiInterface, self).__init__(iface, netid, underlying_netid, local,
                                       remote, version)

    self.ikey = _TEST_IKEY + netid
    self.okey = _TEST_OKEY + netid

    self.SetupInterface()
    self.SetupXfrm(False)

  def SetupInterface(self):
    """Create a Virtual Tunnel Interface."""
    return self.iproute.CreateVirtualTunnelInterface(
        self.iface, self.local, self.remote, self.ikey, self.okey)

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    # For the VTI, the selectors are wildcard since packets will only
    # be selected if they have the appropriate mark, hence the inner
    # addresses are wildcard.
    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
                           self.out_sa.spi, crypt_algo, auth_algo,
                           xfrm.ExactMatchMark(self.okey),
                           self.underlying_netid, None, xfrm.MATCH_METHOD_ALL)

    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
                           self.in_sa.spi, crypt_algo, auth_algo,
                           xfrm.ExactMatchMark(self.ikey), None, None,
                           xfrm.MATCH_METHOD_MARK)

  def TeardownXfrm(self):
    """Deletes the outbound and inbound tunnels created by SetupXfrm."""
    self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
                           self.out_sa.spi, self.okey, None)
    self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
                           self.in_sa.spi, self.ikey, None)

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
    #       the same, but rekeys are asymmetric, and only update the outbound
    #       policy.
    self.xfrm.AddSaInfo(self.local, self.remote, new_out_sa.spi,
                        xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL,
                        xfrm_base._ALGO_AUTH_NULL, None, None,
                        xfrm.ExactMatchMark(self.okey), self.underlying_netid)

    self.xfrm.AddSaInfo(self.remote, self.local, new_in_sa.spi,
                        xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL,
                        xfrm_base._ALGO_AUTH_NULL, None, None,
                        xfrm.ExactMatchMark(self.ikey), None)

    # Create new policies for IPv4 and IPv6.
    for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]:
      # Add SPI-specific output policy to enforce using new outbound SPI
      policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
      tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
                               (self.local, self.remote))
      self.xfrm.UpdatePolicyInfo(policy, tmpl, xfrm.ExactMatchMark(self.okey),
                                 0)

  def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
    """Deletes the inbound and outbound SAs identified by the given SPIs."""
    self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP,
                           xfrm.ExactMatchMark(self.ikey))
    self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP,
                           xfrm.ExactMatchMark(self.okey))


@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest):
  """Test the creation of an XFRM Interface."""

  def testAddXfrmInterface(self):
    self.iproute.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID,
                                     _LOOPBACK_IFINDEX)
    if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
    net_test.SetInterfaceUp(_TEST_XFRM_IFNAME)

    # Validate that the netlink interface matches the ioctl interface.
    self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
    self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
    with self.assertRaises(IOError):
      self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)


class XfrmInterface(IpSecBaseInterface):
  """An XFRM interface whose SAs and policies are matched by interface ID."""

  def __init__(self, iface, netid, underlying_netid, ifindex, local, remote,
               version, use_null_crypt=False):
    super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local,
                                        remote, version)

    self.ifindex = ifindex
    # The netid doubles as the XFRM interface ID.
    self.xfrm_if_id = netid

    self.SetupInterface()
    self.SetupXfrm(use_null_crypt)

  def SetupInterface(self):
    """Create an XFRM interface."""
    return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex)

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
                           self.out_sa.spi, crypt_algo, auth_algo, None,
                           self.underlying_netid, self.xfrm_if_id,
                           xfrm.MATCH_METHOD_ALL)
    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
                           self.in_sa.spi, crypt_algo, auth_algo, None, None,
                           self.xfrm_if_id, xfrm.MATCH_METHOD_IFID)

  def TeardownXfrm(self):
    """Deletes the outbound and inbound tunnels created by SetupXfrm."""
    self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
                           self.out_sa.spi, None, self.xfrm_if_id)
    self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
                           self.in_sa.spi, None, self.xfrm_if_id)

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
    #       the same, but rekeys are asymmetric, and only update the outbound
    #       policy.
    self.xfrm.AddSaInfo(
        self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None,
        None, self.underlying_netid, xfrm_if_id=self.xfrm_if_id)

    self.xfrm.AddSaInfo(
        self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None,
        None, None, xfrm_if_id=self.xfrm_if_id)

    # Create new policies for IPv4 and IPv6.
    for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]:
      # Add SPI-specific output policy to enforce using new outbound SPI
      policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
      tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
                               (self.local, self.remote))
      self.xfrm.UpdatePolicyInfo(policy, tmpl, None, self.xfrm_if_id)

  def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
    """Deletes the inbound and outbound SAs identified by the given SPIs."""
    self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, None,
                           self.xfrm_if_id)
    self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, None,
                           self.xfrm_if_id)

  def Migrate(self, new_underlying_netid, new_local, new_remote):
    """Migrates both tunnel directions to new addresses and a new netid.

    Args:
      new_underlying_netid: The underlying netid to use after migration.
      new_local: The new local outer address.
      new_remote: The new remote outer address.
    """
    self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
                            new_remote, new_local, self.in_sa.spi,
                            self.crypt, self.auth, None, None,
                            new_underlying_netid, self.xfrm_if_id)

    self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
                            new_local, new_remote, self.out_sa.spi,
                            self.crypt, self.auth, None, None,
                            new_underlying_netid, self.xfrm_if_id)

    # Record the new endpoints only after both migrations have succeeded.
    self.local = new_local
    self.remote = new_remote
    self.underlying_netid = new_underlying_netid


class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
  """Base class for tests that run traffic over tunnel interfaces.

  Subclasses set INTERFACE_CLASS to the interface wrapper to instantiate.
  """

  # Subclass that does not allow multiple tunnels (e.g. XfrmInterfaceMigrateTest)
  # should override this method.
  @classmethod
  def allowMultipleTunnels(cls):
    return True

  @classmethod
  def setUpClass(cls):
    xfrm_base.XfrmBaseTest.setUpClass()
    # Tunnel interfaces use marks extensively, so configure realistic packet
    # marking rules to make the test representative, make PMTUD work, etc.
    cls.SetInboundMarks(True)
    cls.SetMarkReflectSysctls(1)

    # Group by tunnel version to ensure that we test at least one IPv4 and one
    # IPv6 tunnel
    cls.tunnelsV4 = {}
    cls.tunnelsV6 = {}

    if not cls.allowMultipleTunnels():
      return

    # Create one IPv4 and one IPv6 tunnel on top of every underlying network.
    for i, underlying_netid in enumerate(cls.tuns):
      for version in 4, 6:
        netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i
        iface = "ipsec%s" % netid
        local = cls.MyAddress(version, underlying_netid)
        # Alternate between the two remote addresses.
        if version == 4:
          remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2)
        else:
          remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2)

        ifindex = cls.ifindices[underlying_netid]
        tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex,
                                     local, remote, version)
        cls._SetInboundMarking(netid, iface, True)
        cls._SetupTunnelNetwork(tunnel, True)

        if version == 4:
          cls.tunnelsV4[netid] = tunnel
        else:
          cls.tunnelsV6[netid] = tunnel

  @classmethod
  def tearDownClass(cls):
    # The sysctls are restored by MultinetworkBaseTest.tearDownClass.
    cls.SetInboundMarks(False)
    for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()):
      cls._SetInboundMarking(tunnel.netid, tunnel.iface, False)
      cls._SetupTunnelNetwork(tunnel, False)
      tunnel.Teardown()
    xfrm_base.XfrmBaseTest.tearDownClass()

  def randomTunnel(self, outer_version):
    """Returns a randomly chosen tunnel with the given outer IP version."""
    version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6
    return random.choice(list(version_dict.values()))

  def setUp(self):
    multinetwork_base.MultiNetworkBaseTest.setUp(self)
    self.iproute = iproute.IPRoute()
    self.xfrm = xfrm.Xfrm()

  def tearDown(self):
    multinetwork_base.MultiNetworkBaseTest.tearDown(self)

  def _SwapInterfaceAddress(self, ifname, old_addr, new_addr):
    """Exchange two addresses on a given interface.

    Args:
      ifname: Name of the interface
      old_addr: An address to be removed from the interface
      new_addr: An address to be added to an interface
    """
    version = 6 if ":" in new_addr else 4
    ifindex = net_test.GetInterfaceIndex(ifname)
    self.iproute.AddAddress(new_addr,
                            net_test.AddressLengthBits(version), ifindex)
    self.iproute.DelAddress(old_addr,
                            net_test.AddressLengthBits(version), ifindex)

  @classmethod
  def _GetLocalAddress(cls, version, netid):
    """Returns the local inner address to assign to the tunnel for netid."""
    if version == 4:
      return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET)
    else:
      return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1"

  @classmethod
  def _SetupTunnelNetwork(cls, tunnel, is_add):
    """Setup rules and routes for a tunnel Network.

    Takes an interface and depending on the boolean
    value of is_add, either adds or removes the rules
    and routes for a tunnel interface to behave like an
    Android Network for purposes of testing.

    Args:
      tunnel: A VtiInterface or XfrmInterface, the tunnel to set up.
      is_add: Boolean that causes this method to perform setup if True or
        teardown if False
    """
    if is_add:
      # Disable router solicitations to avoid occasional spurious packets
      # arriving on the underlying network; there are two possible behaviors
      # when that occurred: either only the RA packet is read, and when it
      # is echoed back to the tunnel, it causes the test to fail by not
      # receiving the UDP_PAYLOAD; or, two packets may arrive on the
      # underlying network which fails the assertion that only one ESP packet
      # is received.
      cls.SetSysctl(
          "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0)
      net_test.SetInterfaceUp(tunnel.iface)

    for version in [4, 6]:
      ifindex = net_test.GetInterfaceIndex(tunnel.iface)
      table = tunnel.netid

      # Set up routing rules.
      start, end = cls.UidRangeForNetid(tunnel.netid)
      cls.iproute.UidRangeRule(version, is_add, start, end, table,
                               cls.PRIORITY_UID)
      cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF)
      cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK,
                             table, cls.PRIORITY_FWMARK)

      # Configure IP addresses.
      addr = cls._GetLocalAddress(version, tunnel.netid)
      prefixlen = net_test.AddressLengthBits(version)
      tunnel.addrs[version] = addr
      if is_add:
        cls.iproute.AddAddress(addr, prefixlen, ifindex)
        cls.iproute.AddRoute(version, table, "default", 0, None, ifindex)
      else:
        cls.iproute.DelRoute(version, table, "default", 0, None, ifindex)
        cls.iproute.DelAddress(addr, prefixlen, ifindex)

  def assertReceivedPacket(self, tunnel, sa_info):
    """Counts one received packet and checks the interface packet counters."""
    tunnel.rx += 1
    self.assertEqual((tunnel.rx, tunnel.tx),
                     self.iproute.GetRxTxPackets(tunnel.iface))
    sa_info.seq_num += 1

  def assertSentPacket(self, tunnel, sa_info):
    """Counts one sent packet and checks the interface packet counters."""
    tunnel.tx += 1
    self.assertEqual((tunnel.rx, tunnel.tx),
                     self.iproute.GetRxTxPackets(tunnel.iface))
    sa_info.seq_num += 1

  def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner,
                        sa_info=None, expect_fail=False):
    """Test null-crypt input path over an IPsec interface."""
    if sa_info is None:
      sa_info = tunnel.in_sa
    read_sock, local_port = _CreateReceiveSock(inner_version)

    input_pkt = _GetNullAuthCryptTunnelModePkt(
        inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT,
        local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num)
    self.ReceivePacketOn(tunnel.underlying_netid, input_pkt)

    if expect_fail:
      # Nothing should be delivered, so the receive is expected to fail.
      self.assertRaisesErrno(EAGAIN, read_sock.recv, 4096)
    else:
      # Verify that the packet data and src are correct
      data, src = read_sock.recvfrom(4096)
      self.assertReceivedPacket(tunnel, sa_info)
      self.assertEqual(net_test.UDP_PAYLOAD, data)
      self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])

  def _CheckTunnelOutput(self, tunnel, inner_version, local_inner,
                         remote_inner, sa_info=None):
    """Test null-crypt output path over an IPsec interface."""
    if sa_info is None:
      sa_info = tunnel.out_sa
    local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
                             _TEST_REMOTE_PORT)

    # Read a tunneled IP packet on the underlying (outbound) network
    # verifying that it is an ESP packet.
    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
                                  sa_info.seq_num, None, tunnel.local,
                                  tunnel.remote)

    # Get and update the IP headers on the inner payload so that we can do a simple
    # comparison of byte data. Unfortunately, due to the scapy version this runs on,
    # we cannot parse past the ESP header to the inner IP header, and thus have to
    # workaround in this manner.
    # Skip the first 8 bytes (the ESP SPI and sequence number fields) to reach
    # the inner IP header.
    inner_bytes = str(pkt.payload)[8:]
    if inner_version == 4:
      inner_hdr = scapy.IP(inner_bytes)
      ip_hdr_options = {
          'id': inner_hdr.id,
          'flags': inner_hdr.flags
      }
    else:
      ip_hdr_options = {'fl': scapy.IPv6(inner_bytes).fl}

    expected = _GetNullAuthCryptTunnelModePkt(
        inner_version, local_inner, tunnel.local, local_port, remote_inner,
        tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num,
        ip_hdr_options)

    # Check outer header manually (Avoids having to overwrite outer header's
    # id, flags or flow label)
    self.assertSentPacket(tunnel, sa_info)
    self.assertEqual(expected.src, pkt.src)
    self.assertEqual(expected.dst, pkt.dst)
    self.assertEqual(len(expected), len(pkt))

    # Check everything else
    self.assertEqual(str(expected.payload), str(pkt.payload))

  def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner,
                             remote_inner):
    """Test both input and output paths over an encrypted IPsec interface.

    This test specifically makes sure that both encryption and decryption
    work together, as opposed to the _CheckTunnel(Input|Output) where the
    input and output paths are tested separately, and using null encryption.
    """
    src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
                           _TEST_REMOTE_PORT)

    # Make sure it appeared on the underlying interface
    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi,
                                  tunnel.out_sa.seq_num, None, tunnel.local,
                                  tunnel.remote)

    # Check that packet is not sent in plaintext
    self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt))

    # Check src/dst
    self.assertEqual(tunnel.local, pkt.src)
    self.assertEqual(tunnel.remote, pkt.dst)

    # Check that the interface statistics recorded the outbound packet
    self.assertSentPacket(tunnel, tunnel.out_sa)

    read_sock = None
    try:
      # Swap the interface addresses to pretend we are the remote
      self._SwapInterfaceAddress(
          tunnel.iface, new_addr=remote_inner, old_addr=local_inner)

      # Swap the packet's IP headers and write it back to the underlying
      # network.
      pkt = TunTwister.TwistPacket(pkt)
      read_sock, _ = _CreateReceiveSock(inner_version, _TEST_REMOTE_PORT)
      self.ReceivePacketOn(tunnel.underlying_netid, pkt)

      # Verify that the packet data and src are correct
      data, src = read_sock.recvfrom(4096)
      self.assertEqual(net_test.UDP_PAYLOAD, data)
      self.assertEqual((local_inner, src_port), src[:2])

      # Check that the interface statistics recorded the inbound packet
      self.assertReceivedPacket(tunnel, tunnel.in_sa)
    finally:
      # Release the fixed port explicitly so that the next bind to
      # _TEST_REMOTE_PORT does not depend on garbage collection.
      if read_sock is not None:
        read_sock.close()
      # Swap the interface addresses back to restore the original state
      self._SwapInterfaceAddress(
          tunnel.iface, new_addr=local_inner, old_addr=remote_inner)

  def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner,
                       sa_info=None):
    """Test ICMP error path over an IPsec interface."""
    if sa_info is None:
      sa_info = tunnel.out_sa
    # Now attempt to provoke an ICMP error.
    # TODO: deduplicate with multinetwork_test.py.
    dst_prefix, intermediate = {
        4: ("172.19.", "172.16.9.12"),
        6: ("2001:db8::", "2001:db8::1")
    }[tunnel.version]

    local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
                             _TEST_REMOTE_PORT)
    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
                                  sa_info.seq_num, None, tunnel.local,
                                  tunnel.remote)
    self.assertSentPacket(tunnel, sa_info)

    myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid)
    _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr,
                                         pkt)
    self.ReceivePacketOn(tunnel.underlying_netid, toobig)

    # Check that the packet too big reduced the MTU.
    routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None)
    self.assertEqual(1, len(routes))
    rtmsg, attributes = routes[0]
    self.assertEqual(iproute.RTN_UNICAST, rtmsg.type)
    self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])

    # Clear PMTU information so that future tests don't have to worry about it.
    self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid)

  def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner,
                                     remote_inner):
    """Test combined encryption path with ICMP errors over an IPsec tunnel"""
    self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
                                remote_inner)
    self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner)
    self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
                                remote_inner)

  def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt):
    """Bootstrap method to setup and run tests for the given parameters."""
    tunnel = self.randomTunnel(outer_version)

    try:
      # Some tests require that the out_seq_num and in_seq_num are the same
      # (Specifically encrypted tests), rebuild SAs to ensure seq_num is 1
      #
      # Until we get better scapy support, the only way we can build an
      # encrypted packet is to send it out, and read the packet from the wire.
      # We then generally use this as the "inbound" encrypted packet, injecting
      # it into the interface for which it is expected on.
      #
      # As such, this is required to ensure that encrypted packets (which we
      # currently have no way to easily modify) are not considered replay
      # attacks by the inbound SA.  (eg: received 3 packets, seq_num_in = 3,
      # sent only 1, seq_num_out = 1, inbound SA would consider this a replay
      # attack)
      tunnel.TeardownXfrm()
      tunnel.SetupXfrm(use_null_crypt)

      local_inner = tunnel.addrs[inner_version]
      remote_inner = _GetRemoteInnerAddress(inner_version)

      # Run the check twice on the same SAs.
      for _ in range(2):
        func(tunnel, inner_version, local_inner, remote_inner)
    finally:
      # Restore the default (non-null) algorithms for subsequent tests.
      if use_null_crypt:
        tunnel.TeardownXfrm()
        tunnel.SetupXfrm(False)

  def _CheckTunnelRekey(self, tunnel, inner_version, local_inner, remote_inner):
    """Checks input and output before, during and after a rekey."""
    old_out_sa = tunnel.out_sa
    old_in_sa = tunnel.in_sa

    # Check to make sure that both directions work before rekey
    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
                           old_in_sa)
    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
                            old_out_sa)

    # Rekey
    outer_family = net_test.GetAddressFamily(tunnel.version)

    # Create new SA
    # Distinguish the new SAs with new SPIs.
    new_out_sa = SaInfo(old_out_sa.spi + 1)
    new_in_sa = SaInfo(old_in_sa.spi + 1)

    # Perform Rekey
    tunnel.Rekey(outer_family, new_out_sa, new_in_sa)

    # Expect that the old SPI still works for inbound packets
    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
                           old_in_sa)

    # Test both paths with new SPIs, expect outbound to use new SPI
    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
                           new_in_sa)
    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
                            new_out_sa)

    # Delete old SAs
    tunnel.DeleteOldSaInfo(outer_family, old_in_sa.spi, old_out_sa.spi)

    # Test both paths with new SPIs; should still work
    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
                           new_in_sa)
    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
                            new_out_sa)

    # Expect failure upon trying to receive a packet with the deleted SPI
    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
                           old_in_sa, True)

  def _TestTunnelRekey(self, inner_version, outer_version):
    """Test rekeying a randomly chosen tunnel interface using null crypto."""
    tunnel = self.randomTunnel(outer_version)

    try:
      # Always use null_crypt, so we can check input and output separately
      tunnel.TeardownXfrm()
      tunnel.SetupXfrm(True)

      local_inner = tunnel.addrs[inner_version]
      remote_inner = _GetRemoteInnerAddress(inner_version)

      self._CheckTunnelRekey(tunnel, inner_version, local_inner, remote_inner)
    finally:
      # Restore the default (non-null) algorithms for subsequent tests.
      tunnel.TeardownXfrm()
      tunnel.SetupXfrm(False)


@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
class XfrmVtiTest(XfrmTunnelBase):

  INTERFACE_CLASS = VtiInterface

  def ParamTestVtiInput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True)

def ParamTestVtiOutput(self, inner_version, outer_version): 971 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 972 True) 973 974 def ParamTestVtiInOutEncrypted(self, inner_version, outer_version): 975 self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, 976 False) 977 978 def ParamTestVtiIcmp(self, inner_version, outer_version): 979 self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) 980 981 def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version): 982 self._TestTunnel(inner_version, outer_version, 983 self._CheckTunnelEncryptionWithIcmp, False) 984 985 def ParamTestVtiRekey(self, inner_version, outer_version): 986 self._TestTunnelRekey(inner_version, outer_version) 987 988 989@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") 990class XfrmInterfaceTest(XfrmTunnelBase): 991 992 INTERFACE_CLASS = XfrmInterface 993 994 def ParamTestXfrmIntfInput(self, inner_version, outer_version): 995 self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) 996 997 def ParamTestXfrmIntfOutput(self, inner_version, outer_version): 998 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 999 True) 1000 1001 def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version): 1002 self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, 1003 False) 1004 1005 def ParamTestXfrmIntfIcmp(self, inner_version, outer_version): 1006 self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) 1007 1008 def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version): 1009 self._TestTunnel(inner_version, outer_version, 1010 self._CheckTunnelEncryptionWithIcmp, False) 1011 1012 def ParamTestXfrmIntfRekey(self, inner_version, outer_version): 1013 self._TestTunnelRekey(inner_version, outer_version) 1014 1015@unittest.skipUnless(SUPPORTS_XFRM_MIGRATE, "XFRM migration unsupported") 1016class 
XfrmInterfaceMigrateTest(XfrmTunnelBase): 1017 # TODO: b/172497215 There is a kernel issue that XFRM_MIGRATE cannot work correctly 1018 # when there are multiple tunnels with the same selectors. Thus before this issue 1019 # is fixed, #allowMultipleTunnels must be overridden to avoid setting up multiple 1020 # tunnels. This need to be removed after the kernel issue is fixed. 1021 @classmethod 1022 def allowMultipleTunnels(cls): 1023 return False 1024 1025 def setUpTunnel(self, outer_version, use_null_crypt): 1026 underlying_netid = self.RandomNetid() 1027 netid = _BASE_TUNNEL_NETID[outer_version] + _TUNNEL_NETID_OFFSET 1028 iface = "ipsec%s" % netid 1029 ifindex = self.ifindices[underlying_netid] 1030 1031 local = self.MyAddress(outer_version, underlying_netid) 1032 remote = net_test.IPV4_ADDR if outer_version == 4 else net_test.IPV6_ADDR 1033 1034 tunnel = XfrmInterface(iface, netid, underlying_netid, ifindex, 1035 local, remote, outer_version, use_null_crypt) 1036 self._SetInboundMarking(netid, iface, True) 1037 self._SetupTunnelNetwork(tunnel, True) 1038 1039 return tunnel 1040 1041 def tearDownTunnel(self, tunnel): 1042 self._SetInboundMarking(tunnel.netid, tunnel.iface, False) 1043 self._SetupTunnelNetwork(tunnel, False) 1044 tunnel.Teardown() 1045 1046 def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt): 1047 try: 1048 tunnel = self.setUpTunnel(outer_version, use_null_crypt) 1049 1050 # Verify functionality before migration 1051 local_inner = tunnel.addrs[inner_version] 1052 remote_inner = _GetRemoteInnerAddress(inner_version) 1053 func(tunnel, inner_version, local_inner, remote_inner) 1054 1055 # Migrate tunnel 1056 # TODO:b/169170981 Add tests that migrate 4 -> 6 and 6 -> 4 1057 new_underlying_netid = self.RandomNetid(exclude=tunnel.underlying_netid) 1058 new_local = self.MyAddress(outer_version, new_underlying_netid) 1059 new_remote = net_test.IPV4_ADDR2 if outer_version == 4 else net_test.IPV6_ADDR2 1060 1061 
tunnel.Migrate(new_underlying_netid, new_local, new_remote) 1062 1063 # Verify functionality after migration 1064 func(tunnel, inner_version, local_inner, remote_inner) 1065 finally: 1066 self.tearDownTunnel(tunnel) 1067 1068 def ParamTestMigrateXfrmIntfInput(self, inner_version, outer_version): 1069 self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) 1070 1071 def ParamTestMigrateXfrmIntfOutput(self, inner_version, outer_version): 1072 self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, 1073 True) 1074 1075 def ParamTestMigrateXfrmIntfInOutEncrypted(self, inner_version, outer_version): 1076 self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, 1077 False) 1078 1079 def ParamTestMigrateXfrmIntfIcmp(self, inner_version, outer_version): 1080 self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) 1081 1082 def ParamTestMigrateXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version): 1083 self._TestTunnel(inner_version, outer_version, 1084 self._CheckTunnelEncryptionWithIcmp, False) 1085 1086 def ParamTestMigrateXfrmIntfRekey(self, inner_version, outer_version): 1087 self._TestTunnel(inner_version, outer_version, self._CheckTunnelRekey, 1088 True) 1089 1090if __name__ == "__main__": 1091 InjectTests() 1092 unittest.main() 1093