• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/python
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16
17# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import
18from errno import *  # pylint: disable=wildcard-import
19from socket import *  # pylint: disable=wildcard-import
20
21import random
22import itertools
23import struct
24import unittest
25
26from scapy import all as scapy
27from tun_twister import TunTwister
28import csocket
29import iproute
30import multinetwork_base
31import net_test
32import packets
33import util
34import xfrm
35import xfrm_base
36
# Interface index passed as the underlying link when creating the probe/test
# XFRM interface (presumably the loopback device, per the name).
_LOOPBACK_IFINDEX = 1
# Interface name shared by the XFRM interface and VTI add/delete tests.
_TEST_XFRM_IFNAME = "ipsec42"
# XFRM interface ID used for the probe/test interface; also added to the VTI
# keys when testAddVti updates an existing VTI.
_TEST_XFRM_IF_ID = 42
# SPI used by the migration-support probe and the explicit-selector tunnels.
_TEST_SPI = 0x1234
41
42# Does the kernel support xfrmi interfaces?
def HaveXfrmInterfaces():
  """Returns whether the running kernel supports XFRM interfaces.

  Kernels 4.19 and newer are assumed to support them. On older kernels,
  support is probed by creating and then deleting a test XFRM interface.

  Returns:
    True if XFRM interfaces are supported, False if the probe interface could
    not be created or deleted.

  Raises:
    AssertionError: if the probe interface still exists after being deleted.
  """
  if net_test.LINUX_VERSION >= (4, 19, 0):
    return True

  try:
    i = iproute.IPRoute()
    i.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID,
                          _LOOPBACK_IFINDEX)
    i.DeleteLink(_TEST_XFRM_IFNAME)
  except IOError:
    return False

  try:
    i.GetIfIndex(_TEST_XFRM_IFNAME)
  except IOError:
    # Expected: the lookup fails because the interface is gone.
    return True

  # The previous `assert "<message>"` asserted a non-empty string, which is
  # always true, so this failure could never be reported. Raise explicitly
  # (this also survives running with assertions disabled).
  raise AssertionError(
      "Deleted interface %s still exists!" % _TEST_XFRM_IFNAME)
60
# Evaluated once when this module is imported; used to skip tests that need
# XFRM interfaces and as a prerequisite of the XFRM_MIGRATE probe.
HAVE_XFRM_INTERFACES = HaveXfrmInterfaces()
62
63# Does the kernel support CONFIG_XFRM_MIGRATE?
def SupportsXfrmMigrate():
  """Returns whether the kernel appears to support XFRM migration.

  Kernels 5.10 and newer are assumed to support it. Otherwise a migration of a
  non-existent SA is attempted and the resulting errno is inspected.

  Returns:
    False if XFRM interfaces are unavailable or the probe fails with
    ENOPROTOOPT; True in every other case (including unexpected outcomes,
    which are reported on stdout).
  """
  if net_test.LINUX_VERSION >= (5, 10, 0):
    return True

  # XFRM_MIGRATE depends on xfrmi interfaces
  if not HAVE_XFRM_INTERFACES:
    return False

  try:
    netlink = xfrm.Xfrm()
    any_addr = net_test.GetWildcardAddress(6)
    empty_selector = xfrm.EmptySelector(AF_INET6)

    # No SA was created for this SPI, so the migration is expected to be
    # rejected with EINVAL when the feature is present.
    netlink.MigrateTunnel(xfrm.XFRM_POLICY_OUT, empty_selector,
                          any_addr, any_addr, any_addr, any_addr,
                          _TEST_SPI, None, None, None, None, None, None)
    print("Migration succeeded unexpectedly, assuming XFRM_MIGRATE is enabled")
    return True
  except IOError as err:
    if err.errno == ENOPROTOOPT:
      return False
    if err.errno != EINVAL:
      print("Unexpected error, assuming XFRM_MIGRATE is enabled:", err.errno)
    return True
92
# Evaluated once when this module is imported (may issue a probe netlink
# request via SupportsXfrmMigrate).
SUPPORTS_XFRM_MIGRATE = SupportsXfrmMigrate()
94
# Parameters to setup tunnels as special networks
_TUNNEL_NETID_OFFSET = 0xFC00  # Matches reserved netid range for IpSecService
# Per-IP-version base netid; a tunnel's netid is this base plus
# _TUNNEL_NETID_OFFSET plus the index of its underlying network.
_BASE_TUNNEL_NETID = {4: 40, 6: 60}
_BASE_VTI_OKEY = 2000000100
_BASE_VTI_IKEY = 2000000200

# The inbound and outbound test SAs deliberately share the same SPI value.
_TEST_OUT_SPI = _TEST_SPI
_TEST_IN_SPI = _TEST_OUT_SPI

# Base output/input keys for VTIs; VtiInterface adds its netid to these.
_TEST_OKEY = 2000000100
_TEST_IKEY = 2000000200

# UDP port used as the remote end of the inner test traffic.
_TEST_REMOTE_PORT = 1234

# Maps an IP version (4 or 6) to the scapy class for that IP header.
_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6}
110
111
112def _GetLocalInnerAddress(version):
113  return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version]
114
115
116def _GetRemoteInnerAddress(version):
117  return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version]
118
119
def _GetRemoteOuterAddress(version):
  """Returns the remote outer (tunnel endpoint) address for IP version 4 or 6.

  Raises:
    KeyError: if version is neither 4 nor 6.
  """
  remote_outer_by_version = {
      4: net_test.IPV4_ADDR,
      6: net_test.IPV6_ADDR,
  }
  return remote_outer_by_version[version]
122
123
def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer,
                                   src_port, dst_inner, dst_outer,
                                   dst_port, spi, seq_num, ip_hdr_options=None):
  """Builds a tunnel-mode ESP packet wrapping a UDP test payload.

  The inner packet is an IP/UDP packet carrying net_test.UDP_PAYLOAD; it is
  wrapped by xfrm_base.EncryptPacketWithNull.

  Args:
    inner_version: 4 or 6, the IP version of the inner packet.
    src_inner: Source address of the inner packet.
    src_outer: Source address of the outer packet.
    src_port: UDP source port of the inner packet.
    dst_inner: Destination address of the inner packet.
    dst_outer: Destination address of the outer packet.
    dst_port: UDP destination port of the inner packet.
    spi: SPI passed to EncryptPacketWithNull.
    seq_num: Sequence number passed to EncryptPacketWithNull.
    ip_hdr_options: Optional dict of extra keyword arguments for the inner IP
      header. The dict is not modified; 'src' and 'dst' entries are overridden
      by src_inner and dst_inner.

  Returns:
    The packet returned by xfrm_base.EncryptPacketWithNull.
  """
  # Work on a copy: updating the argument in place would leak the 'src' and
  # 'dst' entries into a dict owned by the caller.
  ip_hdr_options = dict(ip_hdr_options) if ip_hdr_options else {}
  ip_hdr_options.update({'src': src_inner, 'dst': dst_inner})

  # Build and receive an ESP packet destined for the inner socket
  IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version]
  input_pkt = (
      IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) /
      net_test.UDP_PAYLOAD)
  input_pkt = IpType(str(input_pkt))  # Compute length, checksum.
  input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num,
                                              (src_outer, dst_outer))

  return input_pkt
142
143
def _CreateReceiveSock(version, port=0):
  """Creates a UDP socket bound to the wildcard address.

  Args:
    version: 4 or 6, the IP version of the socket.
    port: Port to bind to; the default of 0 is passed to bind() unchanged.

  Returns:
    A (socket, local_port) tuple, where local_port is the port reported by
    getsockname() after binding.
  """
  family = net_test.GetAddressFamily(version)
  bind_address = (net_test.GetWildcardAddress(version), port)

  sock = socket(family, SOCK_DGRAM, 0)
  sock.bind(bind_address)

  # A receive timeout keeps a test from hanging if the packet never arrives.
  csocket.SetSocketTimeout(sock, 500)

  # getsockname() returns the port as its second element for both families.
  return sock, sock.getsockname()[1]
154
155
def _SendPacket(testInstance, netid, version, remote, remote_port):
  """Sends the UDP test payload to (remote, remote_port) via the given netid.

  Args:
    testInstance: Test object providing SelectInterface().
    netid: Network to select, using the "mark" selection mode.
    version: 4 or 6, the IP version of the sending socket.
    remote: Destination address.
    remote_port: Destination UDP port.

  Returns:
    The local port of the sending socket.
  """
  family = net_test.GetAddressFamily(version)
  sock = socket(family, SOCK_DGRAM, 0)
  testInstance.SelectInterface(sock, netid, "mark")

  destination = (remote, remote_port)
  sock.sendto(net_test.UDP_PAYLOAD, destination)

  # The socket is never bound explicitly, so the local port is only read
  # after the packet has been sent.
  return sock.getsockname()[1]
165
166
def InjectTests():
  """Injects the parameterized test methods into each tunnel test class."""
  parameterized_classes = (
      XfrmTunnelTest,
      XfrmInterfaceTest,
      XfrmVtiTest,
      XfrmInterfaceMigrateTest,
  )
  for test_class in parameterized_classes:
    InjectParameterizedTests(test_class)
172
173
def InjectParameterizedTests(cls):
  """Injects tests into cls for every (IP version, IP version) pair.

  Args:
    cls: The test class handed to util.InjectParameterizedTest.
  """
  ip_versions = (4, 6)

  def NameGenerator(*args):
    # args is the pair of IP versions for one parameter combination.
    return "IPv%d_in_IPv%d" % args

  # All four ordered pairs: (4, 4), (4, 6), (6, 4), (6, 6).
  version_pairs = itertools.product(ip_versions, repeat=2)
  util.InjectParameterizedTest(cls, version_pairs, NameGenerator)
182
183
class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
  """Tests unidirectional XFRM tunnels that use explicit selectors."""

  def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid,
                         netid, local_inner, remote_inner, local_outer,
                         remote_outer, write_sock):
    """Sends a UDP packet and expects an ESP packet on the underlying network.

    Only underlying_netid, remote_inner, local_outer, remote_outer and
    write_sock are used; the other arguments keep the signature identical to
    _CheckTunnelInput so that either method can be passed to _TestTunnel.
    """
    destination = (remote_inner, 53)
    write_sock.sendto(net_test.UDP_PAYLOAD, destination)

    self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
                            local_outer, remote_outer)

  def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid,
                        netid, local_inner, remote_inner, local_outer,
                        remote_outer, read_sock):
    """Injects a null-crypt ESP packet and checks it arrives on read_sock."""
    # getsockname() returns the port as its second element for both families.
    local_port = read_sock.getsockname()[1]

    # Build an ESP packet addressed to the inner socket and inject it on the
    # underlying network.
    esp_pkt = _GetNullAuthCryptTunnelModePkt(
        inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT,
        local_inner, local_outer, local_port, _TEST_IN_SPI, 1)
    self.ReceivePacketOn(underlying_netid, esp_pkt)

    # The socket must see the plaintext payload, sent from the remote inner
    # address and port.
    payload, sender = read_sock.recvfrom(4096)
    self.assertEqual(net_test.UDP_PAYLOAD, payload)
    self.assertEqual((remote_inner, _TEST_REMOTE_PORT), sender[:2])

  def _TestTunnel(self, inner_version, outer_version, func, direction,
                  test_output_mark_unset):
    """Test a unidirectional XFRM Tunnel with explicit selectors.

    Args:
      inner_version: 4 or 6, the IP version of the inner packets.
      outer_version: 4 or 6, the IP version of the outer packets.
      func: The check to run: _CheckTunnelInput or _CheckTunnelOutput.
      direction: xfrm.XFRM_POLICY_OUT to hand func the sending socket; any
        other value hands it the receiving socket.
      test_output_mark_unset: If True, the outbound SA gets no output mark and
        the underlying network is made the default network for the duration
        of the test.
    """
    # The underlying netid represents the external interface from/to which
    # ESP packets are routed.
    u_netid = self.RandomNetid()
    # A different, random netid originates the plaintext traffic locally.
    netid = self.RandomNetid(exclude=u_netid)

    local_inner = self.MyAddress(inner_version, netid)
    remote_inner = _GetRemoteInnerAddress(inner_version)
    local_outer = self.MyAddress(outer_version, u_netid)
    remote_outer = _GetRemoteOuterAddress(outer_version)

    if test_output_mark_unset:
      output_mark = None
      self.SetDefaultNetwork(u_netid)
    else:
      output_mark = u_netid

    try:
      # Create policies, SAs and sockets for both directions to simulate a
      # more realistic environment.
      inbound_selector = xfrm.SrcDstSelector(remote_inner, local_inner)
      self.xfrm.CreateTunnel(
          xfrm.XFRM_POLICY_IN, inbound_selector, remote_outer, local_outer,
          _TEST_IN_SPI, xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL,
          None, None, None, xfrm.MATCH_METHOD_ALL)

      outbound_selector = xfrm.SrcDstSelector(local_inner, remote_inner)
      self.xfrm.CreateTunnel(
          xfrm.XFRM_POLICY_OUT, outbound_selector, local_outer, remote_outer,
          _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256,
          xfrm_base._ALGO_HMAC_SHA1, None, output_mark, None,
          xfrm.MATCH_METHOD_ALL)

      inner_family = net_test.GetAddressFamily(inner_version)
      write_sock = socket(inner_family, SOCK_DGRAM, 0)
      self.SelectInterface(write_sock, netid, "mark")
      read_sock, _ = _CreateReceiveSock(inner_version)

      if direction == xfrm.XFRM_POLICY_OUT:
        sock = write_sock
      else:
        sock = read_sock
      func(inner_version, outer_version, u_netid, netid, local_inner,
           remote_inner, local_outer, remote_outer, sock)
    finally:
      if test_output_mark_unset:
        self.ClearDefaultNetwork()

  def ParamTestTunnelInput(self, inner_version, outer_version):
    """Checks the input path of an explicit-selector tunnel."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput,
                     xfrm.XFRM_POLICY_IN, False)

  def ParamTestTunnelOutput(self, inner_version, outer_version):
    """Checks the output path with the output mark set on the SA."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     xfrm.XFRM_POLICY_OUT, False)

  def ParamTestTunnelOutputNoSetMark(self, inner_version, outer_version):
    """Checks the output path without an output mark on the SA."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     xfrm.XFRM_POLICY_OUT, True)
267
268
@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
  """Tests creating, updating and deleting a Virtual Tunnel Interface."""

  def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr,
                         ikey, okey):
    """Asserts that the VTI attributes match the expected keys and addresses.

    Args:
      vti_info_data: Mapping of IFLA_VTI_* attribute names to values.
      version: 4 or 6, the IP version of the tunnel endpoints.
      local_addr: Expected local address, as a string.
      remote_addr: Expected remote address, as a string.
      ikey: Expected input key.
      okey: Expected output key.
    """
    self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey)
    self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey)

    if version == 4:
      family = AF_INET
    else:
      family = AF_INET6

    # The addresses are stored in packed form; compare their text form.
    expected_addrs = (
        ("IFLA_VTI_LOCAL", local_addr),
        ("IFLA_VTI_REMOTE", remote_addr),
    )
    for attr_name, expected in expected_addrs:
      self.assertEqual(inet_ntop(family, vti_info_data[attr_name]), expected)

  def testAddVti(self):
    """Test the creation of a Virtual Tunnel Interface."""
    updated_remotes = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2}
    updated_okey = _TEST_OKEY + _TEST_XFRM_IF_ID
    updated_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID

    for version in (4, 6):
      netid = self.RandomNetid()
      local_addr = self.MyAddress(version, netid)
      remote_addr = _GetRemoteOuterAddress(version)

      # Create the interface and check what the kernel reports for it.
      self.iproute.CreateVirtualTunnelInterface(
          dev_name=_TEST_XFRM_IFNAME,
          local_addr=local_addr,
          remote_addr=remote_addr,
          o_key=_TEST_OKEY,
          i_key=_TEST_IKEY)
      info = self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME)
      self._VerifyVtiInfoData(info, version, local_addr, remote_addr,
                              _TEST_IKEY, _TEST_OKEY)

      # Update the remote address and both keys in place, then re-check.
      updated_remote = updated_remotes[version]
      self.iproute.CreateVirtualTunnelInterface(
          dev_name=_TEST_XFRM_IFNAME,
          local_addr=local_addr,
          remote_addr=updated_remote,
          o_key=updated_okey,
          i_key=updated_ikey,
          is_update=True)
      info = self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME)
      self._VerifyVtiInfoData(info, version, local_addr, updated_remote,
                              updated_ikey, updated_okey)

      # Validate that the netlink interface matches the ioctl interface.
      netlink_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
      ioctl_index = net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME)
      self.assertEqual(ioctl_index, netlink_index)

      # After deletion, looking the interface up must fail.
      self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
      self.assertRaises(IOError, self.iproute.GetIfIndex, _TEST_XFRM_IFNAME)

  def _QuietDeleteLink(self, ifname):
    """Deletes the named link, ignoring the IOError raised if it is absent."""
    try:
      self.iproute.DeleteLink(ifname)
    except IOError:
      # The link was not present.
      pass

  def tearDown(self):
    """Runs the base teardown, then removes any leftover test interface."""
    super(XfrmAddDeleteVtiTest, self).tearDown()
    self._QuietDeleteLink(_TEST_XFRM_IFNAME)
330
331
class SaInfo(object):
  """Holds the SPI and the current sequence number of one SA."""

  def __init__(self, spi):
    """Records the SPI and starts the sequence number at 1."""
    self.seq_num = 1
    self.spi = spi
337
338
class IpSecBaseInterface(object):
  """Common state and operations for the IPsec tunnel interfaces under test.

  Subclasses provide TeardownXfrm, _SetupXfrmByType and _Rekey.
  """

  def __init__(self, iface, netid, underlying_netid, local, remote, version):
    """Stores the tunnel parameters and opens the netlink helpers.

    Args:
      iface: Name of the tunnel interface.
      netid: Netid assigned to the tunnel.
      underlying_netid: Netid of the network carrying the ESP packets.
      local: Local outer address.
      remote: Remote outer address.
      version: IP version of the local and remote addresses.
    """
    self.iface = iface
    self.netid = netid
    self.underlying_netid = underlying_netid
    self.local = local
    self.remote = remote

    # XFRM interfaces technically do not have a version. This keeps track of
    # the IP version of the local and remote addresses.
    self.version = version

    # Packet counters expected on the interface, and its inner addresses.
    self.rx = 0
    self.tx = 0
    self.addrs = {}

    self.iproute = iproute.IPRoute()
    self.xfrm = xfrm.Xfrm()

  def Teardown(self):
    """Removes the XFRM state first, then the interface itself."""
    self.TeardownXfrm()
    self.TeardownInterface()

  def TeardownInterface(self):
    """Deletes the tunnel interface."""
    self.iproute.DeleteLink(self.iface)

  def SetupXfrm(self, use_null_crypt):
    """Picks a random SPI and algorithms, then creates the XFRM state.

    Args:
      use_null_crypt: If True, use null authentication and encryption;
        otherwise use HMAC-SHA1 and AES-CBC-256.
    """
    # Both directions share one randomly chosen SPI.
    spi = random.randint(0, 0x7fffffff)
    self.in_sa = SaInfo(spi)
    self.out_sa = SaInfo(spi)

    if use_null_crypt:
      self.auth = xfrm_base._ALGO_AUTH_NULL
      self.crypt = xfrm_base._ALGO_CRYPT_NULL
    else:
      self.auth = xfrm_base._ALGO_HMAC_SHA1
      self.crypt = xfrm_base._ALGO_CBC_AES_256

    self._SetupXfrmByType(self.auth, self.crypt)

  def Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Rekeys the tunnel and records the new SAs on this object.

    The subclass hook creates the new SAs and updates the outbound policy;
    the new SaInfo objects are stored only after that hook has returned.

    Args:
      outer_family: AF_INET or AF_INET6
      new_out_sa: An SaInfo struct representing the new outbound SA's info
      new_in_sa: An SaInfo struct representing the new inbound SA's info
    """
    self._Rekey(outer_family, new_out_sa, new_in_sa)

    self.in_sa = new_in_sa
    self.out_sa = new_out_sa

  def TeardownXfrm(self):
    """Removes the tunnel's XFRM state; must be provided by subclasses."""
    raise NotImplementedError("Subclasses should implement this")

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    """Creates the tunnel's XFRM state; must be provided by subclasses."""
    raise NotImplementedError("Subclasses should implement this")

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Performs the type-specific rekey; must be provided by subclasses."""
    raise NotImplementedError("Subclasses should implement this")
403
404
class VtiInterface(IpSecBaseInterface):
  """A VTI-backed tunnel whose policies and SAs are matched by mark."""

  def __init__(self, iface, netid, underlying_netid, _, local, remote, version):
    """Creates the VTI and its XFRM state (HMAC-SHA1 / AES-CBC-256).

    Args:
      iface: Name of the VTI.
      netid: Netid of the tunnel; also added to the base keys.
      underlying_netid: Netid of the network carrying the ESP packets.
      _: Unused positional argument.
      local: Local outer address.
      remote: Remote outer address.
      version: IP version of the outer addresses.
    """
    super(VtiInterface, self).__init__(iface, netid, underlying_netid, local,
                                       remote, version)

    # Derive per-tunnel keys from the netid.
    self.okey = _TEST_OKEY + netid
    self.ikey = _TEST_IKEY + netid

    self.SetupInterface()
    self.SetupXfrm(False)

  def SetupInterface(self):
    """Creates the VTI and returns the result of the iproute call."""
    return self.iproute.CreateVirtualTunnelInterface(
        self.iface, self.local, self.remote, self.ikey, self.okey)

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    """Creates the outbound and inbound tunnels, matched by okey/ikey."""
    # For the VTI, the selectors are wildcard since packets will only
    # be selected if they have the appropriate mark, hence the inner
    # addresses are wildcard.
    out_mark = xfrm.ExactMatchMark(self.okey)
    self.xfrm.CreateTunnel(
        xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, self.out_sa.spi,
        crypt_algo, auth_algo, out_mark, self.underlying_netid, None,
        xfrm.MATCH_METHOD_ALL)

    in_mark = xfrm.ExactMatchMark(self.ikey)
    self.xfrm.CreateTunnel(
        xfrm.XFRM_POLICY_IN, None, self.remote, self.local, self.in_sa.spi,
        crypt_algo, auth_algo, in_mark, None, None, xfrm.MATCH_METHOD_MARK)

  def TeardownXfrm(self):
    """Deletes the outbound tunnel, then the inbound tunnel."""
    teardown_order = (
        (xfrm.XFRM_POLICY_OUT, self.remote, self.out_sa.spi, self.okey),
        (xfrm.XFRM_POLICY_IN, self.local, self.in_sa.spi, self.ikey),
    )
    for direction, dst, spi, key in teardown_order:
      self.xfrm.DeleteTunnel(direction, None, dst, spi, key, None)

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Adds new null-crypt SAs and points the outbound policies at them."""
    # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
    #       the same, but rekeys are asymmetric, and only update the outbound
    #       policy.
    null_crypt = xfrm_base._ALGO_CRYPT_NULL
    null_auth = xfrm_base._ALGO_AUTH_NULL

    self.xfrm.AddSaInfo(
        self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, xfrm.ExactMatchMark(self.okey),
        self.underlying_netid)

    self.xfrm.AddSaInfo(
        self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, xfrm.ExactMatchMark(self.ikey),
        None)

    # Create new policies for IPv4 and IPv6.
    selectors = [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]
    for selector in selectors:
      # Add SPI-specific output policy to enforce using new outbound SPI
      out_policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, selector)
      template = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
                                   (self.local, self.remote))
      self.xfrm.UpdatePolicyInfo(out_policy, template,
                                 xfrm.ExactMatchMark(self.okey), 0)

  def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
    """Deletes the ESP SAs for the given old inbound and outbound SPIs.

    Args:
      outer_family: Unused.
      old_in_spi: SPI of the inbound SA to delete.
      old_out_spi: SPI of the outbound SA to delete.
    """
    in_mark = xfrm.ExactMatchMark(self.ikey)
    self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, in_mark)

    out_mark = xfrm.ExactMatchMark(self.okey)
    self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, out_mark)
469
470
@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest):
  """Test the creation of an XFRM Interface."""

  def testAddXfrmInterface(self):
    """Creates an XFRM interface, brings it up, then deletes it."""
    ifname = _TEST_XFRM_IFNAME

    self.iproute.CreateXfrmInterface(ifname, _TEST_XFRM_IF_ID,
                                     _LOOPBACK_IFINDEX)
    netlink_index = self.iproute.GetIfIndex(ifname)
    net_test.SetInterfaceUp(ifname)

    # Validate that the netlink interface matches the ioctl interface.
    ioctl_index = net_test.GetInterfaceIndex(ifname)
    self.assertEqual(ioctl_index, netlink_index)

    # Once deleted, looking the interface up must fail.
    self.iproute.DeleteLink(ifname)
    self.assertRaises(IOError, self.iproute.GetIfIndex, ifname)
486
487
class XfrmInterface(IpSecBaseInterface):
  """An XFRM-interface-backed tunnel matched by XFRM interface ID."""

  def __init__(self, iface, netid, underlying_netid, ifindex, local, remote,
               version, use_null_crypt=False):
    """Creates the XFRM interface and its XFRM state.

    Args:
      iface: Name of the XFRM interface.
      netid: Netid of the tunnel; also used as the XFRM interface ID.
      underlying_netid: Netid of the network carrying the ESP packets.
      ifindex: Interface index handed to CreateXfrmInterface.
      local: Local outer address.
      remote: Remote outer address.
      version: IP version of the outer addresses.
      use_null_crypt: If True, use null authentication and encryption.
    """
    super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local,
                                        remote, version)

    self.xfrm_if_id = netid
    self.ifindex = ifindex

    self.SetupInterface()
    self.SetupXfrm(use_null_crypt)

  def SetupInterface(self):
    """Create an XFRM interface."""
    return self.iproute.CreateXfrmInterface(
        self.iface, self.netid, self.ifindex)

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    """Creates the outbound and inbound tunnels, tied to the interface ID."""
    self.xfrm.CreateTunnel(
        xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, self.out_sa.spi,
        crypt_algo, auth_algo, None, self.underlying_netid, self.xfrm_if_id,
        xfrm.MATCH_METHOD_ALL)
    self.xfrm.CreateTunnel(
        xfrm.XFRM_POLICY_IN, None, self.remote, self.local, self.in_sa.spi,
        crypt_algo, auth_algo, None, None, self.xfrm_if_id,
        xfrm.MATCH_METHOD_IFID)

  def TeardownXfrm(self):
    """Deletes the outbound tunnel, then the inbound tunnel."""
    teardown_order = (
        (xfrm.XFRM_POLICY_OUT, self.remote, self.out_sa.spi),
        (xfrm.XFRM_POLICY_IN, self.local, self.in_sa.spi),
    )
    for direction, dst, spi in teardown_order:
      self.xfrm.DeleteTunnel(direction, None, dst, spi, None, self.xfrm_if_id)

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Adds new null-crypt SAs and points the outbound policies at them."""
    # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
    #       the same, but rekeys are asymmetric, and only update the outbound
    #       policy.
    null_crypt = xfrm_base._ALGO_CRYPT_NULL
    null_auth = xfrm_base._ALGO_AUTH_NULL

    self.xfrm.AddSaInfo(
        self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, None, self.underlying_netid,
        xfrm_if_id=self.xfrm_if_id)

    self.xfrm.AddSaInfo(
        self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, None, None,
        xfrm_if_id=self.xfrm_if_id)

    # Create new policies for IPv4 and IPv6.
    selectors = [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]
    for selector in selectors:
      # Add SPI-specific output policy to enforce using new outbound SPI
      out_policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, selector)
      template = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
                                   (self.local, self.remote))
      self.xfrm.UpdatePolicyInfo(out_policy, template, None, self.xfrm_if_id)

  def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
    """Deletes the ESP SAs for the given old inbound and outbound SPIs.

    Args:
      outer_family: Unused.
      old_in_spi: SPI of the inbound SA to delete.
      old_out_spi: SPI of the outbound SA to delete.
    """
    old_sas = ((self.local, old_in_spi), (self.remote, old_out_spi))
    for dst, spi in old_sas:
      self.xfrm.DeleteSaInfo(dst, spi, IPPROTO_ESP, None, self.xfrm_if_id)

  def Migrate(self, new_underlying_netid, new_local, new_remote):
    """Migrates both tunnel directions to new outer addresses.

    The inbound direction is migrated first, then the outbound direction;
    the stored addresses and underlying netid are updated only afterwards.

    Args:
      new_underlying_netid: Netid of the new underlying network.
      new_local: New local outer address.
      new_remote: New remote outer address.
    """
    migrations = (
        (xfrm.XFRM_POLICY_IN, self.remote, self.local, new_remote, new_local,
         self.in_sa),
        (xfrm.XFRM_POLICY_OUT, self.local, self.remote, new_local, new_remote,
         self.out_sa),
    )
    for direction, old_src, old_dst, new_src, new_dst, sa in migrations:
      self.xfrm.MigrateTunnel(direction, None, old_src, old_dst,
                              new_src, new_dst, sa.spi,
                              self.crypt, self.auth, None, None,
                              new_underlying_netid, self.xfrm_if_id)

    self.local = new_local
    self.remote = new_remote
    self.underlying_netid = new_underlying_netid
562
563
564class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
565
566  # Subclass that does not allow multiple tunnels (e.g. XfrmInterfaceMigrateTest)
567  # should override this method.
  @classmethod
  def allowMultipleTunnels(cls):
    """Returns whether setUpClass should create the per-network tunnels.

    Returns:
      True. When an override returns False, setUpClass returns before
      creating any tunnel.
    """
    return True
571
572  @classmethod
573  def setUpClass(cls):
574    xfrm_base.XfrmBaseTest.setUpClass()
575    # Tunnel interfaces use marks extensively, so configure realistic packet
576    # marking rules to make the test representative, make PMTUD work, etc.
577    cls.SetInboundMarks(True)
578    cls.SetMarkReflectSysctls(1)
579
580    # Group by tunnel version to ensure that we test at least one IPv4 and one
581    # IPv6 tunnel
582    cls.tunnelsV4 = {}
583    cls.tunnelsV6 = {}
584
585    if not cls.allowMultipleTunnels():
586      return
587
588    for i, underlying_netid in enumerate(cls.tuns):
589      for version in 4, 6:
590        netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i
591        iface = "ipsec%s" % netid
592        local = cls.MyAddress(version, underlying_netid)
593        if version == 4:
594          remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2)
595        else:
596          remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2)
597
598        ifindex = cls.ifindices[underlying_netid]
599        tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex,
600                                   local, remote, version)
601        cls._SetInboundMarking(netid, iface, True)
602        cls._SetupTunnelNetwork(tunnel, True)
603
604        if version == 4:
605          cls.tunnelsV4[netid] = tunnel
606        else:
607          cls.tunnelsV6[netid] = tunnel
608
609  @classmethod
610  def tearDownClass(cls):
611    # The sysctls are restored by MultinetworkBaseTest.tearDownClass.
612    cls.SetInboundMarks(False)
613    for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()):
614      cls._SetInboundMarking(tunnel.netid, tunnel.iface, False)
615      cls._SetupTunnelNetwork(tunnel, False)
616      tunnel.Teardown()
617    xfrm_base.XfrmBaseTest.tearDownClass()
618
619  def randomTunnel(self, outer_version):
620    version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6
621    return random.choice(list(version_dict.values()))
622
  def setUp(self):
    """Runs the multinetwork setUp and creates fresh netlink helpers."""
    # MultiNetworkBaseTest.setUp is called by name rather than through
    # super(); NOTE(review): presumably to skip the setUp of the intermediate
    # XfrmBaseTest class - confirm against xfrm_base.
    multinetwork_base.MultiNetworkBaseTest.setUp(self)
    self.iproute = iproute.IPRoute()
    self.xfrm = xfrm.Xfrm()
627
  def tearDown(self):
    """Runs only the multinetwork tearDown, called by name like setUp."""
    multinetwork_base.MultiNetworkBaseTest.tearDown(self)
630
631  def _SwapInterfaceAddress(self, ifname, old_addr, new_addr):
632    """Exchange two addresses on a given interface.
633
634    Args:
635      ifname: Name of the interface
636      old_addr: An address to be removed from the interface
637      new_addr: An address to be added to an interface
638    """
639    version = 6 if ":" in new_addr else 4
640    ifindex = net_test.GetInterfaceIndex(ifname)
641    self.iproute.AddAddress(new_addr,
642                            net_test.AddressLengthBits(version), ifindex)
643    self.iproute.DelAddress(old_addr,
644                            net_test.AddressLengthBits(version), ifindex)
645
646  @classmethod
647  def _GetLocalAddress(cls, version, netid):
648    if version == 4:
649      return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET)
650    else:
651      return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1"
652
653  @classmethod
654  def _SetupTunnelNetwork(cls, tunnel, is_add):
655    """Setup rules and routes for a tunnel Network.
656
657    Takes an interface and depending on the boolean
658    value of is_add, either adds or removes the rules
659    and routes for a tunnel interface to behave like an
660    Android Network for purposes of testing.
661
662    Args:
663      tunnel: A VtiInterface or XfrmInterface, the tunnel to set up.
664      is_add: Boolean that causes this method to perform setup if True or
665        teardown if False
666    """
667    if is_add:
668      # Disable router solicitations to avoid occasional spurious packets
669      # arriving on the underlying network; there are two possible behaviors
670      # when that occurred: either only the RA packet is read, and when it
671      # is echoed back to the tunnel, it causes the test to fail by not
672      # receiving # the UDP_PAYLOAD; or, two packets may arrive on the
673      # underlying # network which fails the assertion that only one ESP packet
674      # is received.
675      cls.SetSysctl(
676          "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0)
677      net_test.SetInterfaceUp(tunnel.iface)
678
679    for version in [4, 6]:
680      ifindex = net_test.GetInterfaceIndex(tunnel.iface)
681      table = tunnel.netid
682
683      # Set up routing rules.
684      start, end = cls.UidRangeForNetid(tunnel.netid)
685      cls.iproute.UidRangeRule(version, is_add, start, end, table,
686                                cls.PRIORITY_UID)
687      cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF)
688      cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK,
689                              table, cls.PRIORITY_FWMARK)
690
691      # Configure IP addresses.
692      addr = cls._GetLocalAddress(version, tunnel.netid)
693      prefixlen = net_test.AddressLengthBits(version)
694      tunnel.addrs[version] = addr
695      if is_add:
696        cls.iproute.AddAddress(addr, prefixlen, ifindex)
697        cls.iproute.AddRoute(version, table, "default", 0, None, ifindex)
698      else:
699        cls.iproute.DelRoute(version, table, "default", 0, None, ifindex)
700        cls.iproute.DelAddress(addr, prefixlen, ifindex)
701
702  def assertReceivedPacket(self, tunnel, sa_info):
703    tunnel.rx += 1
704    self.assertEqual((tunnel.rx, tunnel.tx),
705                      self.iproute.GetRxTxPackets(tunnel.iface))
706    sa_info.seq_num += 1
707
708  def assertSentPacket(self, tunnel, sa_info):
709    tunnel.tx += 1
710    self.assertEqual((tunnel.rx, tunnel.tx),
711                      self.iproute.GetRxTxPackets(tunnel.iface))
712    sa_info.seq_num += 1
713
714  def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner,
715                        sa_info=None, expect_fail=False):
716    """Test null-crypt input path over an IPsec interface."""
717    if sa_info is None:
718      sa_info = tunnel.in_sa
719    read_sock, local_port = _CreateReceiveSock(inner_version)
720
721    input_pkt = _GetNullAuthCryptTunnelModePkt(
722        inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT,
723        local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num)
724    self.ReceivePacketOn(tunnel.underlying_netid, input_pkt)
725
726    if expect_fail:
727      self.assertRaisesErrno(EAGAIN, read_sock.recv, 4096)
728    else:
729      # Verify that the packet data and src are correct
730      data, src = read_sock.recvfrom(4096)
731      self.assertReceivedPacket(tunnel, sa_info)
732      self.assertEqual(net_test.UDP_PAYLOAD, data)
733      self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])
734
735  def _CheckTunnelOutput(self, tunnel, inner_version, local_inner,
736                         remote_inner, sa_info=None):
737    """Test null-crypt output path over an IPsec interface."""
738    if sa_info is None:
739      sa_info = tunnel.out_sa
740    local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
741                             _TEST_REMOTE_PORT)
742
743    # Read a tunneled IP packet on the underlying (outbound) network
744    # verifying that it is an ESP packet.
745    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
746                                  sa_info.seq_num, None, tunnel.local,
747                                  tunnel.remote)
748
749    # Get and update the IP headers on the inner payload so that we can do a simple
750    # comparison of byte data. Unfortunately, due to the scapy version this runs on,
751    # we cannot parse past the ESP header to the inner IP header, and thus have to
752    # workaround in this manner
753    if inner_version == 4:
754      ip_hdr_options = {
755        'id': scapy.IP(str(pkt.payload)[8:]).id,
756        'flags': scapy.IP(str(pkt.payload)[8:]).flags
757      }
758    else:
759      ip_hdr_options = {'fl': scapy.IPv6(str(pkt.payload)[8:]).fl}
760
761    expected = _GetNullAuthCryptTunnelModePkt(
762        inner_version, local_inner, tunnel.local, local_port, remote_inner,
763        tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num,
764        ip_hdr_options)
765
766    # Check outer header manually (Avoids having to overwrite outer header's
767    # id, flags or flow label)
768    self.assertSentPacket(tunnel, sa_info)
769    self.assertEqual(expected.src, pkt.src)
770    self.assertEqual(expected.dst, pkt.dst)
771    self.assertEqual(len(expected), len(pkt))
772
773    # Check everything else
774    self.assertEqual(str(expected.payload), str(pkt.payload))
775
776  def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner,
777                             remote_inner):
778    """Test both input and output paths over an encrypted IPsec interface.
779
780    This tests specifically makes sure that the both encryption and decryption
781    work together, as opposed to the _CheckTunnel(Input|Output) where the
782    input and output paths are tested separately, and using null encryption.
783    """
784    src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
785                           _TEST_REMOTE_PORT)
786
787    # Make sure it appeared on the underlying interface
788    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi,
789                                  tunnel.out_sa.seq_num, None, tunnel.local,
790                                  tunnel.remote)
791
792    # Check that packet is not sent in plaintext
793    self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt))
794
795    # Check src/dst
796    self.assertEqual(tunnel.local, pkt.src)
797    self.assertEqual(tunnel.remote, pkt.dst)
798
799    # Check that the interface statistics recorded the outbound packet
800    self.assertSentPacket(tunnel, tunnel.out_sa)
801
802    try:
803      # Swap the interface addresses to pretend we are the remote
804      self._SwapInterfaceAddress(
805          tunnel.iface, new_addr=remote_inner, old_addr=local_inner)
806
807      # Swap the packet's IP headers and write it back to the underlying
808      # network.
809      pkt = TunTwister.TwistPacket(pkt)
810      read_sock, local_port = _CreateReceiveSock(inner_version,
811                                                 _TEST_REMOTE_PORT)
812      self.ReceivePacketOn(tunnel.underlying_netid, pkt)
813
814      # Verify that the packet data and src are correct
815      data, src = read_sock.recvfrom(4096)
816      self.assertEqual(net_test.UDP_PAYLOAD, data)
817      self.assertEqual((local_inner, src_port), src[:2])
818
819      # Check that the interface statistics recorded the inbound packet
820      self.assertReceivedPacket(tunnel, tunnel.in_sa)
821    finally:
822      # Swap the interface addresses to pretend we are the remote
823      self._SwapInterfaceAddress(
824          tunnel.iface, new_addr=local_inner, old_addr=remote_inner)
825
826  def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner,
827                       sa_info=None):
828    """Test ICMP error path over an IPsec interface."""
829    if sa_info is None:
830      sa_info = tunnel.out_sa
831    # Now attempt to provoke an ICMP error.
832    # TODO: deduplicate with multinetwork_test.py.
833    dst_prefix, intermediate = {
834        4: ("172.19.", "172.16.9.12"),
835        6: ("2001:db8::", "2001:db8::1")
836    }[tunnel.version]
837
838    local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
839                             _TEST_REMOTE_PORT)
840    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
841                                  sa_info.seq_num, None, tunnel.local,
842                                  tunnel.remote)
843    self.assertSentPacket(tunnel, sa_info)
844
845    myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid)
846    _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr,
847                                         pkt)
848    self.ReceivePacketOn(tunnel.underlying_netid, toobig)
849
850    # Check that the packet too big reduced the MTU.
851    routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None)
852    self.assertEqual(1, len(routes))
853    rtmsg, attributes = routes[0]
854    self.assertEqual(iproute.RTN_UNICAST, rtmsg.type)
855    self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
856
857    # Clear PMTU information so that future tests don't have to worry about it.
858    self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid)
859
860  def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner,
861                                     remote_inner):
862    """Test combined encryption path with ICMP errors over an IPsec tunnel"""
863    self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
864                                remote_inner)
865    self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner)
866    self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
867                                remote_inner)
868
869  def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt):
870    """Bootstrap method to setup and run tests for the given parameters."""
871    tunnel = self.randomTunnel(outer_version)
872
873    try:
874      # Some tests require that the out_seq_num and in_seq_num are the same
875      # (Specifically encrypted tests), rebuild SAs to ensure seq_num is 1
876      #
877      # Until we get better scapy support, the only way we can build an
878      # encrypted packet is to send it out, and read the packet from the wire.
879      # We then generally use this as the "inbound" encrypted packet, injecting
880      # it into the interface for which it is expected on.
881      #
882      # As such, this is required to ensure that encrypted packets (which we
883      # currently have no way to easily modify) are not considered replay
884      # attacks by the inbound SA.  (eg: received 3 packets, seq_num_in = 3,
885      # sent only 1, # seq_num_out = 1, inbound SA would consider this a replay
886      # attack)
887      tunnel.TeardownXfrm()
888      tunnel.SetupXfrm(use_null_crypt)
889
890      local_inner = tunnel.addrs[inner_version]
891      remote_inner = _GetRemoteInnerAddress(inner_version)
892
893      for i in range(2):
894        func(tunnel, inner_version, local_inner, remote_inner)
895    finally:
896      if use_null_crypt:
897        tunnel.TeardownXfrm()
898        tunnel.SetupXfrm(False)
899
900  def _CheckTunnelRekey(self, tunnel, inner_version, local_inner, remote_inner):
901    old_out_sa = tunnel.out_sa
902    old_in_sa = tunnel.in_sa
903
904    # Check to make sure that both directions work before rekey
905    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
906                           old_in_sa)
907    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
908                            old_out_sa)
909
910    # Rekey
911    outer_family = net_test.GetAddressFamily(tunnel.version)
912
913    # Create new SA
914    # Distinguish the new SAs with new SPIs.
915    new_out_sa = SaInfo(old_out_sa.spi + 1)
916    new_in_sa = SaInfo(old_in_sa.spi + 1)
917
918    # Perform Rekey
919    tunnel.Rekey(outer_family, new_out_sa, new_in_sa)
920
921    # Expect that the old SPI still works for inbound packets
922    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
923                           old_in_sa)
924
925    # Test both paths with new SPIs, expect outbound to use new SPI
926    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
927                           new_in_sa)
928    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
929                            new_out_sa)
930
931    # Delete old SAs
932    tunnel.DeleteOldSaInfo(outer_family, old_in_sa.spi, old_out_sa.spi)
933
934    # Test both paths with new SPIs; should still work
935    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
936                           new_in_sa)
937    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
938                            new_out_sa)
939
940    # Expect failure upon trying to receive a packet with the deleted SPI
941    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
942                           old_in_sa, True)
943
944  def _TestTunnelRekey(self, inner_version, outer_version):
945    """Test packet input and output over a Virtual Tunnel Interface."""
946    tunnel = self.randomTunnel(outer_version)
947
948    try:
949      # Always use null_crypt, so we can check input and output separately
950      tunnel.TeardownXfrm()
951      tunnel.SetupXfrm(True)
952
953      local_inner = tunnel.addrs[inner_version]
954      remote_inner = _GetRemoteInnerAddress(inner_version)
955
956      self._CheckTunnelRekey(tunnel, inner_version, local_inner, remote_inner)
957    finally:
958      tunnel.TeardownXfrm()
959      tunnel.SetupXfrm(False)
960
961
@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
class XfrmVtiTest(XfrmTunnelBase):
  """Tunnel tests that use VtiInterface as the interface class.

  Each ParamTest* method forwards to a shared helper of the base class with
  the check to run.
  """

  INTERFACE_CLASS = VtiInterface

  # Checks run with null crypt.

  def ParamTestVtiInput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelInput, use_null_crypt=True)

  def ParamTestVtiOutput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelOutput, use_null_crypt=True)

  # Checks run without null crypt.

  def ParamTestVtiInOutEncrypted(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelEncryption, use_null_crypt=False)

  def ParamTestVtiIcmp(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelIcmp, use_null_crypt=False)

  def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelEncryptionWithIcmp,
                     use_null_crypt=False)

  # Rekey check.

  def ParamTestVtiRekey(self, inner_version, outer_version):
    self._TestTunnelRekey(inner_version, outer_version)
987
988
@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
class XfrmInterfaceTest(XfrmTunnelBase):
  """Tunnel tests that use XfrmInterface as the interface class.

  Each ParamTest* method forwards to a shared helper of the base class with
  the check to run.
  """

  INTERFACE_CLASS = XfrmInterface

  # Checks run with null crypt.

  def ParamTestXfrmIntfInput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelInput, use_null_crypt=True)

  def ParamTestXfrmIntfOutput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelOutput, use_null_crypt=True)

  # Checks run without null crypt.

  def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelEncryption, use_null_crypt=False)

  def ParamTestXfrmIntfIcmp(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelIcmp, use_null_crypt=False)

  def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version,
                     func=self._CheckTunnelEncryptionWithIcmp,
                     use_null_crypt=False)

  # Rekey check.

  def ParamTestXfrmIntfRekey(self, inner_version, outer_version):
    self._TestTunnelRekey(inner_version, outer_version)
1014
@unittest.skipUnless(SUPPORTS_XFRM_MIGRATE, "XFRM migration unsupported")
class XfrmInterfaceMigrateTest(XfrmTunnelBase):
  """Tunnel tests that run each check before and after migrating the tunnel.

  Unlike the other tunnel test classes, every test here creates its own
  XfrmInterface and tears it down again when the test is done.
  """

  # TODO: b/172497215 There is a kernel issue that XFRM_MIGRATE cannot work
  # correctly when there are multiple tunnels with the same selectors. Thus,
  # until this issue is fixed, #allowMultipleTunnels must be overridden to
  # avoid setting up multiple tunnels. This needs to be removed after the
  # kernel issue is fixed.
  @classmethod
  def allowMultipleTunnels(cls):
    return False

  def setUpTunnel(self, outer_version, use_null_crypt):
    """Create an XfrmInterface tunnel and configure its network.

    Args:
      outer_version: IP version (4 or 6) of the tunnel's outer packets.
      use_null_crypt: Passed through to the XfrmInterface constructor.

    Returns:
      The newly created XfrmInterface.
    """
    underlying_netid = self.RandomNetid()
    netid = _BASE_TUNNEL_NETID[outer_version] + _TUNNEL_NETID_OFFSET
    iface = "ipsec%s" % netid
    ifindex = self.ifindices[underlying_netid]

    local = self.MyAddress(outer_version, underlying_netid)
    remote = net_test.IPV4_ADDR if outer_version == 4 else net_test.IPV6_ADDR

    tunnel = XfrmInterface(iface, netid, underlying_netid, ifindex,
                           local, remote, outer_version, use_null_crypt)
    self._SetInboundMarking(netid, iface, True)
    self._SetupTunnelNetwork(tunnel, True)

    return tunnel

  def tearDownTunnel(self, tunnel):
    """Undo setUpTunnel(): remove marking and network setup, then the tunnel."""
    self._SetInboundMarking(tunnel.netid, tunnel.iface, False)
    self._SetupTunnelNetwork(tunnel, False)
    tunnel.Teardown()

  def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt):
    """Run a check on a fresh tunnel before and after migrating it.

    Args:
      inner_version: IP version (4 or 6) of the inner packets.
      outer_version: IP version (4 or 6) of the tunnel's outer packets.
      func: Check to run; called as
        func(tunnel, inner_version, local_inner, remote_inner).
      use_null_crypt: Passed through to setUpTunnel().
    """
    # The tunnel is created before entering the try block: if setUpTunnel()
    # raised inside it, the finally clause would reference an unbound
    # `tunnel` and hide the real error behind a NameError.
    tunnel = self.setUpTunnel(outer_version, use_null_crypt)
    try:
      # Verify functionality before migration
      local_inner = tunnel.addrs[inner_version]
      remote_inner = _GetRemoteInnerAddress(inner_version)
      func(tunnel, inner_version, local_inner, remote_inner)

      # Migrate tunnel
      # TODO:b/169170981 Add tests that migrate 4 -> 6 and 6 -> 4
      new_underlying_netid = self.RandomNetid(exclude=tunnel.underlying_netid)
      new_local = self.MyAddress(outer_version, new_underlying_netid)
      new_remote = (net_test.IPV4_ADDR2 if outer_version == 4
                    else net_test.IPV6_ADDR2)

      tunnel.Migrate(new_underlying_netid, new_local, new_remote)

      # Verify functionality after migration
      func(tunnel, inner_version, local_inner, remote_inner)
    finally:
      self.tearDownTunnel(tunnel)

  def ParamTestMigrateXfrmIntfInput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True)

  def ParamTestMigrateXfrmIntfOutput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     True)

  def ParamTestMigrateXfrmIntfInOutEncrypted(self, inner_version,
                                             outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption,
                     False)

  def ParamTestMigrateXfrmIntfIcmp(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False)

  def ParamTestMigrateXfrmIntfEncryptionWithIcmp(self, inner_version,
                                                 outer_version):
    self._TestTunnel(inner_version, outer_version,
                     self._CheckTunnelEncryptionWithIcmp, False)

  def ParamTestMigrateXfrmIntfRekey(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelRekey,
                     True)
1089
if __name__ == "__main__":
  # InjectTests() is defined elsewhere in this file; presumably it expands
  # the ParamTest* methods into concrete test cases, so it has to run before
  # unittest starts.
  InjectTests()
  unittest.main()
1093