• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright (c) PLUMgrid, Inc.
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5from netaddr import IPAddress
6from bcc import BPF
7from pyroute2 import IPRoute, protocols
8from socket import socket, AF_INET, SOCK_DGRAM
9from subprocess import call
10import sys
11from time import sleep
12from unittest import main, TestCase
13
# Consume the script's own command-line arguments, removing them from
# sys.argv (presumably so unittest's main() does not try to parse them).
# arg1 is mandatory: popping it raises IndexError when it is missing.
# arg2 is optional and defaults to the empty string.
# Both are passed positionally to BPF() in TestBPFFilter.setUp.
arg1 = sys.argv.pop(1)
arg2 = sys.argv.pop(1) if len(sys.argv) > 1 else ""
18
class TestBPFFilter(TestCase):
    """Test the ``on_packet`` BPF program attached to eth0 via tc filters.

    ``setUp`` loads the program, re-addresses eth0 and hooks the program on
    both the ingress and egress qdiscs; ``test_xlate`` fills the ``xlate``
    table and checks that its per-entry packet counters move after a ping.
    """

    def setUp(self):
        """Load the BPF program and attach it to eth0 in both directions.

        Stores the program's ``xlate`` table on ``self.xlate`` for the test.
        """
        b = BPF(arg1, arg2, debug=0)
        fn = b.load_func("on_packet", BPF.SCHED_ACT)
        ip = IPRoute()
        try:
            ifindex = ip.link_lookup(ifname="eth0")[0]
            # set up a network to change the flow:
            #             outside      |       inside
            # 172.16.1.1 - 172.16.1.2  |  192.168.1.1 - 192.168.1.2
            ip.addr("del", index=ifindex, address="172.16.1.2", mask=24)
            ip.addr("add", index=ifindex, address="192.168.1.2", mask=24)
            # add an ingress and egress qdisc
            ip.tc("add", "ingress", ifindex, "ffff:")
            ip.tc("add", "sfq", ifindex, "1:")
            # add same program to both ingress/egress, so pkt is translated
            # in both directions
            action = {"kind": "bpf", "fd": fn.fd, "name": fn.name, "action": "ok"}
            ip.tc("add-filter", "u32", ifindex, ":1", parent="ffff:", action=[action],
                    protocol=protocols.ETH_P_ALL, classid=1, target=0x10002, keys=['0x0/0x0+0'])
            ip.tc("add-filter", "u32", ifindex, ":2", parent="1:", action=[action],
                    protocol=protocols.ETH_P_ALL, classid=1, target=0x10002, keys=['0x0/0x0+0'])
        finally:
            # Release the netlink socket once configuration is done, even if
            # one of the steps above raised.
            ip.close()
        self.xlate = b.get_table("xlate")

    def test_xlate(self):
        """Install both translation entries, ping once, and check the counters.

        Each leaf is created with its last two fields set to 0; these are
        presumably the ``ip_xlated_pkts`` / ``arp_xlated_pkts`` counters
        asserted below -- confirm against the BPF program's struct layout.
        """
        # entry rewriting the 172.16.1.x address pair to the 192.168.1.x pair
        key1 = self.xlate.Key(IPAddress("172.16.1.2").value, IPAddress("172.16.1.1").value)
        leaf1 = self.xlate.Leaf(IPAddress("192.168.1.2").value, IPAddress("192.168.1.1").value, 0, 0)
        self.xlate[key1] = leaf1
        # entry rewriting the 192.168.1.x address pair back to the 172.16.1.x pair
        key2 = self.xlate.Key(IPAddress("192.168.1.1").value, IPAddress("192.168.1.2").value)
        leaf2 = self.xlate.Leaf(IPAddress("172.16.1.1").value, IPAddress("172.16.1.2").value, 0, 0)
        self.xlate[key2] = leaf2
        # generate traffic; ping's exit status is deliberately not checked,
        # the table counters below are the actual pass/fail criterion
        call(["ping", "-c1", "192.168.1.1"])
        # re-read each entry from the table and require non-zero counters
        leaf = self.xlate[key1]
        self.assertGreater(leaf.ip_xlated_pkts, 0)
        self.assertGreater(leaf.arp_xlated_pkts, 0)
        leaf = self.xlate[key2]
        self.assertGreater(leaf.ip_xlated_pkts, 0)
        self.assertGreater(leaf.arp_xlated_pkts, 0)
55
# Hand control to unittest's runner only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()
58