1#!/usr/bin/python3 2# 3# Copyright 2017 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import 18from errno import * # pylint: disable=wildcard-import 19import os 20import itertools 21from scapy import all as scapy 22from socket import * # pylint: disable=wildcard-import 23import threading 24import unittest 25 26import multinetwork_base 27import net_test 28from tun_twister import TapTwister 29import util 30import xfrm 31import xfrm_base 32import xfrm_test 33 34ANY_KVER = net_test.LINUX_ANY_VERSION 35 36# List of encryption algorithms for use in ParamTests. 37CRYPT_ALGOS = [ 38 (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 128)), ANY_KVER), 39 (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 192)), ANY_KVER), 40 (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 256)), ANY_KVER), 41 # RFC 3686 specifies that key length must be 128, 192 or 256 bits, with 42 # an additional 4 bytes (32 bits) of nonce. A fresh nonce value MUST be 43 # assigned for each SA. 44 # CTR-AES is enforced since kernel version 5.8 45 (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CTR_AES, 128+32)), (5, 8)), 46 (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CTR_AES, 192+32)), (5, 8)), 47 (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CTR_AES, 256+32)), (5, 8)), 48] 49 50# List of auth algorithms for use in ParamTests. 51AUTH_ALGOS = [ 52 # RFC 4868 specifies that the only supported truncation length is half the 53 # hash size. 54 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_MD5, 128, 96)), ANY_KVER), 55 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA1, 160, 96)), ANY_KVER), 56 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA256, 256, 128)), ANY_KVER), 57 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA384, 384, 192)), ANY_KVER), 58 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA512, 512, 256)), ANY_KVER), 59 # Test larger truncation lengths for good measure. 60 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_MD5, 128, 128)), ANY_KVER), 61 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA1, 160, 160)), ANY_KVER), 62 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA256, 256, 256)), ANY_KVER), 63 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA384, 384, 384)), ANY_KVER), 64 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA512, 512, 512)), ANY_KVER), 65 # RFC 3566 specifies that the only supported truncation length 66 # is 96 bits. 67 # XCBC-AES is enforced since kernel version 5.8 68 (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_AUTH_XCBC_AES, 128, 96)), (5, 8)), 69] 70 71# List of aead algorithms for use in ParamTests. 72AEAD_ALGOS = [ 73 # RFC 4106 specifies that key length must be 128, 192 or 256 bits, 74 # with an additional 4 bytes (32 bits) of salt. The salt must be unique 75 # for each new SA using the same key. 76 # RFC 4106 specifies that ICV length must be 8, 12, or 16 bytes 77 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_GCM_AES, 128+32, 8*8)), ANY_KVER), 78 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_GCM_AES, 128+32, 12*8)), ANY_KVER), 79 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_GCM_AES, 128+32, 16*8)), ANY_KVER), 80 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_GCM_AES, 192+32, 8*8)), ANY_KVER), 81 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_GCM_AES, 192+32, 12*8)), ANY_KVER), 82 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_GCM_AES, 192+32, 16*8)), ANY_KVER), 83 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_GCM_AES, 256+32, 8*8)), ANY_KVER), 84 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_GCM_AES, 256+32, 12*8)), ANY_KVER), 85 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_GCM_AES, 256+32, 16*8)), ANY_KVER), 86 # RFC 7634 specifies that key length must be 256 bits, with an additional 87 # 4 bytes (32 bits) of nonce. A fresh nonce value MUST be assigned for 88 # each SA. RFC 7634 also specifies that ICV length must be 16 bytes. 89 # ChaCha20-Poly1305 is enforced since kernel version 5.8 90 (xfrm.XfrmAlgoAead((xfrm.XFRM_AEAD_CHACHA20_POLY1305, 256+32, 16*8)), (5, 8)), 91] 92 93def GenerateKey(key_len): 94 if key_len % 8 != 0: 95 raise ValueError("Invalid key length in bits: " + str(key_len)) 96 return os.urandom(key_len // 8) 97 98# Does the kernel support this algorithm? 99def HaveAlgo(crypt_algo, auth_algo, aead_algo): 100 try: 101 test_xfrm = xfrm.Xfrm() 102 test_xfrm.FlushSaInfo() 103 test_xfrm.FlushPolicyInfo() 104 105 test_xfrm.AddSaInfo( 106 src=xfrm_test.TEST_ADDR1, 107 dst=xfrm_test.TEST_ADDR2, 108 spi=xfrm_test.TEST_SPI, 109 mode=xfrm.XFRM_MODE_TRANSPORT, 110 reqid=100, 111 encryption=(crypt_algo, 112 GenerateKey(crypt_algo.key_len)) if crypt_algo else None, 113 auth_trunc=(auth_algo, 114 GenerateKey(auth_algo.key_len)) if auth_algo else None, 115 aead=(aead_algo, GenerateKey(aead_algo.key_len)) if aead_algo else None, 116 encap=None, 117 mark=None, 118 output_mark=None) 119 120 test_xfrm.FlushSaInfo() 121 test_xfrm.FlushPolicyInfo() 122 123 return True 124 except IOError as err: 125 if err.errno == ENOSYS: 126 return False 127 else: 128 print("Unexpected error:", err.errno) 129 return True 130 131# Dictionary to record the algorithm state. Mark the state True if this 132# algorithm is enforced or enabled on this kernel. Otherwise, mark it 133# False. 134algoState = {} 135 136def AlgoEnforcedOrEnabled(crypt, auth, aead, target_algo, target_kernel): 137 if algoState.get(target_algo) is None: 138 algoState[target_algo] = net_test.LINUX_VERSION >= target_kernel or HaveAlgo( 139 crypt, auth, aead) 140 return algoState.get(target_algo) 141 142# Return true if this algorithm should be enforced or is enabled on this kernel 143def AuthEnforcedOrEnabled(authCase): 144 auth = authCase[0] 145 crypt = xfrm.XfrmAlgo((b"ecb(cipher_null)", 0)) 146 return AlgoEnforcedOrEnabled(crypt, auth, None, auth.name, authCase[1]) 147 148# Return true if this algorithm should be enforced or is enabled on this kernel 149def CryptEnforcedOrEnabled(cryptCase): 150 crypt = cryptCase[0] 151 auth = xfrm.XfrmAlgoAuth((b"digest_null", 0, 0)) 152 return AlgoEnforcedOrEnabled(crypt, auth, None, crypt.name, cryptCase[1]) 153 154# Return true if this algorithm should be enforced or is enabled on this kernel 155def AeadEnforcedOrEnabled(aeadCase): 156 aead = aeadCase[0] 157 return AlgoEnforcedOrEnabled(None, None, aead, aead.name, aeadCase[1]) 158 159def InjectTests(): 160 XfrmAlgorithmTest.InjectTests() 161 162 163class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): 164 @classmethod 165 def InjectTests(cls): 166 VERSIONS = (4, 6) 167 TYPES = (SOCK_DGRAM, SOCK_STREAM) 168 169 # Tests all combinations of auth & crypt. Mutually exclusive with aead. 170 param_list = itertools.product(VERSIONS, TYPES, AUTH_ALGOS, CRYPT_ALGOS, 171 [None]) 172 util.InjectParameterizedTest(cls, param_list, cls.TestNameGenerator) 173 174 # Tests all combinations of aead. Mutually exclusive with auth/crypt. 175 param_list = itertools.product(VERSIONS, TYPES, [None], [None], AEAD_ALGOS) 176 util.InjectParameterizedTest(cls, param_list, cls.TestNameGenerator) 177 178 @staticmethod 179 def TestNameGenerator(version, proto, authCase, cryptCase, aeadCase): 180 # Produce a unique and readable name for each test. e.g. 181 # testSocketPolicySimple_cbc-aes_256_hmac-sha512_512_256_IPv6_UDP 182 param_string = "" 183 if cryptCase is not None: 184 crypt = cryptCase[0] 185 param_string += "%s_%d_" % (crypt.name.decode(), crypt.key_len) 186 187 if authCase is not None: 188 auth = authCase[0] 189 param_string += "%s_%d_%d_" % (auth.name.decode(), auth.key_len, 190 auth.trunc_len) 191 192 if aeadCase is not None: 193 aead = aeadCase[0] 194 param_string += "%s_%d_%d_" % (aead.name.decode(), aead.key_len, 195 aead.icv_len) 196 197 param_string += "%s_%s" % ("IPv4" if version == 4 else "IPv6", 198 "UDP" if proto == SOCK_DGRAM else "TCP") 199 return param_string 200 201 def ParamTestSocketPolicySimple(self, version, proto, authCase, cryptCase, aeadCase): 202 """Test two-way traffic using transport mode and socket policies.""" 203 204 # Bypass the test if any algorithm going to be tested is not enforced 205 # or enabled on this kernel 206 if authCase is not None and not AuthEnforcedOrEnabled(authCase): 207 return 208 if cryptCase is not None and not CryptEnforcedOrEnabled(cryptCase): 209 return 210 if aeadCase is not None and not AeadEnforcedOrEnabled(aeadCase): 211 return 212 213 auth = authCase[0] if authCase else None 214 crypt = cryptCase[0] if cryptCase else None 215 aead = aeadCase[0] if aeadCase else None 216 217 def AssertEncrypted(packet): 218 # This gives a free pass to ICMP and ICMPv6 packets, which show up 219 # nondeterministically in tests. 220 self.assertEqual(None, 221 packet.getlayer(scapy.UDP), 222 "UDP packet sent in the clear") 223 self.assertEqual(None, 224 packet.getlayer(scapy.TCP), 225 "TCP packet sent in the clear") 226 227 # We create a pair of sockets, "left" and "right", that will talk to each 228 # other using transport mode ESP. Because of TapTwister, both sockets 229 # perceive each other as owning "remote_addr". 230 netid = self.RandomNetid() 231 family = net_test.GetAddressFamily(version) 232 local_addr = self.MyAddress(version, netid) 233 remote_addr = self.GetRemoteSocketAddress(version) 234 auth_left = (xfrm.XfrmAlgoAuth((auth.name, auth.key_len, auth.trunc_len)), 235 os.urandom(auth.key_len // 8)) if auth else None 236 auth_right = (xfrm.XfrmAlgoAuth((auth.name, auth.key_len, auth.trunc_len)), 237 os.urandom(auth.key_len // 8)) if auth else None 238 crypt_left = (xfrm.XfrmAlgo((crypt.name, crypt.key_len)), 239 os.urandom(crypt.key_len // 8)) if crypt else None 240 crypt_right = (xfrm.XfrmAlgo((crypt.name, crypt.key_len)), 241 os.urandom(crypt.key_len // 8)) if crypt else None 242 aead_left = (xfrm.XfrmAlgoAead((aead.name, aead.key_len, aead.icv_len)), 243 os.urandom(aead.key_len // 8)) if aead else None 244 aead_right = (xfrm.XfrmAlgoAead((aead.name, aead.key_len, aead.icv_len)), 245 os.urandom(aead.key_len // 8)) if aead else None 246 spi_left = 0xbeefface 247 spi_right = 0xcafed00d 248 req_ids = [100, 200, 300, 400] # Used to match templates and SAs. 249 250 # Left outbound SA 251 self.xfrm.AddSaInfo( 252 src=local_addr, 253 dst=remote_addr, 254 spi=spi_right, 255 mode=xfrm.XFRM_MODE_TRANSPORT, 256 reqid=req_ids[0], 257 encryption=crypt_right, 258 auth_trunc=auth_right, 259 aead=aead_right, 260 encap=None, 261 mark=None, 262 output_mark=None) 263 # Right inbound SA 264 self.xfrm.AddSaInfo( 265 src=remote_addr, 266 dst=local_addr, 267 spi=spi_right, 268 mode=xfrm.XFRM_MODE_TRANSPORT, 269 reqid=req_ids[1], 270 encryption=crypt_right, 271 auth_trunc=auth_right, 272 aead=aead_right, 273 encap=None, 274 mark=None, 275 output_mark=None) 276 # Right outbound SA 277 self.xfrm.AddSaInfo( 278 src=local_addr, 279 dst=remote_addr, 280 spi=spi_left, 281 mode=xfrm.XFRM_MODE_TRANSPORT, 282 reqid=req_ids[2], 283 encryption=crypt_left, 284 auth_trunc=auth_left, 285 aead=aead_left, 286 encap=None, 287 mark=None, 288 output_mark=None) 289 # Left inbound SA 290 self.xfrm.AddSaInfo( 291 src=remote_addr, 292 dst=local_addr, 293 spi=spi_left, 294 mode=xfrm.XFRM_MODE_TRANSPORT, 295 reqid=req_ids[3], 296 encryption=crypt_left, 297 auth_trunc=auth_left, 298 aead=aead_left, 299 encap=None, 300 mark=None, 301 output_mark=None) 302 303 # Make two sockets. 304 sock_left = socket(family, proto, 0) 305 sock_left.settimeout(2.0) 306 sock_left.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 307 self.SelectInterface(sock_left, netid, "mark") 308 sock_right = socket(family, proto, 0) 309 sock_right.settimeout(2.0) 310 sock_right.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 311 self.SelectInterface(sock_right, netid, "mark") 312 313 # For UDP, set SO_LINGER to 0, to prevent TCP sockets from hanging around 314 # in a TIME_WAIT state. 315 if proto == SOCK_STREAM: 316 net_test.DisableFinWait(sock_left) 317 net_test.DisableFinWait(sock_right) 318 319 # Apply the left outbound socket policy. 320 xfrm_base.ApplySocketPolicy(sock_left, family, xfrm.XFRM_POLICY_OUT, 321 spi_right, req_ids[0], None) 322 # Apply right inbound socket policy. 323 xfrm_base.ApplySocketPolicy(sock_right, family, xfrm.XFRM_POLICY_IN, 324 spi_right, req_ids[1], None) 325 # Apply right outbound socket policy. 326 xfrm_base.ApplySocketPolicy(sock_right, family, xfrm.XFRM_POLICY_OUT, 327 spi_left, req_ids[2], None) 328 # Apply left inbound socket policy. 329 xfrm_base.ApplySocketPolicy(sock_left, family, xfrm.XFRM_POLICY_IN, 330 spi_left, req_ids[3], None) 331 332 server_ready = threading.Event() 333 server_error = None # Save exceptions thrown by the server. 334 335 def TcpServer(sock, client_port): 336 try: 337 sock.listen(1) 338 server_ready.set() 339 accepted, peer = sock.accept() 340 self.assertEqual(remote_addr, peer[0]) 341 self.assertEqual(client_port, peer[1]) 342 data = accepted.recv(2048) 343 self.assertEqual(b"hello request", data) 344 accepted.send(b"hello response") 345 except Exception as e: 346 server_error = e 347 finally: 348 sock.close() 349 350 def UdpServer(sock, client_port): 351 try: 352 server_ready.set() 353 data, peer = sock.recvfrom(2048) 354 self.assertEqual(remote_addr, peer[0]) 355 self.assertEqual(client_port, peer[1]) 356 self.assertEqual(b"hello request", data) 357 sock.sendto(b"hello response", peer) 358 except Exception as e: 359 server_error = e 360 finally: 361 sock.close() 362 363 # Server and client need to know each other's port numbers in advance. 364 wildcard_addr = net_test.GetWildcardAddress(version) 365 sock_left.bind((wildcard_addr, 0)) 366 sock_right.bind((wildcard_addr, 0)) 367 left_port = sock_left.getsockname()[1] 368 right_port = sock_right.getsockname()[1] 369 370 # Start the appropriate server type on sock_right. 371 target = TcpServer if proto == SOCK_STREAM else UdpServer 372 server = threading.Thread( 373 target=target, 374 args=(sock_right, left_port), 375 name="SocketServer") 376 server.start() 377 # Wait for server to be ready before attempting to connect. TCP retries 378 # hide this problem, but UDP will fail outright if the server socket has 379 # not bound when we send. 380 self.assertTrue(server_ready.wait(2.0), "Timed out waiting for server thread") 381 382 with TapTwister(fd=self.tuns[netid].fileno(), validator=AssertEncrypted): 383 sock_left.connect((remote_addr, right_port)) 384 sock_left.send(b"hello request") 385 data = sock_left.recv(2048) 386 self.assertEqual(b"hello response", data) 387 sock_left.close() 388 server.join(timeout=2.0) 389 self.assertFalse(server.is_alive(), "Timed out waiting for server exit") 390 if server_error: 391 raise server_error 392 393 394if __name__ == "__main__": 395 XfrmAlgorithmTest.InjectTests() 396 unittest.main() 397