• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2#
3# Copyright 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import
18from errno import *  # pylint: disable=wildcard-import
19import random
20from scapy import all as scapy
21from socket import *  # pylint: disable=wildcard-import
22import struct
23import subprocess
24import unittest
25
26import multinetwork_base
27import net_test
28import xfrm
29
30XFRM_ADDR_ANY = 16 * "\x00"
31LOOPBACK = 15 * "\x00" + "\x01"
32ENCRYPTED_PAYLOAD = ("b1c74998efd6326faebe2061f00f2c750e90e76001664a80c287b150"
33                     "59e74bf949769cc6af71e51b539e7de3a2a14cb05a231b969e035174"
34                     "d98c5aa0cef1937db98889ec0d08fa408fecf616")
35ENCRYPTION_KEY = ("308146eb3bd84b044573d60f5a5fd159"
36                  "57c7d4fe567a2120f35bae0f9869ec22".decode("hex"))
37AUTH_TRUNC_KEY = "af442892cdcd0ef650e9c299f9a8436a".decode("hex")
38
39TEST_ADDR1 = "2001:4860:4860::8888"
40TEST_ADDR2 = "2001:4860:4860::8844"
41
42TEST_SPI = 0x1234
43
44ALL_ALGORITHMS = 0xffffffff
45ALGO_CBC_AES_256 = xfrm.XfrmAlgo(("cbc(aes)", 256))
46ALGO_HMAC_SHA1 = xfrm.XfrmAlgoAuth(("hmac(sha1)", 128, 96))
47
48
49class XfrmTest(multinetwork_base.MultiNetworkBaseTest):
50
51  @classmethod
52  def setUpClass(cls):
53    super(XfrmTest, cls).setUpClass()
54    cls.xfrm = xfrm.Xfrm()
55
56  def setUp(self):
57    # TODO: delete this when we're more diligent about deleting our SAs.
58    super(XfrmTest, self).setUp()
59    self.xfrm.FlushSaInfo()
60
61  def tearDown(self):
62    super(XfrmTest, self).tearDown()
63    self.xfrm.FlushSaInfo()
64
65  def expectIPv6EspPacketOn(self, netid, spi, seq, length):
66    packets = self.ReadAllPacketsOn(netid)
67    self.assertEquals(1, len(packets))
68    packet = packets[0]
69    self.assertEquals(IPPROTO_ESP, packet.nh)
70    spi_seq = struct.pack("!II", spi, seq)
71    self.assertEquals(spi_seq, str(packet.payload)[:len(spi_seq)])
72    self.assertEquals(length, len(packet.payload))
73
74  def assertIsUdpEncapEsp(self, packet, spi, seq, length):
75    self.assertEquals(IPPROTO_UDP, packet.proto)
76    self.assertEquals(4500, packet.dport)
77    # Skip UDP header. TODO: isn't there a better way to do this?
78    payload = str(packet.payload)[8:]
79    self.assertEquals(length, len(payload))
80    spi_seq = struct.pack("!II", ntohl(spi), seq)
81    self.assertEquals(spi_seq, str(payload)[:len(spi_seq)])
82
83  def testAddSa(self):
84    self.xfrm.AddMinimalSaInfo("::", TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP,
85                               xfrm.XFRM_MODE_TRANSPORT, 3320,
86                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
87                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None)
88    expected = (
89        "src :: dst 2001:4860:4860::8888\n"
90        "\tproto esp spi 0x00001234 reqid 3320 mode transport\n"
91        "\treplay-window 4 \n"
92        "\tauth-trunc hmac(sha1) 0x%s 96\n"
93        "\tenc cbc(aes) 0x%s\n"
94        "\tsel src ::/0 dst ::/0 \n" % (
95            AUTH_TRUNC_KEY.encode("hex"), ENCRYPTION_KEY.encode("hex")))
96
97    actual = subprocess.check_output("ip xfrm state".split())
98    try:
99      self.assertMultiLineEqual(expected, actual)
100    finally:
101      self.xfrm.DeleteSaInfo(TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP)
102
103  def testFlush(self):
104    self.assertEquals(0, len(self.xfrm.DumpSaInfo()))
105    self.xfrm.AddMinimalSaInfo("::", "2000::", htonl(TEST_SPI),
106                               IPPROTO_ESP, xfrm.XFRM_MODE_TRANSPORT, 1234,
107                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
108                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None)
109    self.xfrm.AddMinimalSaInfo("0.0.0.0", "192.0.2.1", htonl(TEST_SPI),
110                               IPPROTO_ESP, xfrm.XFRM_MODE_TRANSPORT, 4321,
111                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
112                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None)
113    self.assertEquals(2, len(self.xfrm.DumpSaInfo()))
114    self.xfrm.FlushSaInfo()
115    self.assertEquals(0, len(self.xfrm.DumpSaInfo()))
116
117  @unittest.skipUnless(net_test.LINUX_VERSION < (4, 4, 0), "regression")
118  def testSocketPolicy(self):
119    # Open an IPv6 UDP socket and connect it.
120    s = socket(AF_INET6, SOCK_DGRAM, 0)
121    netid = random.choice(self.NETIDS)
122    self.SelectInterface(s, netid, "mark")
123    s.connect((TEST_ADDR1, 53))
124    saddr, sport = s.getsockname()[:2]
125    daddr, dport = s.getpeername()[:2]
126
127    # Create a selector that matches all UDP packets. It's not actually used to
128    # select traffic, that will be done by the socket policy, which selects the
129    # SA entry (i.e., xfrm state) via the SPI and reqid.
130    sel = xfrm.XfrmSelector((XFRM_ADDR_ANY, XFRM_ADDR_ANY, 0, 0, 0, 0,
131                             AF_INET6, 0, 0, IPPROTO_UDP, 0, 0))
132
133    # Create a user policy that specifies that all outbound packets matching the
134    # (essentially no-op) selector should be encrypted.
135    info = xfrm.XfrmUserpolicyInfo((sel,
136                                    xfrm.NO_LIFETIME_CFG, xfrm.NO_LIFETIME_CUR,
137                                    100, 0,
138                                    xfrm.XFRM_POLICY_OUT,
139                                    xfrm.XFRM_POLICY_ALLOW,
140                                    xfrm.XFRM_POLICY_LOCALOK,
141                                    xfrm.XFRM_SHARE_UNIQUE))
142
143    # Create a template that specifies the SPI and the protocol.
144    xfrmid = xfrm.XfrmId((XFRM_ADDR_ANY, htonl(TEST_SPI), IPPROTO_ESP))
145    tmpl = xfrm.XfrmUserTmpl((xfrmid, AF_INET6, XFRM_ADDR_ANY, 0,
146                              xfrm.XFRM_MODE_TRANSPORT, xfrm.XFRM_SHARE_UNIQUE,
147                              0,                # require
148                              ALL_ALGORITHMS,   # auth algos
149                              ALL_ALGORITHMS,   # encryption algos
150                              ALL_ALGORITHMS))  # compression algos
151
152    # Set the policy and template on our socket.
153    data = info.Pack() + tmpl.Pack()
154    s.setsockopt(IPPROTO_IPV6, xfrm.IPV6_XFRM_POLICY, data)
155
156    # Because the policy has level set to "require" (the default), attempting
157    # to send a packet results in an error, because there is no SA that
158    # matches the socket policy we set.
159    self.assertRaisesErrno(
160        EAGAIN,
161        s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
162
163    # Adding a matching SA causes the packet to go out encrypted. The SA's
164    # SPI must match the one in our template, and the destination address must
165    # match the packet's destination address (in tunnel mode, it has to match
166    # the tunnel destination).
167    reqid = 0
168    self.xfrm.AddMinimalSaInfo("::", TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP,
169                               xfrm.XFRM_MODE_TRANSPORT, reqid,
170                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
171                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, None)
172
173    s.sendto(net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
174    self.expectIPv6EspPacketOn(netid, TEST_SPI, 1, 84)
175
176    # Sending to another destination doesn't work: again, no matching SA.
177    self.assertRaisesErrno(
178        EAGAIN,
179        s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR2, 53))
180
181    # Sending on another socket without the policy applied results in an
182    # unencrypted packet going out.
183    s2 = socket(AF_INET6, SOCK_DGRAM, 0)
184    self.SelectInterface(s2, netid, "mark")
185    s2.sendto(net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
186    packets = self.ReadAllPacketsOn(netid)
187    self.assertEquals(1, len(packets))
188    packet = packets[0]
189    self.assertEquals(IPPROTO_UDP, packet.nh)
190
191    # Deleting the SA causes the first socket to return errors again.
192    self.xfrm.DeleteSaInfo(TEST_ADDR1, htonl(TEST_SPI), IPPROTO_ESP)
193    self.assertRaisesErrno(
194        EAGAIN,
195        s.sendto, net_test.UDP_PAYLOAD, (TEST_ADDR1, 53))
196
197
198  def testUdpEncapWithSocketPolicy(self):
199    # TODO: test IPv6 instead of IPv4.
200    netid = random.choice(self.NETIDS)
201    myaddr = self.MyAddress(4, netid)
202    remoteaddr = self.GetRemoteAddress(4)
203
204    # Reserve a port on which to receive UDP encapsulated packets. Sending
205    # packets works without this (and potentially can send packets with a source
206    # port belonging to another application), but receiving requires the port to
207    # be bound and the encapsulation socket option enabled.
208    encap_socket = net_test.Socket(AF_INET, SOCK_DGRAM, 0)
209    encap_socket.bind((myaddr, 0))
210    encap_port = encap_socket.getsockname()[1]
211    encap_socket.setsockopt(IPPROTO_UDP, xfrm.UDP_ENCAP,
212                               xfrm.UDP_ENCAP_ESPINUDP)
213
214    # Open a socket to send traffic.
215    s = socket(AF_INET, SOCK_DGRAM, 0)
216    self.SelectInterface(s, netid, "mark")
217    s.connect((remoteaddr, 53))
218
219    # Create a UDP encap policy and template inbound and outbound and apply
220    # them to s.
221    sel = xfrm.XfrmSelector((XFRM_ADDR_ANY, XFRM_ADDR_ANY, 0, 0, 0, 0,
222                             AF_INET, 0, 0, IPPROTO_UDP, 0, 0))
223
224    # Use the same SPI both inbound and outbound because this lets us receive
225    # encrypted packets by simply replaying the packets the kernel sends.
226    in_reqid = 123
227    in_spi = htonl(TEST_SPI)
228    out_reqid = 456
229    out_spi = htonl(TEST_SPI)
230
231    # Start with the outbound policy.
232    # TODO: what happens without XFRM_SHARE_UNIQUE?
233    info = xfrm.XfrmUserpolicyInfo((sel,
234                                    xfrm.NO_LIFETIME_CFG, xfrm.NO_LIFETIME_CUR,
235                                    100, 0,
236                                    xfrm.XFRM_POLICY_OUT,
237                                    xfrm.XFRM_POLICY_ALLOW,
238                                    xfrm.XFRM_POLICY_LOCALOK,
239                                    xfrm.XFRM_SHARE_UNIQUE))
240    xfrmid = xfrm.XfrmId((XFRM_ADDR_ANY, out_spi, IPPROTO_ESP))
241    usertmpl = xfrm.XfrmUserTmpl((xfrmid, AF_INET, XFRM_ADDR_ANY, out_reqid,
242                              xfrm.XFRM_MODE_TRANSPORT, xfrm.XFRM_SHARE_UNIQUE,
243                              0,                # require
244                              ALL_ALGORITHMS,   # auth algos
245                              ALL_ALGORITHMS,   # encryption algos
246                              ALL_ALGORITHMS))  # compression algos
247
248    data = info.Pack() + usertmpl.Pack()
249    s.setsockopt(IPPROTO_IP, xfrm.IP_XFRM_POLICY, data)
250
251    # Uncomment for debugging.
252    # subprocess.call("ip xfrm policy".split())
253
254    # Create inbound and outbound SAs that specify UDP encapsulation.
255    encaptmpl = xfrm.XfrmEncapTmpl((xfrm.UDP_ENCAP_ESPINUDP, htons(encap_port),
256                                    htons(4500), 16 * "\x00"))
257    self.xfrm.AddMinimalSaInfo(myaddr, remoteaddr, out_spi, IPPROTO_ESP,
258                               xfrm.XFRM_MODE_TRANSPORT, out_reqid,
259                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
260                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, encaptmpl)
261
262    # Add an encap template that's the mirror of the outbound one.
263    encaptmpl.sport, encaptmpl.dport = encaptmpl.dport, encaptmpl.sport
264    self.xfrm.AddMinimalSaInfo(remoteaddr, myaddr, in_spi, IPPROTO_ESP,
265                               xfrm.XFRM_MODE_TRANSPORT, in_reqid,
266                               ALGO_CBC_AES_256, ENCRYPTION_KEY,
267                               ALGO_HMAC_SHA1, AUTH_TRUNC_KEY, encaptmpl)
268
269    # Uncomment for debugging.
270    # subprocess.call("ip xfrm state".split())
271
272    # Now send a packet.
273    s.sendto("foo", (remoteaddr, 53))
274    srcport = s.getsockname()[1]
275    # s.send("foo")  # TODO: WHY DOES THIS NOT WORK?
276
277    # Expect to see an UDP encapsulated packet.
278    packets = self.ReadAllPacketsOn(netid)
279    self.assertEquals(1, len(packets))
280    packet = packets[0]
281    self.assertIsUdpEncapEsp(packet, out_spi, 1, 52)
282
283    # Now test the receive path. Because we don't know how to decrypt packets,
284    # we just play back the encrypted packet that kernel sent earlier. We swap
285    # the addresses in the IP header to make the packet look like it's bound for
286    # us, but we can't do that for the port numbers because the UDP header is
287    # part of the integrity protected payload, which we can only replay as is.
288    # So the source and destination ports are swapped and the packet appears to
289    # be sent from srcport to port 53. Open another socket on that port, and
290    # apply the inbound policy to it.
291    twisted_socket = socket(AF_INET, SOCK_DGRAM, 0)
292    net_test.SetSocketTimeout(twisted_socket, 100)
293    twisted_socket.bind(("0.0.0.0", 53))
294
295    # TODO: why does this work even without the per-socket policy applied? The
296    # received packet obviously matches an SA, but don't inbound packets need to
297    # match a policy as well?
298    info.dir = xfrm.XFRM_POLICY_IN
299    xfrmid.spi = in_spi
300    usertmpl.reqid = in_reqid
301    data = info.Pack() + usertmpl.Pack()
302    twisted_socket.setsockopt(IPPROTO_IP, xfrm.IP_XFRM_POLICY, data)
303
304    # Save the payload of the packet so we can replay it back to ourselves.
305    payload = str(packet.payload)[8:]
306    spi_seq = struct.pack("!II", ntohl(in_spi), 1)
307    payload = spi_seq + payload[len(spi_seq):]
308
309    # Tamper with the packet and check that it's dropped and counted as invalid.
310    sainfo = self.xfrm.FindSaInfo(in_spi)
311    self.assertEquals(0, sainfo.stats.integrity_failed)
312    broken = payload[:25] + chr((ord(payload[25]) + 1) % 256) + payload[26:]
313    incoming = (scapy.IP(src=remoteaddr, dst=myaddr) /
314                scapy.UDP(sport=4500, dport=encap_port) / broken)
315    self.ReceivePacketOn(netid, incoming)
316    sainfo = self.xfrm.FindSaInfo(in_spi)
317    self.assertEquals(1, sainfo.stats.integrity_failed)
318
319    # Now play back the valid packet and check that we receive it.
320    incoming = (scapy.IP(src=remoteaddr, dst=myaddr) /
321                scapy.UDP(sport=4500, dport=encap_port) / payload)
322    self.ReceivePacketOn(netid, incoming)
323    data, src = twisted_socket.recvfrom(4096)
324    self.assertEquals("foo", data)
325    self.assertEquals((remoteaddr, srcport), src)
326
327    # Check that unencrypted packets are not received.
328    unencrypted = (scapy.IP(src=remoteaddr, dst=myaddr) /
329                   scapy.UDP(sport=srcport, dport=53) / "foo")
330    self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096)
331
332  def testAllocSpecificSpi(self):
333    spi = 0xABCD
334    new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi)
335    self.assertEquals(spi, ntohl(new_sa.id.spi))
336
337  def testAllocSpecificSpiUnavailable(self):
338    """Attempt to allocate the same SPI twice."""
339    spi = 0xABCD
340    new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi)
341    self.assertEquals(spi, ntohl(new_sa.id.spi))
342    with self.assertRaisesErrno(ENOENT):
343      new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi)
344
345  def testAllocRangeSpi(self):
346    start, end = 0xABCD0, 0xABCDF
347    new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, start, end)
348    spi = ntohl(new_sa.id.spi)
349    self.assertGreaterEqual(spi, start)
350    self.assertLessEqual(spi, end)
351
352  def testAllocRangeSpiUnavailable(self):
353    """Attempt to allocate N+1 SPIs from a range of size N."""
354    start, end = 0xABCD0, 0xABCDF
355    range_size = end - start + 1
356    spis = set()
357    # Assert that allocating SPI fails when none are available.
358    with self.assertRaisesErrno(ENOENT):
359      # Allocating range_size + 1 SPIs is guaranteed to fail.  Due to the way
360      # kernel picks random SPIs, this has a high probability of failing before
361      # reaching that limit.
362      for i in xrange(range_size + 1):
363        new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, start, end)
364        spi = ntohl(new_sa.id.spi)
365        self.assertNotIn(spi, spis)
366        spis.add(spi)
367
368
369if __name__ == "__main__":
370  unittest.main()
371