#!/usr/bin/python
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from socket import *  # pylint: disable=wildcard-import
from scapy import all as scapy
import struct

import csocket
import cstruct
import multinetwork_base
import net_test
import util
import xfrm

_ENCRYPTION_KEY_256 = ("308146eb3bd84b044573d60f5a5fd159"
                       "57c7d4fe567a2120f35bae0f9869ec22".decode("hex"))
_AUTHENTICATION_KEY_128 = "af442892cdcd0ef650e9c299f9a8436a".decode("hex")

# Each algorithm constant is a tuple of (algorithm struct, key bytes).
_ALGO_AUTH_NULL = (xfrm.XfrmAlgoAuth(("digest_null", 0, 0)), "")
_ALGO_HMAC_SHA1 = (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA1, 128, 96)),
                   _AUTHENTICATION_KEY_128)

_ALGO_CRYPT_NULL = (xfrm.XfrmAlgo(("ecb(cipher_null)", 0)), "")
_ALGO_CBC_AES_256 = (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 256)),
                     _ENCRYPTION_KEY_256)

# Match all bits of the mark
MARK_MASK_ALL = 0xffffffff


def SetPolicySockopt(sock, family, opt_data):
  """Set the XFRM policy socket option on a socket.

  Args:
    sock: The socket on which to set the option.
    family: AF_INET selects IPPROTO_IP / IP_XFRM_POLICY; any other value
        selects IPPROTO_IPV6 / IPV6_XFRM_POLICY.
    opt_data: The option data to pass, or None, in which case an option
        length of 0 is passed.
  """
  optlen = len(opt_data) if opt_data is not None else 0
  if family == AF_INET:
    csocket.Setsockopt(sock, IPPROTO_IP, xfrm.IP_XFRM_POLICY, opt_data, optlen)
  else:
    csocket.Setsockopt(sock, IPPROTO_IPV6, xfrm.IPV6_XFRM_POLICY, opt_data,
                       optlen)


def ApplySocketPolicy(sock, family, direction, spi, reqid, tun_addrs):
  """Create and apply an ESP policy to a socket.

  A socket may have only one policy per direction, so applying a policy will
  remove any policy that was previously applied in that direction.

  Args:
    sock: The socket that needs a policy
    family: AF_INET or AF_INET6
    direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
    spi: 32-bit SPI in host byte order
    reqid: 32-bit ID matched against SAs
    tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
        to request a transport mode SA.
  """
  # Create a selector that matches all packets of the specified address family.
  selector = xfrm.EmptySelector(family)

  # Create an XFRM policy and template.
  policy = xfrm.UserPolicy(direction, selector)
  template = xfrm.UserTemplate(family, spi, reqid, tun_addrs)

  # Set the policy and template on our socket.
  opt_data = policy.Pack() + template.Pack()

  # The policy family might not match the socket family. For example, we might
  # have an IPv4 policy on a dual-stack socket.
  sockfamily = sock.getsockopt(SOL_SOCKET, net_test.SO_DOMAIN)
  SetPolicySockopt(sock, sockfamily, opt_data)


def _GetCryptParameters(crypt_alg):
  """Looks up encryption algorithm's block and IV lengths.

  Args:
    crypt_alg: the encryption algorithm constant
  Returns:
    A tuple of the block size, and IV length. (0, 0) is returned for an
    algorithm that is not in the table.
  """
  cryptParameters = {
    _ALGO_CRYPT_NULL: (4, 0),
    _ALGO_CBC_AES_256: (16, 16)
  }

  return cryptParameters.get(crypt_alg, (0, 0))


def GetEspPacketLength(mode, version, udp_encap, payload,
                       auth_alg, crypt_alg):
  """Calculates encrypted length of a UDP packet with the given payload.

  Args:
    mode: XFRM_MODE_TRANSPORT or XFRM_MODE_TUNNEL.
    version: IPPROTO_IP for IPv4, IPPROTO_IPV6 for IPv6. The inner header.
    udp_encap: whether UDP encap overhead should be accounted for. Since the
               outermost IP header is ignored (payload only), only add for udp
               encap'd packets.
    payload: UDP payload bytes.
    auth_alg: The xfrm_base authentication algorithm used in the SA.
    crypt_alg: The xfrm_base encryption algorithm used in the SA.

  Return: the packet length.
  """

  # _GetCryptParameters returns (block size, IV length), in that order.
  crypt_blk_size, crypt_iv_len = _GetCryptParameters(crypt_alg)
  auth_trunc_len = auth_alg[0].trunc_len

  # Wrap in UDP payload
  payload_len = len(payload) + net_test.UDP_HDR_LEN

  # Size constants
  esp_hdr_len = len(xfrm.EspHdr)  # SPI + Seq number
  # trunc_len is in bits; use floor division so the length stays an integer.
  icv_len = auth_trunc_len // 8

  # Add inner IP header if tunnel mode
  if mode == xfrm.XFRM_MODE_TUNNEL:
    payload_len += net_test.GetIpHdrLength(version)

  # Add ESP trailer
  payload_len += 2  # Pad Length + Next Header fields

  # Align to block size of encryption algorithm
  payload_len += util.GetPadLength(crypt_blk_size, payload_len)

  # Add initialization vector, header length and ICV length
  payload_len += esp_hdr_len + crypt_iv_len + icv_len

  # Add encap as needed
  if udp_encap:
    payload_len += net_test.UDP_HDR_LEN

  return payload_len


def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
  """Apply null encryption to a packet.

  This performs ESP encapsulation on the given packet. The returned packet will
  be a tunnel mode packet if tun_addrs is provided.

  The input packet is assumed to be a UDP packet. The input packet *MUST* have
  its length and checksum fields in IP and UDP headers set appropriately. This
  can be done by "rebuilding" the scapy object. e.g.,
      ip6_packet = scapy.IPv6(str(ip6_packet))

  TODO: Support TCP

  Args:
    packet: a scapy.IPv6 or scapy.IP packet
    spi: security parameter index for ESP header in host byte order
    seq: sequence number for ESP header
    tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
        to request a transport mode packet.

  Return:
    The encrypted packet (scapy.IPv6 or scapy.IP)

  Raises:
    ValueError: if the packet has no UDP layer, or if its first layer is
        neither IPv4 nor IPv6.
  """
  # The top-level packet changes in tunnel mode, which would invalidate
  # the passed-in packet pointer. For consistency, this function now returns
  # a new packet and does not modify the user's original packet.
  packet = packet.copy()
  udp_layer = packet.getlayer(scapy.UDP)
  if not udp_layer:
    raise ValueError("Expected a UDP packet")
  # Build an ESP header.
  esp_packet = scapy.Raw(xfrm.EspHdr((spi, seq)).Pack())

  if tun_addrs:
    # Tunnel mode: the whole input packet is carried inside a new outer IP
    # header whose version follows the tunnel source address.
    tsrc_addr, tdst_addr = tun_addrs
    outer_version = net_test.GetAddressVersion(tsrc_addr)
    ip_type = {4: scapy.IP, 6: scapy.IPv6}[outer_version]
    new_ip_layer = ip_type(src=tsrc_addr, dst=tdst_addr)
    inner_layer = packet
    esp_nexthdr = {scapy.IPv6: IPPROTO_IPV6,
                   scapy.IP: IPPROTO_IPIP}[type(packet)]
  else:
    # Transport mode: only the UDP layer is carried, under the original IP
    # header.
    new_ip_layer = None
    inner_layer = udp_layer
    esp_nexthdr = IPPROTO_UDP

  # ESP padding per RFC 4303 section 2.4.
  # For a null cipher with a block size of 1, padding is only necessary to
  # ensure that the 1-byte Pad Length and Next Header fields are right aligned
  # on a 4-byte boundary.
  esplen = (len(inner_layer) + 2)  # UDP length plus Pad Length and Next Header.
  padlen = util.GetPadLength(4, esplen)
  # The pad bytes are consecutive integers starting from 0x01.
  padding = "".join(chr(i) for i in range(1, padlen + 1))
  trailer = padding + struct.pack("BB", padlen, esp_nexthdr)

  # Assemble the packet.
  esp_packet.payload = scapy.Raw(inner_layer)
  packet = new_ip_layer if new_ip_layer else packet
  packet.payload = scapy.Raw(str(esp_packet) + trailer)

  # TODO: Can we simplify this and avoid the initial copy()?
  # Fix the IPv4/IPv6 headers.
  if type(packet) is scapy.IPv6:
    packet.nh = IPPROTO_ESP
    # Recompute plen.
    packet.plen = None
    packet = scapy.IPv6(str(packet))
  elif type(packet) is scapy.IP:
    packet.proto = IPPROTO_ESP
    # Recompute IPv4 len and checksum.
    packet.len = None
    packet.chksum = None
    packet = scapy.IP(str(packet))
  else:
    raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet))
  return packet


def DecryptPacketWithNull(packet):
  """Apply null decryption to a packet.

  This performs ESP decapsulation on the given packet. The input packet is
  assumed to be a UDP packet. This function will remove the ESP header and
  trailer bytes from an ESP packet.

  In the transport mode case the payload of the passed-in packet object is
  replaced before the packet is rebuilt.

  TODO: Support TCP

  Args:
    packet: a scapy.IPv6 or scapy.IP packet

  Returns:
    A tuple of decrypted packet (scapy.IPv6 or scapy.IP) and EspHdr

  Raises:
    KeyError: if the ESP next header is not IPIP, IPv6 or UDP.
    ValueError: if the first layer of a transport mode packet is neither IPv4
        nor IPv6.
  """
  esp_hdr, esp_data = cstruct.Read(str(packet.payload), xfrm.EspHdr)
  # Parse and strip ESP trailer.
  pad_len, esp_nexthdr = struct.unpack("BB", esp_data[-2:])
  trailer_len = pad_len + 2  # Add the size of the pad_len and next_hdr fields.
  LayerType = {
          IPPROTO_IPIP: scapy.IP,
          IPPROTO_IPV6: scapy.IPv6,
          IPPROTO_UDP: scapy.UDP}[esp_nexthdr]
  next_layer = LayerType(esp_data[:-trailer_len])
  if esp_nexthdr in [IPPROTO_IPIP, IPPROTO_IPV6]:
    # Tunnel mode decap is simple. Return the inner packet.
    return next_layer, esp_hdr

  # Cut out the ESP header.
  packet.payload = next_layer
  # Fix the IPv4/IPv6 headers.
  if type(packet) is scapy.IPv6:
    packet.nh = IPPROTO_UDP
    packet.plen = None  # Recompute packet length.
    packet = scapy.IPv6(str(packet))
  elif type(packet) is scapy.IP:
    packet.proto = IPPROTO_UDP
    packet.len = None  # Recompute packet length.
    packet.chksum = None  # Recompute IPv4 checksum.
    packet = scapy.IP(str(packet))
  else:
    raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet))
  return packet, esp_hdr


class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest):
  """Base test class for all XFRM-related testing."""

  def _isIcmpv6(self, payload):
    """Returns whether a packet is an IPv6 packet carrying ICMPv6.

    A packet counts as ICMPv6 if its next header is ICMPv6, or if its next
    header is hop-by-hop options whose own next header is ICMPv6.

    Args:
      payload: the packet to inspect
    Returns:
      True if the packet is a scapy.IPv6 carrying ICMPv6, False otherwise.
    """
    if not isinstance(payload, scapy.IPv6):
      return False
    if payload.nh == IPPROTO_ICMPV6:
      return True
    return payload.nh == IPPROTO_HOPOPTS and payload.payload.nh == IPPROTO_ICMPV6

  def _ExpectEspPacketOn(self, netid, spi, seq, length, src_addr, dst_addr):
    """Read a packet from a netid and verify its properties.

    ICMPv6 packets read from the netid are ignored; exactly one other packet
    is expected.

    Args:
      netid: netid from which to read an ESP packet
      spi: SPI of the ESP packet in host byte order
      seq: sequence number of the ESP packet
      length: length of the packet's ESP payload or None to skip this check
      src_addr: source address of the packet or None to skip this check
      dst_addr: destination address of the packet or None to skip this check

    Returns:
      scapy.IP/IPv6: the read packet
    """
    packets = []
    for packet in self.ReadAllPacketsOn(netid):
      if not self._isIcmpv6(packet):
        packets.append(packet)

    self.assertEqual(1, len(packets))
    packet = packets[0]
    if length is not None:
      self.assertEqual(length, len(packet.payload))
    if dst_addr is not None:
      self.assertEqual(dst_addr, packet.dst)
    if src_addr is not None:
      self.assertEqual(src_addr, packet.src)
    # extract the ESP header
    esp_hdr, _ = cstruct.Read(str(packet.payload), xfrm.EspHdr)
    self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr)
    return packet


# TODO: delete this when we're more diligent about deleting our SAs.
class XfrmLazyTest(XfrmBaseTest):
  """Base test class for XFRM tests that flushes XFRM state around each test.

  All SAs and policies are flushed both in setUp and in tearDown.
  """

  def setUp(self):
    """Runs the parent setUp, then flushes all SAs and policies."""
    # Name this class (not its parent) in super() so that lookup starts at
    # XfrmBaseTest and no setUp in the hierarchy is skipped.
    super(XfrmLazyTest, self).setUp()
    self.xfrm = xfrm.Xfrm()
    self.xfrm.FlushSaInfo()
    self.xfrm.FlushPolicyInfo()

  def tearDown(self):
    """Runs the parent tearDown, then flushes all SAs and policies."""
    super(XfrmLazyTest, self).tearDown()
    self.xfrm.FlushSaInfo()
    self.xfrm.FlushPolicyInfo()