• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2#
3# Copyright 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17from socket import *  # pylint: disable=wildcard-import
18from scapy import all as scapy
19import struct
20
21import csocket
22import cstruct
23import multinetwork_base
24import net_test
25import util
26import xfrm
27
28_ENCRYPTION_KEY_256 = ("308146eb3bd84b044573d60f5a5fd159"
29                       "57c7d4fe567a2120f35bae0f9869ec22".decode("hex"))
30_AUTHENTICATION_KEY_128 = "af442892cdcd0ef650e9c299f9a8436a".decode("hex")
31
32_ALGO_AUTH_NULL = (xfrm.XfrmAlgoAuth(("digest_null", 0, 0)), "")
33_ALGO_HMAC_SHA1 = (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA1, 128, 96)),
34                   _AUTHENTICATION_KEY_128)
35
36_ALGO_CRYPT_NULL = (xfrm.XfrmAlgo(("ecb(cipher_null)", 0)), "")
37_ALGO_CBC_AES_256 = (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 256)),
38                     _ENCRYPTION_KEY_256)
39
40# Match all bits of the mark
41MARK_MASK_ALL = 0xffffffff
42
43
44def SetPolicySockopt(sock, family, opt_data):
45  optlen = len(opt_data) if opt_data is not None else 0
46  if family == AF_INET:
47    csocket.Setsockopt(sock, IPPROTO_IP, xfrm.IP_XFRM_POLICY, opt_data, optlen)
48  else:
49    csocket.Setsockopt(sock, IPPROTO_IPV6, xfrm.IPV6_XFRM_POLICY, opt_data,
50                       optlen)
51
52
53def ApplySocketPolicy(sock, family, direction, spi, reqid, tun_addrs):
54  """Create and apply an ESP policy to a socket.
55
56  A socket may have only one policy per direction, so applying a policy will
57  remove any policy that was previously applied in that direction.
58
59  Args:
60    sock: The socket that needs a policy
61    family: AF_INET or AF_INET6
62    direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
63    spi: 32-bit SPI in host byte order
64    reqid: 32-bit ID matched against SAs
65    tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
66      to request a transport mode SA.
67  """
68  # Create a selector that matches all packets of the specified address family.
69  selector = xfrm.EmptySelector(family)
70
71  # Create an XFRM policy and template.
72  policy = xfrm.UserPolicy(direction, selector)
73  template = xfrm.UserTemplate(family, spi, reqid, tun_addrs)
74
75  # Set the policy and template on our socket.
76  opt_data = policy.Pack() + template.Pack()
77
78  # The policy family might not match the socket family. For example, we might
79  # have an IPv4 policy on a dual-stack socket.
80  sockfamily = sock.getsockopt(SOL_SOCKET, net_test.SO_DOMAIN)
81  SetPolicySockopt(sock, sockfamily, opt_data)
82
83def _GetCryptParameters(crypt_alg):
84  """Looks up encryption algorithm's block and IV lengths.
85
86  Args:
87    crypt_alg: the encryption algorithm constant
88  Returns:
89    A tuple of the block size, and IV length
90  """
91  cryptParameters = {
92    _ALGO_CRYPT_NULL: (4, 0),
93    _ALGO_CBC_AES_256: (16, 16)
94  }
95
96  return cryptParameters.get(crypt_alg, (0, 0))
97
98def GetEspPacketLength(mode, version, udp_encap, payload,
99                       auth_alg, crypt_alg):
100  """Calculates encrypted length of a UDP packet with the given payload.
101
102  Args:
103    mode: XFRM_MODE_TRANSPORT or XFRM_MODE_TUNNEL.
104    version: IPPROTO_IP for IPv4, IPPROTO_IPV6 for IPv6. The inner header.
105    udp_encap: whether UDP encap overhead should be accounted for. Since the
106               outermost IP header is ignored (payload only), only add for udp
107               encap'd packets.
108    payload: UDP payload bytes.
109    auth_alg: The xfrm_base authentication algorithm used in the SA.
110    crypt_alg: The xfrm_base encryption algorithm used in the SA.
111
112  Return: the packet length.
113  """
114
115  crypt_iv_len, crypt_blk_size=_GetCryptParameters(crypt_alg)
116  auth_trunc_len = auth_alg[0].trunc_len
117
118  # Wrap in UDP payload
119  payload_len = len(payload) + net_test.UDP_HDR_LEN
120
121  # Size constants
122  esp_hdr_len = len(xfrm.EspHdr) # SPI + Seq number
123  icv_len = auth_trunc_len / 8
124
125  # Add inner IP header if tunnel mode
126  if mode == xfrm.XFRM_MODE_TUNNEL:
127    payload_len += net_test.GetIpHdrLength(version)
128
129  # Add ESP trailer
130  payload_len += 2 # Pad Length + Next Header fields
131
132  # Align to block size of encryption algorithm
133  payload_len += util.GetPadLength(crypt_blk_size, payload_len)
134
135  # Add initialization vector, header length and ICV length
136  payload_len += esp_hdr_len + crypt_iv_len + icv_len
137
138  # Add encap as needed
139  if udp_encap:
140    payload_len += net_test.UDP_HDR_LEN
141
142  return payload_len
143
144
145def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
146  """Apply null encryption to a packet.
147
148  This performs ESP encapsulation on the given packet. The returned packet will
149  be a tunnel mode packet if tun_addrs is provided.
150
151  The input packet is assumed to be a UDP packet. The input packet *MUST* have
152  its length and checksum fields in IP and UDP headers set appropriately. This
153  can be done by "rebuilding" the scapy object. e.g.,
154      ip6_packet = scapy.IPv6(str(ip6_packet))
155
156  TODO: Support TCP
157
158  Args:
159    packet: a scapy.IPv6 or scapy.IP packet
160    spi: security parameter index for ESP header in host byte order
161    seq: sequence number for ESP header
162    tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
163      to request a transport mode packet.
164
165  Return:
166    The encrypted packet (scapy.IPv6 or scapy.IP)
167  """
168  # The top-level packet changes in tunnel mode, which would invalidate
169  # the passed-in packet pointer. For consistency, this function now returns
170  # a new packet and does not modify the user's original packet.
171  packet = packet.copy()
172  udp_layer = packet.getlayer(scapy.UDP)
173  if not udp_layer:
174    raise ValueError("Expected a UDP packet")
175  # Build an ESP header.
176  esp_packet = scapy.Raw(xfrm.EspHdr((spi, seq)).Pack())
177
178  if tun_addrs:
179    tsrc_addr, tdst_addr = tun_addrs
180    outer_version = net_test.GetAddressVersion(tsrc_addr)
181    ip_type = {4: scapy.IP, 6: scapy.IPv6}[outer_version]
182    new_ip_layer = ip_type(src=tsrc_addr, dst=tdst_addr)
183    inner_layer = packet
184    esp_nexthdr = {scapy.IPv6: IPPROTO_IPV6,
185                   scapy.IP: IPPROTO_IPIP}[type(packet)]
186  else:
187    new_ip_layer = None
188    inner_layer = udp_layer
189    esp_nexthdr = IPPROTO_UDP
190
191
192  # ESP padding per RFC 4303 section 2.4.
193  # For a null cipher with a block size of 1, padding is only necessary to
194  # ensure that the 1-byte Pad Length and Next Header fields are right aligned
195  # on a 4-byte boundary.
196  esplen = (len(inner_layer) + 2)  # UDP length plus Pad Length and Next Header.
197  padlen = util.GetPadLength(4, esplen)
198  # The pad bytes are consecutive integers starting from 0x01.
199  padding = "".join((chr(i) for i in range(1, padlen + 1)))
200  trailer = padding + struct.pack("BB", padlen, esp_nexthdr)
201
202  # Assemble the packet.
203  esp_packet.payload = scapy.Raw(inner_layer)
204  packet = new_ip_layer if new_ip_layer else packet
205  packet.payload = scapy.Raw(str(esp_packet) + trailer)
206
207  # TODO: Can we simplify this and avoid the initial copy()?
208  # Fix the IPv4/IPv6 headers.
209  if type(packet) is scapy.IPv6:
210    packet.nh = IPPROTO_ESP
211    # Recompute plen.
212    packet.plen = None
213    packet = scapy.IPv6(str(packet))
214  elif type(packet) is scapy.IP:
215    packet.proto = IPPROTO_ESP
216    # Recompute IPv4 len and checksum.
217    packet.len = None
218    packet.chksum = None
219    packet = scapy.IP(str(packet))
220  else:
221    raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet))
222  return packet
223
224
225def DecryptPacketWithNull(packet):
226  """Apply null decryption to a packet.
227
228  This performs ESP decapsulation on the given packet. The input packet is
229  assumed to be a UDP packet. This function will remove the ESP header and
230  trailer bytes from an ESP packet.
231
232  TODO: Support TCP
233
234  Args:
235    packet: a scapy.IPv6 or scapy.IP packet
236
237  Returns:
238    A tuple of decrypted packet (scapy.IPv6 or scapy.IP) and EspHdr
239  """
240  esp_hdr, esp_data = cstruct.Read(str(packet.payload), xfrm.EspHdr)
241  # Parse and strip ESP trailer.
242  pad_len, esp_nexthdr = struct.unpack("BB", esp_data[-2:])
243  trailer_len = pad_len + 2 # Add the size of the pad_len and next_hdr fields.
244  LayerType = {
245          IPPROTO_IPIP: scapy.IP,
246          IPPROTO_IPV6: scapy.IPv6,
247          IPPROTO_UDP: scapy.UDP}[esp_nexthdr]
248  next_layer = LayerType(esp_data[:-trailer_len])
249  if esp_nexthdr in [IPPROTO_IPIP, IPPROTO_IPV6]:
250    # Tunnel mode decap is simple. Return the inner packet.
251    return next_layer, esp_hdr
252
253  # Cut out the ESP header.
254  packet.payload = next_layer
255  # Fix the IPv4/IPv6 headers.
256  if type(packet) is scapy.IPv6:
257    packet.nh = IPPROTO_UDP
258    packet.plen = None # Recompute packet length.
259    packet = scapy.IPv6(str(packet))
260  elif type(packet) is scapy.IP:
261    packet.proto = IPPROTO_UDP
262    packet.len = None # Recompute packet length.
263    packet.chksum = None # Recompute IPv4 checksum.
264    packet = scapy.IP(str(packet))
265  else:
266    raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet))
267  return packet, esp_hdr
268
269
270class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest):
271  """Base test class for all XFRM-related testing."""
272
273  def _isIcmpv6(self, payload):
274    if not isinstance(payload, scapy.IPv6):
275      return False
276    if payload.nh == IPPROTO_ICMPV6:
277      return True
278    return payload.nh == IPPROTO_HOPOPTS and payload.payload.nh == IPPROTO_ICMPV6
279
280  def _ExpectEspPacketOn(self, netid, spi, seq, length, src_addr, dst_addr):
281    """Read a packet from a netid and verify its properties.
282
283    Args:
284      netid: netid from which to read an ESP packet
285      spi: SPI of the ESP packet in host byte order
286      seq: sequence number of the ESP packet
287      length: length of the packet's ESP payload or None to skip this check
288      src_addr: source address of the packet or None to skip this check
289      dst_addr: destination address of the packet or None to skip this check
290
291    Returns:
292      scapy.IP/IPv6: the read packet
293    """
294    packets = []
295    for packet in self.ReadAllPacketsOn(netid):
296      if not self._isIcmpv6(packet):
297        packets.append(packet)
298
299    self.assertEqual(1, len(packets))
300    packet = packets[0]
301    if length is not None:
302      self.assertEqual(length, len(packet.payload))
303    if dst_addr is not None:
304      self.assertEqual(dst_addr, packet.dst)
305    if src_addr is not None:
306      self.assertEqual(src_addr, packet.src)
307    # extract the ESP header
308    esp_hdr, _ = cstruct.Read(str(packet.payload), xfrm.EspHdr)
309    self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr)
310    return packet
311
312
313# TODO: delete this when we're more diligent about deleting our SAs.
314class XfrmLazyTest(XfrmBaseTest):
315  """Base test class Xfrm tests that cleans XFRM state on teardown."""
316  def setUp(self):
317    super(XfrmBaseTest, self).setUp()
318    self.xfrm = xfrm.Xfrm()
319    self.xfrm.FlushSaInfo()
320    self.xfrm.FlushPolicyInfo()
321
322  def tearDown(self):
323    super(XfrmBaseTest, self).tearDown()
324    self.xfrm.FlushSaInfo()
325    self.xfrm.FlushPolicyInfo()
326