#!/usr/bin/python # # Copyright 2017 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from socket import * # pylint: disable=wildcard-import from scapy import all as scapy import struct import csocket import cstruct import multinetwork_base import net_test import util import xfrm _ENCRYPTION_KEY_256 = ("308146eb3bd84b044573d60f5a5fd159" "57c7d4fe567a2120f35bae0f9869ec22".decode("hex")) _AUTHENTICATION_KEY_128 = "af442892cdcd0ef650e9c299f9a8436a".decode("hex") _ALGO_AUTH_NULL = (xfrm.XfrmAlgoAuth(("digest_null", 0, 0)), "") _ALGO_HMAC_SHA1 = (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA1, 128, 96)), _AUTHENTICATION_KEY_128) _ALGO_CRYPT_NULL = (xfrm.XfrmAlgo(("ecb(cipher_null)", 0)), "") _ALGO_CBC_AES_256 = (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 256)), _ENCRYPTION_KEY_256) # Match all bits of the mark MARK_MASK_ALL = 0xffffffff def SetPolicySockopt(sock, family, opt_data): optlen = len(opt_data) if opt_data is not None else 0 if family == AF_INET: csocket.Setsockopt(sock, IPPROTO_IP, xfrm.IP_XFRM_POLICY, opt_data, optlen) else: csocket.Setsockopt(sock, IPPROTO_IPV6, xfrm.IPV6_XFRM_POLICY, opt_data, optlen) def ApplySocketPolicy(sock, family, direction, spi, reqid, tun_addrs): """Create and apply an ESP policy to a socket. A socket may have only one policy per direction, so applying a policy will remove any policy that was previously applied in that direction. Args: sock: The socket that needs a policy family: AF_INET or AF_INET6 direction: XFRM_POLICY_IN or XFRM_POLICY_OUT spi: 32-bit SPI in host byte order reqid: 32-bit ID matched against SAs tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None to request a transport mode SA. """ # Create a selector that matches all packets of the specified address family. selector = xfrm.EmptySelector(family) # Create an XFRM policy and template. policy = xfrm.UserPolicy(direction, selector) template = xfrm.UserTemplate(family, spi, reqid, tun_addrs) # Set the policy and template on our socket. opt_data = policy.Pack() + template.Pack() # The policy family might not match the socket family. For example, we might # have an IPv4 policy on a dual-stack socket. sockfamily = sock.getsockopt(SOL_SOCKET, net_test.SO_DOMAIN) SetPolicySockopt(sock, sockfamily, opt_data) def _GetCryptParameters(crypt_alg): """Looks up encryption algorithm's block and IV lengths. Args: crypt_alg: the encryption algorithm constant Returns: A tuple of the block size, and IV length """ cryptParameters = { _ALGO_CRYPT_NULL: (4, 0), _ALGO_CBC_AES_256: (16, 16) } return cryptParameters.get(crypt_alg, (0, 0)) def GetEspPacketLength(mode, version, udp_encap, payload, auth_alg, crypt_alg): """Calculates encrypted length of a UDP packet with the given payload. Args: mode: XFRM_MODE_TRANSPORT or XFRM_MODE_TUNNEL. version: IPPROTO_IP for IPv4, IPPROTO_IPV6 for IPv6. The inner header. udp_encap: whether UDP encap overhead should be accounted for. Since the outermost IP header is ignored (payload only), only add for udp encap'd packets. payload: UDP payload bytes. auth_alg: The xfrm_base authentication algorithm used in the SA. crypt_alg: The xfrm_base encryption algorithm used in the SA. Return: the packet length. """ crypt_iv_len, crypt_blk_size=_GetCryptParameters(crypt_alg) auth_trunc_len = auth_alg[0].trunc_len # Wrap in UDP payload payload_len = len(payload) + net_test.UDP_HDR_LEN # Size constants esp_hdr_len = len(xfrm.EspHdr) # SPI + Seq number icv_len = auth_trunc_len / 8 # Add inner IP header if tunnel mode if mode == xfrm.XFRM_MODE_TUNNEL: payload_len += net_test.GetIpHdrLength(version) # Add ESP trailer payload_len += 2 # Pad Length + Next Header fields # Align to block size of encryption algorithm payload_len += util.GetPadLength(crypt_blk_size, payload_len) # Add initialization vector, header length and ICV length payload_len += esp_hdr_len + crypt_iv_len + icv_len # Add encap as needed if udp_encap: payload_len += net_test.UDP_HDR_LEN return payload_len def EncryptPacketWithNull(packet, spi, seq, tun_addrs): """Apply null encryption to a packet. This performs ESP encapsulation on the given packet. The returned packet will be a tunnel mode packet if tun_addrs is provided. The input packet is assumed to be a UDP packet. The input packet *MUST* have its length and checksum fields in IP and UDP headers set appropriately. This can be done by "rebuilding" the scapy object. e.g., ip6_packet = scapy.IPv6(str(ip6_packet)) TODO: Support TCP Args: packet: a scapy.IPv6 or scapy.IP packet spi: security parameter index for ESP header in host byte order seq: sequence number for ESP header tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None to request a transport mode packet. Return: The encrypted packet (scapy.IPv6 or scapy.IP) """ # The top-level packet changes in tunnel mode, which would invalidate # the passed-in packet pointer. For consistency, this function now returns # a new packet and does not modify the user's original packet. packet = packet.copy() udp_layer = packet.getlayer(scapy.UDP) if not udp_layer: raise ValueError("Expected a UDP packet") # Build an ESP header. esp_packet = scapy.Raw(xfrm.EspHdr((spi, seq)).Pack()) if tun_addrs: tsrc_addr, tdst_addr = tun_addrs outer_version = net_test.GetAddressVersion(tsrc_addr) ip_type = {4: scapy.IP, 6: scapy.IPv6}[outer_version] new_ip_layer = ip_type(src=tsrc_addr, dst=tdst_addr) inner_layer = packet esp_nexthdr = {scapy.IPv6: IPPROTO_IPV6, scapy.IP: IPPROTO_IPIP}[type(packet)] else: new_ip_layer = None inner_layer = udp_layer esp_nexthdr = IPPROTO_UDP # ESP padding per RFC 4303 section 2.4. # For a null cipher with a block size of 1, padding is only necessary to # ensure that the 1-byte Pad Length and Next Header fields are right aligned # on a 4-byte boundary. esplen = (len(inner_layer) + 2) # UDP length plus Pad Length and Next Header. padlen = util.GetPadLength(4, esplen) # The pad bytes are consecutive integers starting from 0x01. padding = "".join((chr(i) for i in range(1, padlen + 1))) trailer = padding + struct.pack("BB", padlen, esp_nexthdr) # Assemble the packet. esp_packet.payload = scapy.Raw(inner_layer) packet = new_ip_layer if new_ip_layer else packet packet.payload = scapy.Raw(str(esp_packet) + trailer) # TODO: Can we simplify this and avoid the initial copy()? # Fix the IPv4/IPv6 headers. if type(packet) is scapy.IPv6: packet.nh = IPPROTO_ESP # Recompute plen. packet.plen = None packet = scapy.IPv6(str(packet)) elif type(packet) is scapy.IP: packet.proto = IPPROTO_ESP # Recompute IPv4 len and checksum. packet.len = None packet.chksum = None packet = scapy.IP(str(packet)) else: raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet)) return packet def DecryptPacketWithNull(packet): """Apply null decryption to a packet. This performs ESP decapsulation on the given packet. The input packet is assumed to be a UDP packet. This function will remove the ESP header and trailer bytes from an ESP packet. TODO: Support TCP Args: packet: a scapy.IPv6 or scapy.IP packet Returns: A tuple of decrypted packet (scapy.IPv6 or scapy.IP) and EspHdr """ esp_hdr, esp_data = cstruct.Read(str(packet.payload), xfrm.EspHdr) # Parse and strip ESP trailer. pad_len, esp_nexthdr = struct.unpack("BB", esp_data[-2:]) trailer_len = pad_len + 2 # Add the size of the pad_len and next_hdr fields. LayerType = { IPPROTO_IPIP: scapy.IP, IPPROTO_IPV6: scapy.IPv6, IPPROTO_UDP: scapy.UDP}[esp_nexthdr] next_layer = LayerType(esp_data[:-trailer_len]) if esp_nexthdr in [IPPROTO_IPIP, IPPROTO_IPV6]: # Tunnel mode decap is simple. Return the inner packet. return next_layer, esp_hdr # Cut out the ESP header. packet.payload = next_layer # Fix the IPv4/IPv6 headers. if type(packet) is scapy.IPv6: packet.nh = IPPROTO_UDP packet.plen = None # Recompute packet length. packet = scapy.IPv6(str(packet)) elif type(packet) is scapy.IP: packet.proto = IPPROTO_UDP packet.len = None # Recompute packet length. packet.chksum = None # Recompute IPv4 checksum. packet = scapy.IP(str(packet)) else: raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet)) return packet, esp_hdr class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest): """Base test class for all XFRM-related testing.""" def _isIcmpv6(self, payload): if not isinstance(payload, scapy.IPv6): return False if payload.nh == IPPROTO_ICMPV6: return True return payload.nh == IPPROTO_HOPOPTS and payload.payload.nh == IPPROTO_ICMPV6 def _ExpectEspPacketOn(self, netid, spi, seq, length, src_addr, dst_addr): """Read a packet from a netid and verify its properties. Args: netid: netid from which to read an ESP packet spi: SPI of the ESP packet in host byte order seq: sequence number of the ESP packet length: length of the packet's ESP payload or None to skip this check src_addr: source address of the packet or None to skip this check dst_addr: destination address of the packet or None to skip this check Returns: scapy.IP/IPv6: the read packet """ packets = [] for packet in self.ReadAllPacketsOn(netid): if not self._isIcmpv6(packet): packets.append(packet) self.assertEqual(1, len(packets)) packet = packets[0] if length is not None: self.assertEqual(length, len(packet.payload)) if dst_addr is not None: self.assertEqual(dst_addr, packet.dst) if src_addr is not None: self.assertEqual(src_addr, packet.src) # extract the ESP header esp_hdr, _ = cstruct.Read(str(packet.payload), xfrm.EspHdr) self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr) return packet # TODO: delete this when we're more diligent about deleting our SAs. class XfrmLazyTest(XfrmBaseTest): """Base test class Xfrm tests that cleans XFRM state on teardown.""" def setUp(self): super(XfrmBaseTest, self).setUp() self.xfrm = xfrm.Xfrm() self.xfrm.FlushSaInfo() self.xfrm.FlushPolicyInfo() def tearDown(self): super(XfrmBaseTest, self).tearDown() self.xfrm.FlushSaInfo() self.xfrm.FlushPolicyInfo()