• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python3
2#
3# Copyright 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import
18from errno import *  # pylint: disable=wildcard-import
19from socket import *  # pylint: disable=wildcard-import
20
21import random
22import itertools
23import struct
24import unittest
25
26from net_test import LINUX_VERSION
27from scapy import all as scapy
28from tun_twister import TunTwister
29import csocket
30import iproute
31import multinetwork_base
32import net_test
33import packets
34import util
35import xfrm
36import xfrm_base
37
# Interface index passed as the underlying device when creating the test XFRM
# interface (by its name, the loopback interface).
_LOOPBACK_IFINDEX = 1
# Interface name and XFRM interface ID used by the support probe and by the
# standalone add/delete interface tests.
_TEST_XFRM_IFNAME = "ipsec42"
_TEST_XFRM_IF_ID = 42
# SPI used by the XFRM_MIGRATE probe and by the explicit-selector tunnel tests.
_TEST_SPI = 0x1234
42
43# Does the kernel support CONFIG_XFRM_INTERFACE?
def HaveXfrmInterfaces():
  """Probes whether the running kernel supports XFRM interfaces.

  Kernels 4.19 and newer are assumed to have CONFIG_XFRM_INTERFACE enabled.
  On older kernels, support is detected by creating a test XFRM interface on
  top of the loopback device and deleting it again.

  Returns:
    True if XFRM interfaces are supported; False if creating or deleting the
    test interface fails with IOError.

  Raises:
    AssertionError: if the test interface still exists after it was deleted.
  """
  # 4.19+ must have CONFIG_XFRM_INTERFACE enabled
  if LINUX_VERSION >= (4, 19, 0):
    return True

  try:
    i = iproute.IPRoute()
    i.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID,
                          _LOOPBACK_IFINDEX)
    i.DeleteLink(_TEST_XFRM_IFNAME)
  except IOError:
    return False

  try:
    i.GetIfIndex(_TEST_XFRM_IFNAME)
  except IOError:
    # Expected: the lookup fails because the interface is gone.
    return True

  # The lookup succeeded, so the deletion did not take effect. The original
  # `assert "<message>"` asserted a non-empty string, which is always true, so
  # this condition was silently ignored. Raise explicitly so that the check
  # also survives `python -O`.
  raise AssertionError(
      "Deleted interface %s still exists!" % _TEST_XFRM_IFNAME)
62
# Evaluated once at import time; consulted by SupportsXfrmMigrate() and by the
# skipUnless decorator on the XFRM interface add/delete test.
HAVE_XFRM_INTERFACES = HaveXfrmInterfaces()
64
65# Two kernel fixes have been added in 5.17 to allow XFRM_MIGRATE to work correctly
66# when (1) there are multiple tunnels with the same selectors; and (2) addresses
67# are updated to a different IP family. These two fixes were pulled into upstream
68# LTS releases 4.14.273, 4.19.236, 5.4.186, 5.10.107 and 5.15.30, from whence they
69# flowed into the Android Common Kernel (via standard LTS merges).
70# As such we require 4.14.273+, 4.19.236+, 5.4.186+, 5.10.107+, 5.15.30+ or 5.17+
71# to have these fixes.
def HasXfrmMigrateFixes():
  """Reports whether LINUX_VERSION falls in a range carrying the migrate fixes.

  Returns:
    True if the running kernel is 4.14.273+ (within 4.14), 4.19.236+ (within
    4.19), 5.4.186+ (within 5.4), 5.10.107+ (within 5.10), or 5.15.30 and
    newer; False otherwise.
  """
  # Everything from 5.15.30 onwards is accepted without an upper bound.
  if LINUX_VERSION >= (5, 15, 30):
    return True

  # (first fixed release, first release of the next tracked branch) pairs.
  fixed_ranges = (
      ((4, 14, 273), (4, 19, 0)),
      ((4, 19, 236), (5, 4, 0)),
      ((5, 4, 186), (5, 10, 0)),
      ((5, 10, 107), (5, 15, 0)),
  )
  return any(first_fixed <= LINUX_VERSION < next_branch
             for first_fixed, next_branch in fixed_ranges)
80
81
82# Does the kernel support CONFIG_XFRM_MIGRATE and include the kernel fixes?
def SupportsXfrmMigrate():
  """Reports whether the tests can rely on XFRM_MIGRATE on this kernel.

  Returns:
    False if the kernel lacks the migrate fixes, if it is older than 5.10 and
    has no XFRM interfaces, or if a probe migration fails with ENOPROTOOPT.
    True otherwise, including when the probe fails with EINVAL (the expected
    outcome), fails with any other errno, or unexpectedly succeeds.
  """
  if not HasXfrmMigrateFixes():
    return False

  # 5.10+ must have CONFIG_XFRM_MIGRATE enabled
  if LINUX_VERSION >= (5, 10, 0):
    return True

  # XFRM_MIGRATE depends on xfrmi interfaces
  if not HAVE_XFRM_INTERFACES:
    return False

  try:
    probe = xfrm.Xfrm()
    any_addr = net_test.GetWildcardAddress(6)
    empty_sel = xfrm.EmptySelector(AF_INET6)

    # The SA being migrated does not exist, so a kernel that implements
    # XFRM_MIGRATE is expected to reject this request with EINVAL.
    probe.MigrateTunnel(xfrm.XFRM_POLICY_OUT, empty_sel, any_addr, any_addr,
                        any_addr, any_addr, _TEST_SPI,
                        None, None, None, None, None, None)
    print("Migration succeeded unexpectedly, assuming XFRM_MIGRATE is enabled")
    return True
  except IOError as err:
    if err.errno == ENOPROTOOPT:
      return False
    if err.errno != EINVAL:
      print("Unexpected error, assuming XFRM_MIGRATE is enabled:", err.errno)
    return True
115
# Evaluated once at import time.
SUPPORTS_XFRM_MIGRATE = SupportsXfrmMigrate()
117
118# Parameters to setup tunnels as special networks
_TUNNEL_NETID_OFFSET = 0xFC00  # Matches reserved netid range for IpSecService
# Per-IP-version base netid; a tunnel's netid is this base plus
# _TUNNEL_NETID_OFFSET plus the index of its underlying network.
_BASE_TUNNEL_NETID = {4: 40, 6: 60}
# NOTE(review): not referenced in the visible part of this file; presumably
# base VTI output/input keys — confirm against the rest of the file.
_BASE_VTI_OKEY = 2000000100
_BASE_VTI_IKEY = 2000000200

# SPIs used by the explicit-selector tunnel tests; both directions share one
# value.
_TEST_OUT_SPI = _TEST_SPI
_TEST_IN_SPI = _TEST_OUT_SPI

# VTI output/input keys; VtiInterface adds the tunnel's netid to these.
_TEST_OKEY = 2000000100
_TEST_IKEY = 2000000200

# UDP port used on the remote side of the test flows.
_TEST_REMOTE_PORT = 1234

# Maps an IP version to the scapy class used to build packets of that version.
_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6}
133
134
135def _GetLocalInnerAddress(version):
136  return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version]
137
138
139def _GetRemoteInnerAddress(version):
140  return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version]
141
142
def _GetRemoteOuterAddress(version):
  """Returns net_test's IPv4 or IPv6 test address to use as the remote outer.

  Raises:
    KeyError: if version is neither 4 nor 6.
  """
  remote_outer_addrs = {
      4: net_test.IPV4_ADDR,
      6: net_test.IPV6_ADDR,
  }
  return remote_outer_addrs[version]
145
146
def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer,
                                   src_port, dst_inner, dst_outer,
                                   dst_port, spi, seq_num, ip_hdr_options=None):
  """Builds a tunnel-mode packet carrying UDP_PAYLOAD, wrapped with null crypto.

  An inner IP/UDP packet is built with scapy and then passed to
  xfrm_base.EncryptPacketWithNull together with the SPI, the sequence number
  and the outer address pair.

  Args:
    inner_version: IP version (4 or 6) of the inner packet.
    src_inner: Source address of the inner packet.
    src_outer: Source address of the outer packet.
    src_port: UDP source port of the inner packet.
    dst_inner: Destination address of the inner packet.
    dst_outer: Destination address of the outer packet.
    dst_port: UDP destination port of the inner packet.
    spi: SPI passed to EncryptPacketWithNull.
    seq_num: Sequence number passed to EncryptPacketWithNull.
    ip_hdr_options: Optional dict of extra scapy fields for the inner IP
      header. Any 'src'/'dst' entries are overridden. The dict is not
      modified.

  Returns:
    The packet returned by xfrm_base.EncryptPacketWithNull.
  """
  # Work on a copy: the original updated the caller's dict in place, which
  # leaked 'src'/'dst' into any dict the caller went on to reuse.
  inner_hdr_fields = dict(ip_hdr_options) if ip_hdr_options else {}
  inner_hdr_fields.update({'src': src_inner, 'dst': dst_inner})

  # Build and receive an ESP packet destined for the inner socket
  IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version]
  input_pkt = (
      IpType(**inner_hdr_fields) / scapy.UDP(sport=src_port, dport=dst_port) /
      net_test.UDP_PAYLOAD)
  input_pkt = IpType(bytes(input_pkt))  # Compute length, checksum.
  input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num,
                                              (src_outer, dst_outer))

  return input_pkt
165
166
def _CreateReceiveSock(version, port=0):
  """Creates a UDP socket bound to the wildcard address, with a timeout set.

  Args:
    version: IP version (4 or 6) of the socket.
    port: Port to bind to; 0 (the default) is passed through to bind().

  Returns:
    A (socket, local_port) tuple, where local_port is the port reported by
    getsockname() after binding. The caller owns the socket.
  """
  # Create a socket to receive packets.
  read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
  try:
    read_sock.bind((net_test.GetWildcardAddress(version), port))
    # The second parameter of the tuple is the port number regardless of AF.
    local_port = read_sock.getsockname()[1]
    # Guard against the eventuality of the receive failing.
    csocket.SetSocketTimeout(read_sock, 500)
  except Exception:
    # Do not leak the descriptor if the socket could not be fully set up.
    read_sock.close()
    raise

  return read_sock, local_port
177
178
def _SendPacket(testInstance, netid, version, remote, remote_port):
  """Sends UDP_PAYLOAD to (remote, remote_port) on the network given by netid.

  Args:
    testInstance: Test case object providing SelectInterface().
    netid: Netid of the network to send on; selected using the "mark" mode.
    version: IP version (4 or 6) of the sending socket.
    remote: Destination address.
    remote_port: Destination UDP port.

  Returns:
    The local port that the sending socket reported after the send.
  """
  # Send a packet out via the tunnel-backed network, bound for the port number
  # of the input socket.
  write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
  try:
    testInstance.SelectInterface(write_sock, netid, "mark")
    write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port))
    return write_sock.getsockname()[1]
  finally:
    # The socket is only needed for this one datagram; close it explicitly
    # instead of relying on garbage collection.
    write_sock.close()
188
189
def InjectTests():
  """Injects the parameterized test methods into each tunnel test class."""
  # These three classes are parameterized by (inner, outer) IP version.
  for test_class in (XfrmTunnelTest, XfrmInterfaceTest, XfrmVtiTest):
    InjectParameterizedTests(test_class)

  # The migrate tests additionally take the outer version to migrate to.
  InjectParameterizedMigrateTests(XfrmInterfaceMigrateTest)
195
196
def InjectParameterizedTests(cls):
  """Injects tests into cls for every (inner, outer) IP version combination.

  Args:
    cls: The test class passed to util.InjectParameterizedTest.
  """
  ip_versions = (4, 6)

  def _ParamName(*params):
    # params is (inner_version, outer_version).
    return "IPv%d_in_IPv%d" % params

  util.InjectParameterizedTest(
      cls, itertools.product(ip_versions, repeat=2), _ParamName)
205
def InjectParameterizedMigrateTests(cls):
  """Injects tests into cls for every combination of three IP versions.

  Args:
    cls: The test class passed to util.InjectParameterizedTest.
  """
  ip_versions = (4, 6)

  def _ParamName(*params):
    # params is (inner_version, outer_version, new_outer_version).
    return "IPv%d_in_IPv%d_to_outer_IPv%d" % params

  util.InjectParameterizedTest(
      cls, itertools.product(ip_versions, repeat=3), _ParamName)
214
215
class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
  """Tests unidirectional tunnel-mode XFRM state with explicit selectors."""

  def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid,
                         netid, local_inner, remote_inner, local_outer,
                         remote_outer, write_sock):
    """Sends UDP_PAYLOAD and expects an ESP packet on the underlying network.

    Only underlying_netid, remote_inner, local_outer, remote_outer and
    write_sock are used; the other parameters keep the signature identical to
    _CheckTunnelInput so that _TestTunnel can call either one.
    """
    write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53))
    self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
                            local_outer, remote_outer)

  def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid,
                        netid, local_inner, remote_inner, local_outer,
                        remote_outer, read_sock):
    """Injects a null-crypto tunnel packet and checks what read_sock receives.

    outer_version and netid are unused; they keep the signature identical to
    _CheckTunnelOutput so that _TestTunnel can call either one.
    """
    # The second parameter of the tuple is the port number regardless of AF.
    local_port = read_sock.getsockname()[1]

    # Build and receive an ESP packet destined for the inner socket
    input_pkt = _GetNullAuthCryptTunnelModePkt(
        inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT,
        local_inner, local_outer, local_port, _TEST_IN_SPI, 1)
    self.ReceivePacketOn(underlying_netid, input_pkt)

    # Verify that the packet data and src are correct
    data, src = read_sock.recvfrom(4096)
    self.assertEqual(net_test.UDP_PAYLOAD, data)
    self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])

  def _TestTunnel(self, inner_version, outer_version, func, direction,
                  test_output_mark_unset):
    """Test a unidirectional XFRM Tunnel with explicit selectors.

    Args:
      inner_version: IP version (4 or 6) of the inner addresses.
      outer_version: IP version (4 or 6) of the outer addresses.
      func: _CheckTunnelInput or _CheckTunnelOutput; called with the tunnel
        parameters and the socket matching direction.
      direction: xfrm.XFRM_POLICY_OUT to pass the sending socket to func;
        any other value passes the receiving socket.
      test_output_mark_unset: If True, the outbound tunnel is created without
        an output mark and the underlying network is made the default network
        for the duration of the test.
    """
    # Select the underlying netid, which represents the external
    # interface from/to which to route ESP packets.
    u_netid = self.RandomNetid()
    # Select a random netid that will originate traffic locally and
    # which represents the netid on which the plaintext is sent
    netid = self.RandomNetid(exclude=u_netid)

    local_inner = self.MyAddress(inner_version, netid)
    remote_inner = _GetRemoteInnerAddress(inner_version)
    local_outer = self.MyAddress(outer_version, u_netid)
    remote_outer = _GetRemoteOuterAddress(outer_version)

    output_mark = u_netid
    if test_output_mark_unset:
      output_mark = None
      self.SetDefaultNetwork(u_netid)

    # Sockets opened below; closed in the finally clause so that they are not
    # leaked when the check fails.
    open_socks = []
    try:
      # Create input/output SPs, SAs and sockets to simulate a more realistic
      # environment.
      self.xfrm.CreateTunnel(
          xfrm.XFRM_POLICY_IN, xfrm.SrcDstSelector(remote_inner, local_inner),
          remote_outer, local_outer, _TEST_IN_SPI, xfrm_base._ALGO_CRYPT_NULL,
          xfrm_base._ALGO_AUTH_NULL, None, None, None, xfrm.MATCH_METHOD_ALL)

      self.xfrm.CreateTunnel(
          xfrm.XFRM_POLICY_OUT, xfrm.SrcDstSelector(local_inner, remote_inner),
          local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256,
          xfrm_base._ALGO_HMAC_SHA1, None, output_mark, None, xfrm.MATCH_METHOD_ALL)

      write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
      open_socks.append(write_sock)
      self.SelectInterface(write_sock, netid, "mark")
      read_sock, _ = _CreateReceiveSock(inner_version)
      open_socks.append(read_sock)

      sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock
      func(inner_version, outer_version, u_netid, netid, local_inner,
           remote_inner, local_outer, remote_outer, sock)
    finally:
      try:
        for opened in open_socks:
          opened.close()
      finally:
        # Always restore the default network, even if closing a socket fails.
        if test_output_mark_unset:
          self.ClearDefaultNetwork()

  # The ParamTest* methods below are expanded per (inner, outer) IP version
  # pair by InjectParameterizedTests.
  def ParamTestTunnelInput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput,
                     xfrm.XFRM_POLICY_IN, False)

  def ParamTestTunnelOutput(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     xfrm.XFRM_POLICY_OUT, False)

  def ParamTestTunnelOutputNoSetMark(self, inner_version, outer_version):
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     xfrm.XFRM_POLICY_OUT, True)
299
300
class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
  # Exercises creating, updating and deleting a Virtual Tunnel Interface.

  def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr,
                         ikey, okey):
    # Compares the reported VTI link attributes with the expected keys and
    # addresses; the addresses are converted to text before comparing.
    self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey)
    self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey)

    if version == 4:
      family = AF_INET
    else:
      family = AF_INET6

    reported_local = inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"])
    self.assertEqual(reported_local, local_addr)
    reported_remote = inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"])
    self.assertEqual(reported_remote, remote_addr)

  def testAddVti(self):
    """Test the creation of a Virtual Tunnel Interface."""
    ifname = _TEST_XFRM_IFNAME
    for version in (4, 6):
      netid = self.RandomNetid()
      local_addr = self.MyAddress(version, netid)
      remote_addr = _GetRemoteOuterAddress(version)

      # Create the interface and check the attributes that were just set.
      self.iproute.CreateVirtualTunnelInterface(
          dev_name=ifname,
          local_addr=local_addr,
          remote_addr=remote_addr,
          o_key=_TEST_OKEY,
          i_key=_TEST_IKEY)
      created_info = self.iproute.GetIfinfoData(ifname)
      self._VerifyVtiInfoData(created_info, version, local_addr, remote_addr,
                              _TEST_IKEY, _TEST_OKEY)

      # Update the same interface with a new remote address and new keys.
      updated_remote = {4: net_test.IPV4_ADDR2,
                        6: net_test.IPV6_ADDR2}[version]
      updated_okey = _TEST_OKEY + _TEST_XFRM_IF_ID
      updated_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID
      self.iproute.CreateVirtualTunnelInterface(
          dev_name=ifname,
          local_addr=local_addr,
          remote_addr=updated_remote,
          o_key=updated_okey,
          i_key=updated_ikey,
          is_update=True)
      updated_info = self.iproute.GetIfinfoData(ifname)
      self._VerifyVtiInfoData(updated_info, version, local_addr,
                              updated_remote, updated_ikey, updated_okey)

      netlink_ifindex = self.iproute.GetIfIndex(ifname)

      # Validate that the netlink interface matches the ioctl interface.
      self.assertEqual(net_test.GetInterfaceIndex(ifname), netlink_ifindex)

      # Once deleted, looking the interface up again must fail.
      self.iproute.DeleteLink(ifname)
      self.assertRaises(IOError, self.iproute.GetIfIndex, ifname)

  def _QuietDeleteLink(self, ifname):
    # Deletes the link, ignoring the IOError raised when it is not present.
    try:
      self.iproute.DeleteLink(ifname)
    except IOError:
      return

  def tearDown(self):
    super().tearDown()
    # Remove the interface in case a failed test left it behind.
    self._QuietDeleteLink(_TEST_XFRM_IFNAME)
361
362
class SaInfo(object):
  """Holds the SPI of an SA and the sequence number of its next packet."""

  def __init__(self, spi):
    # Sequence numbers start at 1; the tests increment this after each packet
    # that is sent or received on the SA.
    self.seq_num = 1
    self.spi = spi
368
369
class IpSecBaseInterface(object):
  """Shared state and helpers for the IPsec tunnel interfaces under test.

  Subclasses must implement TeardownXfrm, _SetupXfrmByType and _Rekey.
  """

  # Smallest SPI chosen by SetupXfrm. SPI 0 must not be used for a real SA and
  # values 1-255 are reserved by IANA (RFC 4303, section 2.1).
  _MIN_RANDOM_SPI = 256

  def __init__(self, iface, netid, underlying_netid, local, remote, version):
    """Records the tunnel parameters and creates the IPRoute/Xfrm helpers.

    Args:
      iface: Name of the tunnel interface.
      netid: Netid of the tunnel network.
      underlying_netid: Netid of the network that carries the ESP packets.
      local: Local outer address.
      remote: Remote outer address.
      version: IP version of the local and remote addresses.
    """
    self.iface = iface
    self.netid = netid
    self.underlying_netid = underlying_netid
    self.local, self.remote = local, remote

    # XFRM interfaces technically do not have a version. This keeps track of
    # the IP version of the local and remote addresses.
    self.version = version
    # Expected interface packet counters, updated by the tests.
    self.rx = self.tx = 0
    # Addresses configured on the interface, keyed by IP version.
    self.addrs = {}

    self.iproute = iproute.IPRoute()
    self.xfrm = xfrm.Xfrm()

  def Teardown(self):
    """Removes the XFRM state first and then the interface itself."""
    self.TeardownXfrm()
    self.TeardownInterface()

  def TeardownInterface(self):
    """Deletes the tunnel interface."""
    self.iproute.DeleteLink(self.iface)

  def SetupXfrm(self, use_null_crypt):
    """Picks a random SPI and algorithms, then creates the tunnel XFRM state.

    Args:
      use_null_crypt: If True, use null authentication and encryption;
        otherwise use HMAC-SHA1 and AES-256-CBC.
    """
    # The lower bound used to be 0, which could (rarely) select SPI 0 or an
    # IANA-reserved value; start at the first generally usable SPI instead.
    rand_spi = random.randint(self._MIN_RANDOM_SPI, 0x7fffffff)
    # Both directions use the same SPI value.
    self.in_sa = SaInfo(rand_spi)
    self.out_sa = SaInfo(rand_spi)

    # Select algorithms:
    if use_null_crypt:
      auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL
    else:
      auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256

    self.auth = auth
    self.crypt = crypt

    self._SetupXfrmByType(auth, crypt)

  def Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Rekeys the Tunnel Interface

    Creates new SAs and updates the outbound security policy to use new SAs.

    Args:
      outer_family: AF_INET or AF_INET6
      new_out_sa: An SaInfo struct representing the new outbound SA's info
      new_in_sa: An SaInfo struct representing the new inbound SA's info
    """
    self._Rekey(outer_family, new_out_sa, new_in_sa)

    # Update Interface object
    self.out_sa = new_out_sa
    self.in_sa = new_in_sa

  def TeardownXfrm(self):
    """Removes the tunnel's XFRM state; implemented by subclasses."""
    raise NotImplementedError("Subclasses should implement this")

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    """Creates the tunnel's XFRM state; implemented by subclasses."""
    raise NotImplementedError("Subclasses should implement this")

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    """Installs the new SAs and outbound policy; implemented by subclasses."""
    raise NotImplementedError("Subclasses should implement this")
434
435
class VtiInterface(IpSecBaseInterface):
  """A Virtual Tunnel Interface whose XFRM state is matched by mark."""

  def __init__(self, iface, netid, underlying_netid, _, local, remote, version):
    # The fourth argument is ignored; it keeps the positional signature in
    # line with XfrmInterface, which takes an ifindex there.
    super().__init__(iface, netid, underlying_netid, local, remote, version)

    # Derive per-tunnel keys from the netid.
    self.okey = _TEST_OKEY + netid
    self.ikey = _TEST_IKEY + netid

    self.SetupInterface()
    self.SetupXfrm(False)

  def SetupInterface(self):
    """Creates the VTI and returns what CreateVirtualTunnelInterface returns."""
    created = self.iproute.CreateVirtualTunnelInterface(
        self.iface, self.local, self.remote, self.ikey, self.okey)
    return created

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    # For the VTI, the selectors are wildcard since packets will only
    # be selected if they have the appropriate mark, hence the inner
    # addresses are wildcard.
    out_mark = xfrm.ExactMatchMark(self.okey)
    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
                           self.out_sa.spi, crypt_algo, auth_algo, out_mark,
                           self.underlying_netid, None, xfrm.MATCH_METHOD_ALL)

    in_mark = xfrm.ExactMatchMark(self.ikey)
    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
                           self.in_sa.spi, crypt_algo, auth_algo, in_mark,
                           None, None, xfrm.MATCH_METHOD_MARK)

  def TeardownXfrm(self):
    """Deletes the outbound and then the inbound tunnel state."""
    self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
                           self.out_sa.spi, self.okey, None)
    self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
                           self.in_sa.spi, self.ikey, None)

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
    #       the same, but rekeys are asymmetric, and only update the outbound
    #       policy.
    # New outbound SA, matched by the output key.
    out_mark = xfrm.ExactMatchMark(self.okey)
    self.xfrm.AddSaInfo(self.local, self.remote, new_out_sa.spi,
                        xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL,
                        xfrm_base._ALGO_AUTH_NULL, None, None, out_mark,
                        self.underlying_netid)

    # New inbound SA, matched by the input key.
    in_mark = xfrm.ExactMatchMark(self.ikey)
    self.xfrm.AddSaInfo(self.remote, self.local, new_in_sa.spi,
                        xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL,
                        xfrm_base._ALGO_AUTH_NULL, None, None, in_mark, None)

    # Create new policies for IPv4 and IPv6.
    for family in (AF_INET, AF_INET6):
      # Add SPI-specific output policy to enforce using new outbound SPI
      out_policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT,
                                   xfrm.EmptySelector(family))
      out_tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
                                   (self.local, self.remote))
      self.xfrm.UpdatePolicyInfo(out_policy, out_tmpl,
                                 xfrm.ExactMatchMark(self.okey), 0)

  def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
    """Deletes the old inbound SA and then the old outbound SA.

    outer_family is accepted but not used.
    """
    in_mark = xfrm.ExactMatchMark(self.ikey)
    self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, in_mark)
    out_mark = xfrm.ExactMatchMark(self.okey)
    self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, out_mark)
500
501
@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest):
  """Test the creation of an XFRM Interface."""

  def testAddXfrmInterface(self):
    # Create the interface on top of loopback and bring it up.
    ifname = _TEST_XFRM_IFNAME
    self.iproute.CreateXfrmInterface(ifname, _TEST_XFRM_IF_ID,
                                     _LOOPBACK_IFINDEX)
    netlink_ifindex = self.iproute.GetIfIndex(ifname)
    net_test.SetInterfaceUp(ifname)

    # Validate that the netlink interface matches the ioctl interface.
    ioctl_ifindex = net_test.GetInterfaceIndex(ifname)
    self.assertEqual(ioctl_ifindex, netlink_ifindex)

    # Once deleted, looking the interface up again must fail.
    self.iproute.DeleteLink(ifname)
    self.assertRaises(IOError, self.iproute.GetIfIndex, ifname)
517
518
class XfrmInterface(IpSecBaseInterface):
  """An XFRM interface tunnel whose XFRM state is matched by interface ID."""

  def __init__(self, iface, netid, underlying_netid, ifindex, local, remote,
               version, use_null_crypt=False):
    super().__init__(iface, netid, underlying_netid, local, remote, version)

    # The tunnel's netid doubles as its XFRM interface ID.
    self.xfrm_if_id = netid
    self.ifindex = ifindex

    self.SetupInterface()
    self.SetupXfrm(use_null_crypt)

  def SetupInterface(self):
    """Create an XFRM interface."""
    created = self.iproute.CreateXfrmInterface(self.iface, self.netid,
                                               self.ifindex)
    return created

  def _SetupXfrmByType(self, auth_algo, crypt_algo):
    # Outbound: matched with MATCH_METHOD_ALL and sent via the underlying
    # network.
    if_id = self.xfrm_if_id
    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
                           self.out_sa.spi, crypt_algo, auth_algo, None,
                           self.underlying_netid, if_id,
                           xfrm.MATCH_METHOD_ALL)
    # Inbound: matched by interface ID only.
    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
                           self.in_sa.spi, crypt_algo, auth_algo, None, None,
                           if_id, xfrm.MATCH_METHOD_IFID)

  def TeardownXfrm(self):
    """Deletes the outbound and then the inbound tunnel state."""
    if_id = self.xfrm_if_id
    self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
                           self.out_sa.spi, None, if_id)
    self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
                           self.in_sa.spi, None, if_id)

  def _Rekey(self, outer_family, new_out_sa, new_in_sa):
    # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
    #       the same, but rekeys are asymmetric, and only update the outbound
    #       policy.
    null_crypt = xfrm_base._ALGO_CRYPT_NULL
    null_auth = xfrm_base._ALGO_AUTH_NULL

    # New outbound SA.
    self.xfrm.AddSaInfo(
        self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, None, self.underlying_netid,
        xfrm_if_id=self.xfrm_if_id)

    # New inbound SA.
    self.xfrm.AddSaInfo(
        self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
        null_crypt, null_auth, None, None, None, None,
        xfrm_if_id=self.xfrm_if_id)

    # Create new policies for IPv4 and IPv6.
    for family in (AF_INET, AF_INET6):
      # Add SPI-specific output policy to enforce using new outbound SPI
      out_policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT,
                                   xfrm.EmptySelector(family))
      out_tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
                                   (self.local, self.remote))
      self.xfrm.UpdatePolicyInfo(out_policy, out_tmpl, None, self.xfrm_if_id)

  def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
    """Deletes the old inbound SA and then the old outbound SA.

    outer_family is accepted but not used.
    """
    if_id = self.xfrm_if_id
    self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, None, if_id)
    self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, None, if_id)

  def Migrate(self, new_underlying_netid, new_local, new_remote):
    """Migrates both tunnel directions to new outer addresses and network.

    Args:
      new_underlying_netid: Netid of the new underlying network.
      new_local: New local outer address.
      new_remote: New remote outer address.
    """
    # Inbound direction first: remote -> local.
    self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
                            new_remote, new_local, self.in_sa.spi,
                            self.crypt, self.auth, None, None,
                            new_underlying_netid, self.xfrm_if_id)

    # Then the outbound direction: local -> remote.
    self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
                            new_local, new_remote, self.out_sa.spi,
                            self.crypt, self.auth, None, None,
                            new_underlying_netid, self.xfrm_if_id)

    # Record the new endpoints only after both migrations went through.
    self.local, self.remote = new_local, new_remote
    self.version = net_test.GetAddressVersion(new_local)
    self.underlying_netid = new_underlying_netid
594
595
596class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
597
598  @classmethod
599  def setUpClass(cls):
600    xfrm_base.XfrmBaseTest.setUpClass()
601    # Tunnel interfaces use marks extensively, so configure realistic packet
602    # marking rules to make the test representative, make PMTUD work, etc.
603    cls.SetInboundMarks(True)
604    cls.SetMarkReflectSysctls(1)
605
606    # Group by tunnel version to ensure that we test at least one IPv4 and one
607    # IPv6 tunnel
608    cls.tunnelsV4 = {}
609    cls.tunnelsV6 = {}
610
611    for i, underlying_netid in enumerate(cls.tuns):
612      for version in 4, 6:
613        netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i
614        iface = "ipsec%s" % netid
615        local = cls.MyAddress(version, underlying_netid)
616        if version == 4:
617          remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2)
618        else:
619          remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2)
620
621        ifindex = cls.ifindices[underlying_netid]
622        tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex,
623                                   local, remote, version)
624        cls._SetInboundMarking(netid, iface, True)
625        cls._SetupTunnelNetwork(tunnel, True)
626
627        if version == 4:
628          cls.tunnelsV4[netid] = tunnel
629        else:
630          cls.tunnelsV6[netid] = tunnel
631
632  @classmethod
633  def tearDownClass(cls):
634    # The sysctls are restored by MultinetworkBaseTest.tearDownClass.
635    cls.SetInboundMarks(False)
636    for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()):
637      cls._SetInboundMarking(tunnel.netid, tunnel.iface, False)
638      cls._SetupTunnelNetwork(tunnel, False)
639      tunnel.Teardown()
640    xfrm_base.XfrmBaseTest.tearDownClass()
641
642  def randomTunnel(self, outer_version):
643    version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6
644    return random.choice(list(version_dict.values()))
645
  def setUp(self):
    """Runs the multinetwork per-test setup and creates fresh helper objects.

    NOTE(review): this calls MultiNetworkBaseTest.setUp directly instead of
    going through the immediate base class; presumably that is deliberate so
    that the per-test setup of XfrmBaseTest is skipped — confirm.
    """
    multinetwork_base.MultiNetworkBaseTest.setUp(self)
    # New IPRoute and Xfrm instances for each test.
    self.iproute = iproute.IPRoute()
    self.xfrm = xfrm.Xfrm()
650
  def tearDown(self):
    """Runs only the multinetwork per-test teardown.

    NOTE(review): like setUp, this calls MultiNetworkBaseTest directly rather
    than the immediate base class; presumably so that the XFRM state created
    in setUpClass survives between tests — confirm.
    """
    multinetwork_base.MultiNetworkBaseTest.tearDown(self)
653
654  def _SwapInterfaceAddress(self, ifname, old_addr, new_addr):
655    """Exchange two addresses on a given interface.
656
657    Args:
658      ifname: Name of the interface
659      old_addr: An address to be removed from the interface
660      new_addr: An address to be added to an interface
661    """
662    version = 6 if ":" in new_addr else 4
663    ifindex = net_test.GetInterfaceIndex(ifname)
664    self.iproute.AddAddress(new_addr,
665                            net_test.AddressLengthBits(version), ifindex)
666    self.iproute.DelAddress(old_addr,
667                            net_test.AddressLengthBits(version), ifindex)
668
669  @classmethod
670  def _GetLocalAddress(cls, version, netid):
671    if version == 4:
672      return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET)
673    else:
674      return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1"
675
  @classmethod
  def _SetupTunnelNetwork(cls, tunnel, is_add):
    """Setup rules and routes for a tunnel Network.

    Takes an interface and depending on the boolean
    value of is_add, either adds or removes the rules
    and routes for a tunnel interface to behave like an
    Android Network for purposes of testing.

    For both IPv4 and IPv6 this adds or removes the UID-range, output-interface
    and fwmark rules pointing at the tunnel's routing table, the tunnel's local
    address, and a default route through the tunnel interface. The address
    that was used is also recorded in tunnel.addrs[version].

    Args:
      tunnel: A VtiInterface or XfrmInterface, the tunnel to set up.
      is_add: Boolean that causes this method to perform setup if True or
        teardown if False
    """
    if is_add:
      # Disable router solicitations to avoid occasional spurious packets
      # arriving on the underlying network; there are two possible behaviors
      # when that occurred: either only the RA packet is read, and when it
      # is echoed back to the tunnel, it causes the test to fail by not
      # receiving the UDP_PAYLOAD; or, two packets may arrive on the
      # underlying network which fails the assertion that only one ESP packet
      # is received.
      cls.SetSysctl(
          "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0)
      net_test.SetInterfaceUp(tunnel.iface)

    for version in [4, 6]:
      ifindex = net_test.GetInterfaceIndex(tunnel.iface)
      # The tunnel's netid doubles as its routing table number.
      table = tunnel.netid

      # Set up routing rules.
      start, end = cls.UidRangeForNetid(tunnel.netid)
      cls.iproute.UidRangeRule(version, is_add, start, end, table,
                                cls.PRIORITY_UID)
      cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF)
      cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK,
                              table, cls.PRIORITY_FWMARK)

      # Configure IP addresses.
      addr = cls._GetLocalAddress(version, tunnel.netid)
      prefixlen = net_test.AddressLengthBits(version)
      tunnel.addrs[version] = addr
      if is_add:
        # Setup: address first, then the default route.
        cls.iproute.AddAddress(addr, prefixlen, ifindex)
        cls.iproute.AddRoute(version, table, "default", 0, None, ifindex)
      else:
        # Teardown in the reverse order: route first, then the address.
        cls.iproute.DelRoute(version, table, "default", 0, None, ifindex)
        cls.iproute.DelAddress(addr, prefixlen, ifindex)
724
725  def assertReceivedPacket(self, tunnel, sa_info):
726    tunnel.rx += 1
727    self.assertEqual((tunnel.rx, tunnel.tx),
728                      self.iproute.GetRxTxPackets(tunnel.iface))
729    sa_info.seq_num += 1
730
731  def assertSentPacket(self, tunnel, sa_info):
732    tunnel.tx += 1
733    self.assertEqual((tunnel.rx, tunnel.tx),
734                      self.iproute.GetRxTxPackets(tunnel.iface))
735    sa_info.seq_num += 1
736
737  def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner,
738                        sa_info=None, expect_fail=False):
739    """Test null-crypt input path over an IPsec interface."""
740    if sa_info is None:
741      sa_info = tunnel.in_sa
742    read_sock, local_port = _CreateReceiveSock(inner_version)
743
744    input_pkt = _GetNullAuthCryptTunnelModePkt(
745        inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT,
746        local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num)
747    self.ReceivePacketOn(tunnel.underlying_netid, input_pkt)
748
749    if expect_fail:
750      self.assertRaisesErrno(EAGAIN, read_sock.recv, 4096)
751    else:
752      # Verify that the packet data and src are correct
753      data, src = read_sock.recvfrom(4096)
754      self.assertReceivedPacket(tunnel, sa_info)
755      self.assertEqual(net_test.UDP_PAYLOAD, data)
756      self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])
757
758  def _CheckTunnelOutput(self, tunnel, inner_version, local_inner,
759                         remote_inner, sa_info=None):
760    """Test null-crypt output path over an IPsec interface."""
761    if sa_info is None:
762      sa_info = tunnel.out_sa
763    local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
764                             _TEST_REMOTE_PORT)
765
766    # Read a tunneled IP packet on the underlying (outbound) network
767    # verifying that it is an ESP packet.
768    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
769                                  sa_info.seq_num, None, tunnel.local,
770                                  tunnel.remote)
771
772    # Get and update the IP headers on the inner payload so that we can do a simple
773    # comparison of byte data. Unfortunately, due to the scapy version this runs on,
774    # we cannot parse past the ESP header to the inner IP header, and thus have to
775    # workaround in this manner
776    if inner_version == 4:
777      ip_hdr_options = {
778        'id': scapy.IP(bytes(pkt.payload)[8:]).id,
779        'flags': scapy.IP(bytes(pkt.payload)[8:]).flags
780      }
781    else:
782      ip_hdr_options = {'fl': scapy.IPv6(bytes(pkt.payload)[8:]).fl}
783
784    expected = _GetNullAuthCryptTunnelModePkt(
785        inner_version, local_inner, tunnel.local, local_port, remote_inner,
786        tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num,
787        ip_hdr_options)
788
789    # Check outer header manually (Avoids having to overwrite outer header's
790    # id, flags or flow label)
791    self.assertSentPacket(tunnel, sa_info)
792    self.assertEqual(expected.src, pkt.src)
793    self.assertEqual(expected.dst, pkt.dst)
794    self.assertEqual(len(expected), len(pkt))
795
796    # Check everything else
797    self.assertEqual(bytes(expected.payload), bytes(pkt.payload))
798
799  def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner,
800                             remote_inner):
801    """Test both input and output paths over an encrypted IPsec interface.
802
803    This tests specifically makes sure that the both encryption and decryption
804    work together, as opposed to the _CheckTunnel(Input|Output) where the
805    input and output paths are tested separately, and using null encryption.
806    """
807    src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
808                           _TEST_REMOTE_PORT)
809
810    # Make sure it appeared on the underlying interface
811    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi,
812                                  tunnel.out_sa.seq_num, None, tunnel.local,
813                                  tunnel.remote)
814
815    # Check that packet is not sent in plaintext
816    self.assertTrue(bytes(net_test.UDP_PAYLOAD) not in bytes(pkt))
817
818    # Check src/dst
819    self.assertEqual(tunnel.local, pkt.src)
820    self.assertEqual(tunnel.remote, pkt.dst)
821
822    # Check that the interface statistics recorded the outbound packet
823    self.assertSentPacket(tunnel, tunnel.out_sa)
824
825    try:
826      # Swap the interface addresses to pretend we are the remote
827      self._SwapInterfaceAddress(
828          tunnel.iface, new_addr=remote_inner, old_addr=local_inner)
829
830      # Swap the packet's IP headers and write it back to the underlying
831      # network.
832      pkt = TunTwister.TwistPacket(pkt)
833      read_sock, local_port = _CreateReceiveSock(inner_version,
834                                                 _TEST_REMOTE_PORT)
835      self.ReceivePacketOn(tunnel.underlying_netid, pkt)
836
837      # Verify that the packet data and src are correct
838      data, src = read_sock.recvfrom(4096)
839      self.assertEqual(net_test.UDP_PAYLOAD, data)
840      self.assertEqual((local_inner, src_port), src[:2])
841
842      # Check that the interface statistics recorded the inbound packet
843      self.assertReceivedPacket(tunnel, tunnel.in_sa)
844    finally:
845      # Swap the interface addresses to pretend we are the remote
846      self._SwapInterfaceAddress(
847          tunnel.iface, new_addr=local_inner, old_addr=remote_inner)
848
849  def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner,
850                       sa_info=None):
851    """Test ICMP error path over an IPsec interface."""
852    if sa_info is None:
853      sa_info = tunnel.out_sa
854    # Now attempt to provoke an ICMP error.
855    # TODO: deduplicate with multinetwork_test.py.
856    dst_prefix, intermediate = {
857        4: ("172.19.", "172.16.9.12"),
858        6: ("2001:db8::", "2001:db8::1")
859    }[tunnel.version]
860
861    local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
862                             _TEST_REMOTE_PORT)
863    pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
864                                  sa_info.seq_num, None, tunnel.local,
865                                  tunnel.remote)
866    self.assertSentPacket(tunnel, sa_info)
867
868    myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid)
869    _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr,
870                                         pkt)
871    self.ReceivePacketOn(tunnel.underlying_netid, toobig)
872
873    # Check that the packet too big reduced the MTU.
874    routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None)
875    self.assertEqual(1, len(routes))
876    rtmsg, attributes = routes[0]
877    self.assertEqual(iproute.RTN_UNICAST, rtmsg.type)
878    self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
879
880    # Clear PMTU information so that future tests don't have to worry about it.
881    self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid)
882
883  def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner,
884                                     remote_inner):
885    """Test combined encryption path with ICMP errors over an IPsec tunnel"""
886    self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
887                                remote_inner)
888    self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner)
889    self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
890                                remote_inner)
891
892  def  _RebuildTunnel(self, tunnel, use_null_crypt):
893    # Some tests require that the out_seq_num and in_seq_num are the same
894    # (Specifically encrypted tests), rebuild SAs to ensure seq_num is 1
895    #
896    # Until we get better scapy support, the only way we can build an
897    # encrypted packet is to send it out, and read the packet from the wire.
898    # We then generally use this as the "inbound" encrypted packet, injecting
899    # it into the interface for which it is expected on.
900    #
901    # As such, this is required to ensure that encrypted packets (which we
902    # currently have no way to easily modify) are not considered replay
903    # attacks by the inbound SA.  (eg: received 3 packets, seq_num_in = 3,
904    # sent only 1, # seq_num_out = 1, inbound SA would consider this a replay
905    # attack)
906    tunnel.TeardownXfrm()
907    tunnel.SetupXfrm(use_null_crypt)
908
909  def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt):
910    """Bootstrap method to setup and run tests for the given parameters."""
911    tunnel = self.randomTunnel(outer_version)
912
913    try:
914      self._RebuildTunnel(tunnel, use_null_crypt)
915
916      local_inner = tunnel.addrs[inner_version]
917      remote_inner = _GetRemoteInnerAddress(inner_version)
918
919      for i in range(2):
920        func(tunnel, inner_version, local_inner, remote_inner)
921    finally:
922      if use_null_crypt:
923        tunnel.TeardownXfrm()
924        tunnel.SetupXfrm(False)
925
926  def _CheckTunnelRekey(self, tunnel, inner_version, local_inner, remote_inner):
927    old_out_sa = tunnel.out_sa
928    old_in_sa = tunnel.in_sa
929
930    # Check to make sure that both directions work before rekey
931    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
932                           old_in_sa)
933    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
934                            old_out_sa)
935
936    # Rekey
937    outer_family = net_test.GetAddressFamily(tunnel.version)
938
939    # Create new SA
940    # Distinguish the new SAs with new SPIs.
941    new_out_sa = SaInfo(old_out_sa.spi + 1)
942    new_in_sa = SaInfo(old_in_sa.spi + 1)
943
944    # Perform Rekey
945    tunnel.Rekey(outer_family, new_out_sa, new_in_sa)
946
947    # Expect that the old SPI still works for inbound packets
948    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
949                           old_in_sa)
950
951    # Test both paths with new SPIs, expect outbound to use new SPI
952    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
953                           new_in_sa)
954    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
955                            new_out_sa)
956
957    # Delete old SAs
958    tunnel.DeleteOldSaInfo(outer_family, old_in_sa.spi, old_out_sa.spi)
959
960    # Test both paths with new SPIs; should still work
961    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
962                           new_in_sa)
963    self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
964                            new_out_sa)
965
966    # Expect failure upon trying to receive a packet with the deleted SPI
967    self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
968                           old_in_sa, True)
969
970  def _TestTunnelRekey(self, inner_version, outer_version):
971    """Test packet input and output over a Virtual Tunnel Interface."""
972    tunnel = self.randomTunnel(outer_version)
973
974    try:
975      # Always use null_crypt, so we can check input and output separately
976      tunnel.TeardownXfrm()
977      tunnel.SetupXfrm(True)
978
979      local_inner = tunnel.addrs[inner_version]
980      remote_inner = _GetRemoteInnerAddress(inner_version)
981
982      self._CheckTunnelRekey(tunnel, inner_version, local_inner, remote_inner)
983    finally:
984      tunnel.TeardownXfrm()
985      tunnel.SetupXfrm(False)
986
987
class XfrmVtiTest(XfrmTunnelBase):
  """Tunnel checks run with VtiInterface as the interface class.

  Every ParamTest* method takes (inner_version, outer_version) and delegates
  to self._TestTunnel or self._TestTunnelRekey.
  """

  INTERFACE_CLASS = VtiInterface

  def ParamTestVtiInput(self, inner_version, outer_version):
    """Input path check, with null crypto."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput,
                     use_null_crypt=True)

  def ParamTestVtiOutput(self, inner_version, outer_version):
    """Output path check, with null crypto."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     use_null_crypt=True)

  def ParamTestVtiInOutEncrypted(self, inner_version, outer_version):
    """Combined input/output check, without null crypto."""
    self._TestTunnel(inner_version, outer_version,
                     self._CheckTunnelEncryption, use_null_crypt=False)

  def ParamTestVtiIcmp(self, inner_version, outer_version):
    """ICMP error check, without null crypto."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp,
                     use_null_crypt=False)

  def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version):
    """Encryption check interleaved with an ICMP error, without null crypto."""
    self._TestTunnel(inner_version, outer_version,
                     self._CheckTunnelEncryptionWithIcmp,
                     use_null_crypt=False)

  def ParamTestVtiRekey(self, inner_version, outer_version):
    """Rekey check."""
    self._TestTunnelRekey(inner_version, outer_version)
1012
1013
@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
class XfrmInterfaceTest(XfrmTunnelBase):
  """Tunnel checks run with XfrmInterface as the interface class.

  Skipped unless HAVE_XFRM_INTERFACES is true. Every ParamTest* method takes
  (inner_version, outer_version) and delegates to self._TestTunnel or
  self._TestTunnelRekey.
  """

  INTERFACE_CLASS = XfrmInterface

  def ParamTestXfrmIntfInput(self, inner_version, outer_version):
    """Input path check, with null crypto."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput,
                     use_null_crypt=True)

  def ParamTestXfrmIntfOutput(self, inner_version, outer_version):
    """Output path check, with null crypto."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
                     use_null_crypt=True)

  def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version):
    """Combined input/output check, without null crypto."""
    self._TestTunnel(inner_version, outer_version,
                     self._CheckTunnelEncryption, use_null_crypt=False)

  def ParamTestXfrmIntfIcmp(self, inner_version, outer_version):
    """ICMP error check, without null crypto."""
    self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp,
                     use_null_crypt=False)

  def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version):
    """Encryption check interleaved with an ICMP error, without null crypto."""
    self._TestTunnel(inner_version, outer_version,
                     self._CheckTunnelEncryptionWithIcmp,
                     use_null_crypt=False)

  def ParamTestXfrmIntfRekey(self, inner_version, outer_version):
    """Rekey check."""
    self._TestTunnelRekey(inner_version, outer_version)
1039
1040##############################################################################
1041#
1042# Test for presence of CONFIG_XFRM_MIGRATE and kernel patches
1043#
1044#   xfrm: Check if_id in xfrm_migrate
1045#   Upstream commit: c1aca3080e382886e2e58e809787441984a2f89b
1046#
1047#   xfrm: Fix xfrm migrate issues when address family changes
1048#   Upstream commit: e03c3bba351f99ad932e8f06baa9da1afc418e02
1049#
# The two upstream 5.17 fixes above were backported to the LTS kernels in
# versions 4.14.273, 4.19.236, 5.4.186, 5.10.107 and 5.15.30.
1052#
@unittest.skipUnless(SUPPORTS_XFRM_MIGRATE,
                     "XFRM migration unsupported or fixes not included")
class XfrmInterfaceMigrateTest(XfrmTunnelBase):
  """Tunnel checks run before and after migrating an XfrmInterface tunnel.

  Skipped unless SUPPORTS_XFRM_MIGRATE is true. Every ParamTest* method takes
  (inner_version, outer_version, new_outer_version) and delegates to
  self._TestTunnel.
  """
  INTERFACE_CLASS = XfrmInterface

  def setUpTunnel(self, outer_version, use_null_crypt):
    """Create an XfrmInterface tunnel and wire it into the test network.

    Args:
      outer_version: IP version of the tunnel's outer addresses.
      use_null_crypt: Passed through to the XfrmInterface constructor.

    Returns:
      The newly created XfrmInterface.
    """
    underlying_netid = self.RandomNetid()
    netid = _BASE_TUNNEL_NETID[outer_version] + _TUNNEL_NETID_OFFSET
    iface = "ipsec%s" % netid
    ifindex = self.ifindices[underlying_netid]

    local = self.MyAddress(outer_version, underlying_netid)
    remote = net_test.IPV4_ADDR if outer_version == 4 else net_test.IPV6_ADDR

    tunnel = XfrmInterface(iface, netid, underlying_netid, ifindex,
                           local, remote, outer_version, use_null_crypt)
    self._SetInboundMarking(netid, iface, True)
    self._SetupTunnelNetwork(tunnel, True)

    return tunnel

  def tearDownTunnel(self, tunnel):
    """Undo setUpTunnel: remove marking and network setup, then the tunnel."""
    self._SetInboundMarking(tunnel.netid, tunnel.iface, False)
    self._SetupTunnelNetwork(tunnel, False)
    tunnel.Teardown()

  def _TestTunnel(self, inner_version, outer_version, new_outer_version, func,
                  use_null_crypt):
    """Run `func` on a tunnel, migrate the tunnel, then run `func` again.

    Args:
      inner_version: IP version of the inner packets.
      outer_version: IP version used to pick the tunnel.
      new_outer_version: IP version of the addresses the tunnel migrates to.
      func: Check to run, called as
        func(tunnel, inner_version, local_inner, remote_inner).
      use_null_crypt: Whether the XFRM state is rebuilt with null crypto
        before the first check.
    """
    tunnel = self.randomTunnel(outer_version)

    # Remember the pre-migration endpoints so they can be restored afterwards.
    old_underlying_netid = tunnel.underlying_netid
    old_local = tunnel.local
    old_remote = tunnel.remote

    try:
      self._RebuildTunnel(tunnel, use_null_crypt)

      # Verify functionality before migration
      local_inner = tunnel.addrs[inner_version]
      remote_inner = _GetRemoteInnerAddress(inner_version)
      func(tunnel, inner_version, local_inner, remote_inner)

      # Migrate tunnel
      new_underlying_netid = self.RandomNetid(exclude=tunnel.underlying_netid)
      new_local = self.MyAddress(new_outer_version, new_underlying_netid)
      new_remote = (net_test.IPV4_ADDR2 if new_outer_version == 4
                    else net_test.IPV6_ADDR2)

      tunnel.Migrate(new_underlying_netid, new_local, new_remote)

      # Verify functionality after migration
      func(tunnel, inner_version, local_inner, remote_inner)
    finally:
      # Reset the tunnel to the original configuration. The saved values
      # belong on the tunnel object, not on the test case: assigning them to
      # `self` would leave the tunnel unchanged when SetupXfrm runs.
      tunnel.TeardownXfrm()

      tunnel.local = old_local
      tunnel.remote = old_remote
      tunnel.underlying_netid = old_underlying_netid
      tunnel.SetupXfrm(False)

  def ParamTestMigrateXfrmIntfInput(self, inner_version, outer_version,
                                    new_outer_version):
    """Input path check around a migration, with null crypto."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelInput, True)

  def ParamTestMigrateXfrmIntfOutput(self, inner_version, outer_version,
                                     new_outer_version):
    """Output path check around a migration, with null crypto."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelOutput, True)

  def ParamTestMigrateXfrmIntfInOutEncrypted(self, inner_version, outer_version,
                                             new_outer_version):
    """Combined input/output check around a migration, without null crypto."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelEncryption, False)

  def ParamTestMigrateXfrmIntfIcmp(self, inner_version, outer_version,
                                   new_outer_version):
    """ICMP error check around a migration, without null crypto."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelIcmp, False)

  def ParamTestMigrateXfrmIntfEncryptionWithIcmp(self, inner_version,
                                                 outer_version,
                                                 new_outer_version):
    """Encryption-with-ICMP check around a migration, without null crypto."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelEncryptionWithIcmp, False)

  def ParamTestMigrateXfrmIntfRekey(self, inner_version, outer_version,
                                    new_outer_version):
    """Rekey check around a migration, with null crypto."""
    self._TestTunnel(inner_version, outer_version, new_outer_version,
                     self._CheckTunnelRekey, True)
1145
# When run as a script, call InjectTests() first (defined elsewhere in this
# file; presumably it generates the concrete test methods from the ParamTest*
# templates - confirm there) and then hand control to the unittest runner.
if __name__ == "__main__":
  InjectTests()
  unittest.main()
1149