1#!/usr/bin/python 2# 3# Copyright 2017 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import 18from errno import * # pylint: disable=wildcard-import 19import random 20from scapy import all as scapy 21from socket import * # pylint: disable=wildcard-import 22import struct 23import subprocess 24import unittest 25 26import multinetwork_base 27import net_test 28import xfrm 29 30XFRM_ADDR_ANY = 16 * "\x00" 31LOOPBACK = 15 * "\x00" + "\x01" 32ENCRYPTED_PAYLOAD = ("b1c74998efd6326faebe2061f00f2c750e90e76001664a80c287b150" 33 "59e74bf949769cc6af71e51b539e7de3a2a14cb05a231b969e035174" 34 "d98c5aa0cef1937db98889ec0d08fa408fecf616") 35ENCRYPTION_KEY = ("308146eb3bd84b044573d60f5a5fd159" 36 "57c7d4fe567a2120f35bae0f9869ec22".decode("hex")) 37AUTH_TRUNC_KEY = "af442892cdcd0ef650e9c299f9a8436a".decode("hex") 38 39TEST_ADDR1 = "2001:4860:4860::8888" 40TEST_ADDR2 = "2001:4860:4860::8844" 41 42TEST_SPI = 0x1234 43 44ALL_ALGORITHMS = 0xffffffff 45ALGO_CBC_AES_256 = xfrm.XfrmAlgo(("cbc(aes)", 256)) 46ALGO_HMAC_SHA1 = xfrm.XfrmAlgoAuth(("hmac(sha1)", 128, 96)) 47 48 49class XfrmTest(multinetwork_base.MultiNetworkBaseTest): 50 51 @classmethod 52 def setUpClass(cls): 53 super(XfrmTest, cls).setUpClass() 54 cls.xfrm = xfrm.Xfrm() 55 56 def setUp(self): 57 # TODO: delete this when we're more diligent about deleting our SAs. 58 super(XfrmTest, self).setUp() 59 self.xfrm.FlushSaInfo() 60 61 def tearDown(self): 62 super(XfrmTest, self).tearDown() 63 self.xfrm.FlushSaInfo() 64 65 def expectIPv6EspPacketOn(self, netid, spi, seq, length): 66 packets = self.ReadAllPacketsOn(netid) 67 self.assertEquals(1, len(packets)) 68 packet = packets[0] 69 self.assertEquals(IPPROTO_ESP, packet.nh) 70 spi_seq = struct.pack("!II", spi, seq) 71 self.assertEquals(spi_seq, str(packet.payload)[:len(spi_seq)]) 72 self.assertEquals(length, len(packet.payload)) 73 74 def assertIsUdpEncapEsp(self, packet, spi, seq, length): 75 self.assertEquals(IPPROTO_UDP, packet.proto) 76 self.assertEquals(4500, packet.dport) 77 # Skip UDP header. TODO: isn't there a better way to do this? 78 payload = str(packet.payload)[8:] 79 self.assertEquals(length, len(payload)) 80 spi_seq = struct.pack("!II", ntohl(spi), seq) 81 self.assertEquals(spi_seq, str(payload)[:len(spi_seq)]) 82 83 def testAddSa(self): 84 self.xfrm.AddMinimalSaInfo("::", TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP, 85 xfrm.XFRM_MODE_TRANSPORT, 3320, 86 ALGO_CBC_AES_256, ENCRYPTION_KEY, 87 ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None) 88 expected = ( 89 "src :: dst 2001:4860:4860::8888\n" 90 "\tproto esp spi 0x00001234 reqid 3320 mode transport\n" 91 "\treplay-window 4 \n" 92 "\tauth-trunc hmac(sha1) 0x%s 96\n" 93 "\tenc cbc(aes) 0x%s\n" 94 "\tsel src ::/0 dst ::/0 \n" % ( 95 AUTH_TRUNC_KEY.encode("hex"), ENCRYPTION_KEY.encode("hex"))) 96 97 actual = subprocess.check_output("ip xfrm state".split()) 98 try: 99 self.assertMultiLineEqual(expected, actual) 100 finally: 101 self.xfrm.DeleteSaInfo(TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP) 102 103 def testFlush(self): 104 self.assertEquals(0, len(self.xfrm.DumpSaInfo())) 105 self.xfrm.AddMinimalSaInfo("::", "2000::", htonl(TEST_SPI), 106 IPPROTO_ESP, xfrm.XFRM_MODE_TRANSPORT, 1234, 107 ALGO_CBC_AES_256, ENCRYPTION_KEY, 108 ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None) 109 self.xfrm.AddMinimalSaInfo("0.0.0.0", "192.0.2.1", htonl(TEST_SPI), 110 IPPROTO_ESP, xfrm.XFRM_MODE_TRANSPORT, 4321, 111 ALGO_CBC_AES_256, ENCRYPTION_KEY, 112 ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None) 113 self.assertEquals(2, len(self.xfrm.DumpSaInfo())) 114 self.xfrm.FlushSaInfo() 115 self.assertEquals(0, len(self.xfrm.DumpSaInfo())) 116 117 @unittest.skipUnless(net_test.LINUX_VERSION < (4, 4, 0), "regression") 118 def testSocketPolicy(self): 119 # Open an IPv6 UDP socket and connect it. 120 s = socket(AF_INET6, SOCK_DGRAM, 0) 121 netid = random.choice(self.NETIDS) 122 self.SelectInterface(s, netid, "mark") 123 s.connect((TEST_ADDR1, 53)) 124 saddr, sport = s.getsockname()[:2] 125 daddr, dport = s.getpeername()[:2] 126 127 # Create a selector that matches all UDP packets. It's not actually used to 128 # select traffic, that will be done by the socket policy, which selects the 129 # SA entry (i.e., xfrm state) via the SPI and reqid. 130 sel = xfrm.XfrmSelector((XFRM_ADDR_ANY, XFRM_ADDR_ANY, 0, 0, 0, 0, 131 AF_INET6, 0, 0, IPPROTO_UDP, 0, 0)) 132 133 # Create a user policy that specifies that all outbound packets matching the 134 # (essentially no-op) selector should be encrypted. 135 info = xfrm.XfrmUserpolicyInfo((sel, 136 xfrm.NO_LIFETIME_CFG, xfrm.NO_LIFETIME_CUR, 137 100, 0, 138 xfrm.XFRM_POLICY_OUT, 139 xfrm.XFRM_POLICY_ALLOW, 140 xfrm.XFRM_POLICY_LOCALOK, 141 xfrm.XFRM_SHARE_UNIQUE)) 142 143 # Create a template that specifies the SPI and the protocol. 144 xfrmid = xfrm.XfrmId((XFRM_ADDR_ANY, htonl(TEST_SPI), IPPROTO_ESP)) 145 tmpl = xfrm.XfrmUserTmpl((xfrmid, AF_INET6, XFRM_ADDR_ANY, 0, 146 xfrm.XFRM_MODE_TRANSPORT, xfrm.XFRM_SHARE_UNIQUE, 147 0, # require 148 ALL_ALGORITHMS, # auth algos 149 ALL_ALGORITHMS, # encryption algos 150 ALL_ALGORITHMS)) # compression algos 151 152 # Set the policy and template on our socket. 153 data = info.Pack() + tmpl.Pack() 154 s.setsockopt(IPPROTO_IPV6, xfrm.IPV6_XFRM_POLICY, data) 155 156 # Because the policy has level set to "require" (the default), attempting 157 # to send a packet results in an error, because there is no SA that 158 # matches the socket policy we set. 159 self.assertRaisesErrno( 160 EAGAIN, 161 s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR1, 53)) 162 163 # Adding a matching SA causes the packet to go out encrypted. The SA's 164 # SPI must match the one in our template, and the destination address must 165 # match the packet's destination address (in tunnel mode, it has to match 166 # the tunnel destination). 167 reqid = 0 168 self.xfrm.AddMinimalSaInfo("::", TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP, 169 xfrm.XFRM_MODE_TRANSPORT, reqid, 170 ALGO_CBC_AES_256, ENCRYPTION_KEY, 171 ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None) 172 173 s.sendto(net_test.UDP_PAYLOAD, (TEST_ADDR1, 53)) 174 self.expectIPv6EspPacketOn(netid, TEST_SPI, 1, 84) 175 176 # Sending to another destination doesn't work: again, no matching SA. 177 self.assertRaisesErrno( 178 EAGAIN, 179 s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR2, 53)) 180 181 # Sending on another socket without the policy applied results in an 182 # unencrypted packet going out. 183 s2 = socket(AF_INET6, SOCK_DGRAM, 0) 184 self.SelectInterface(s2, netid, "mark") 185 s2.sendto(net_test.UDP_PAYLOAD, (TEST_ADDR1, 53)) 186 packets = self.ReadAllPacketsOn(netid) 187 self.assertEquals(1, len(packets)) 188 packet = packets[0] 189 self.assertEquals(IPPROTO_UDP, packet.nh) 190 191 # Deleting the SA causes the first socket to return errors again. 192 self.xfrm.DeleteSaInfo(TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP) 193 self.assertRaisesErrno( 194 EAGAIN, 195 s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR1, 53)) 196 197 198 def testUdpEncapWithSocketPolicy(self): 199 # TODO: test IPv6 instead of IPv4. 200 netid = random.choice(self.NETIDS) 201 myaddr = self.MyAddress(4, netid) 202 remoteaddr = self.GetRemoteAddress(4) 203 204 # Reserve a port on which to receive UDP encapsulated packets. Sending 205 # packets works without this (and potentially can send packets with a source 206 # port belonging to another application), but receiving requires the port to 207 # be bound and the encapsulation socket option enabled. 208 encap_socket = net_test.Socket(AF_INET, SOCK_DGRAM, 0) 209 encap_socket.bind((myaddr, 0)) 210 encap_port = encap_socket.getsockname()[1] 211 encap_socket.setsockopt(IPPROTO_UDP, xfrm.UDP_ENCAP, 212 xfrm.UDP_ENCAP_ESPINUDP) 213 214 # Open a socket to send traffic. 215 s = socket(AF_INET, SOCK_DGRAM, 0) 216 self.SelectInterface(s, netid, "mark") 217 s.connect((remoteaddr, 53)) 218 219 # Create a UDP encap policy and template inbound and outbound and apply 220 # them to s. 221 sel = xfrm.XfrmSelector((XFRM_ADDR_ANY, XFRM_ADDR_ANY, 0, 0, 0, 0, 222 AF_INET, 0, 0, IPPROTO_UDP, 0, 0)) 223 224 # Use the same SPI both inbound and outbound because this lets us receive 225 # encrypted packets by simply replaying the packets the kernel sends. 226 in_reqid = 123 227 in_spi = htonl(TEST_SPI) 228 out_reqid = 456 229 out_spi = htonl(TEST_SPI) 230 231 # Start with the outbound policy. 232 # TODO: what happens without XFRM_SHARE_UNIQUE? 233 info = xfrm.XfrmUserpolicyInfo((sel, 234 xfrm.NO_LIFETIME_CFG, xfrm.NO_LIFETIME_CUR, 235 100, 0, 236 xfrm.XFRM_POLICY_OUT, 237 xfrm.XFRM_POLICY_ALLOW, 238 xfrm.XFRM_POLICY_LOCALOK, 239 xfrm.XFRM_SHARE_UNIQUE)) 240 xfrmid = xfrm.XfrmId((XFRM_ADDR_ANY, out_spi, IPPROTO_ESP)) 241 usertmpl = xfrm.XfrmUserTmpl((xfrmid, AF_INET, XFRM_ADDR_ANY, out_reqid, 242 xfrm.XFRM_MODE_TRANSPORT, xfrm.XFRM_SHARE_UNIQUE, 243 0, # require 244 ALL_ALGORITHMS, # auth algos 245 ALL_ALGORITHMS, # encryption algos 246 ALL_ALGORITHMS)) # compression algos 247 248 data = info.Pack() + usertmpl.Pack() 249 s.setsockopt(IPPROTO_IP, xfrm.IP_XFRM_POLICY, data) 250 251 # Uncomment for debugging. 252 # subprocess.call("ip xfrm policy".split()) 253 254 # Create inbound and outbound SAs that specify UDP encapsulation. 255 encaptmpl = xfrm.XfrmEncapTmpl((xfrm.UDP_ENCAP_ESPINUDP, htons(encap_port), 256 htons(4500), 16 * "\x00")) 257 self.xfrm.AddMinimalSaInfo(myaddr, remoteaddr, out_spi, IPPROTO_ESP, 258 xfrm.XFRM_MODE_TRANSPORT, out_reqid, 259 ALGO_CBC_AES_256, ENCRYPTION_KEY, 260 ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, encaptmpl) 261 262 # Add an encap template that's the mirror of the outbound one. 263 encaptmpl.sport, encaptmpl.dport = encaptmpl.dport, encaptmpl.sport 264 self.xfrm.AddMinimalSaInfo(remoteaddr, myaddr, in_spi, IPPROTO_ESP, 265 xfrm.XFRM_MODE_TRANSPORT, in_reqid, 266 ALGO_CBC_AES_256, ENCRYPTION_KEY, 267 ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, encaptmpl) 268 269 # Uncomment for debugging. 270 # subprocess.call("ip xfrm state".split()) 271 272 # Now send a packet. 273 s.sendto("foo", (remoteaddr, 53)) 274 srcport = s.getsockname()[1] 275 # s.send("foo") # TODO: WHY DOES THIS NOT WORK? 276 277 # Expect to see an UDP encapsulated packet. 278 packets = self.ReadAllPacketsOn(netid) 279 self.assertEquals(1, len(packets)) 280 packet = packets[0] 281 self.assertIsUdpEncapEsp(packet, out_spi, 1, 52) 282 283 # Now test the receive path. Because we don't know how to decrypt packets, 284 # we just play back the encrypted packet that kernel sent earlier. We swap 285 # the addresses in the IP header to make the packet look like it's bound for 286 # us, but we can't do that for the port numbers because the UDP header is 287 # part of the integrity protected payload, which we can only replay as is. 288 # So the source and destination ports are swapped and the packet appears to 289 # be sent from srcport to port 53. Open another socket on that port, and 290 # apply the inbound policy to it. 291 twisted_socket = socket(AF_INET, SOCK_DGRAM, 0) 292 net_test.SetSocketTimeout(twisted_socket, 100) 293 twisted_socket.bind(("0.0.0.0", 53)) 294 295 # TODO: why does this work even without the per-socket policy applied? The 296 # received packet obviously matches an SA, but don't inbound packets need to 297 # match a policy as well? 298 info.dir = xfrm.XFRM_POLICY_IN 299 xfrmid.spi = in_spi 300 usertmpl.reqid = in_reqid 301 data = info.Pack() + usertmpl.Pack() 302 twisted_socket.setsockopt(IPPROTO_IP, xfrm.IP_XFRM_POLICY, data) 303 304 # Save the payload of the packet so we can replay it back to ourselves. 305 payload = str(packet.payload)[8:] 306 spi_seq = struct.pack("!II", ntohl(in_spi), 1) 307 payload = spi_seq + payload[len(spi_seq):] 308 309 # Tamper with the packet and check that it's dropped and counted as invalid. 310 sainfo = self.xfrm.FindSaInfo(in_spi) 311 self.assertEquals(0, sainfo.stats.integrity_failed) 312 broken = payload[:25] + chr((ord(payload[25]) + 1) % 256) + payload[26:] 313 incoming = (scapy.IP(src=remoteaddr, dst=myaddr) / 314 scapy.UDP(sport=4500, dport=encap_port) / broken) 315 self.ReceivePacketOn(netid, incoming) 316 sainfo = self.xfrm.FindSaInfo(in_spi) 317 self.assertEquals(1, sainfo.stats.integrity_failed) 318 319 # Now play back the valid packet and check that we receive it. 320 incoming = (scapy.IP(src=remoteaddr, dst=myaddr) / 321 scapy.UDP(sport=4500, dport=encap_port) / payload) 322 self.ReceivePacketOn(netid, incoming) 323 data, src = twisted_socket.recvfrom(4096) 324 self.assertEquals("foo", data) 325 self.assertEquals((remoteaddr, srcport), src) 326 327 # Check that unencrypted packets are not received. 328 unencrypted = (scapy.IP(src=remoteaddr, dst=myaddr) / 329 scapy.UDP(sport=srcport, dport=53) / "foo") 330 self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096) 331 332 def testAllocSpecificSpi(self): 333 spi = 0xABCD 334 new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi) 335 self.assertEquals(spi, ntohl(new_sa.id.spi)) 336 337 def testAllocSpecificSpiUnavailable(self): 338 """Attempt to allocate the same SPI twice.""" 339 spi = 0xABCD 340 new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi) 341 self.assertEquals(spi, ntohl(new_sa.id.spi)) 342 with self.assertRaisesErrno(ENOENT): 343 new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi) 344 345 def testAllocRangeSpi(self): 346 start, end = 0xABCD0, 0xABCDF 347 new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, start, end) 348 spi = ntohl(new_sa.id.spi) 349 self.assertGreaterEqual(spi, start) 350 self.assertLessEqual(spi, end) 351 352 def testAllocRangeSpiUnavailable(self): 353 """Attempt to allocate N+1 SPIs from a range of size N.""" 354 start, end = 0xABCD0, 0xABCDF 355 range_size = end - start + 1 356 spis = set() 357 # Assert that allocating SPI fails when none are available. 358 with self.assertRaisesErrno(ENOENT): 359 # Allocating range_size + 1 SPIs is guaranteed to fail. Due to the way 360 # kernel picks random SPIs, this has a high probability of failing before 361 # reaching that limit. 362 for i in xrange(range_size + 1): 363 new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, start, end) 364 spi = ntohl(new_sa.id.spi) 365 self.assertNotIn(spi, spis) 366 spis.add(spi) 367 368 369if __name__ == "__main__": 370 unittest.main() 371