1#!/usr/bin/env python3 2# Copyright (c) PLUMgrid, Inc. 3# Licensed under the Apache License, Version 2.0 (the "License") 4 5from netaddr import IPAddress 6from bcc import BPF 7from pyroute2 import IPRoute, protocols 8from socket import socket, AF_INET, SOCK_DGRAM 9from subprocess import call 10import sys 11from time import sleep 12from unittest import main, TestCase 13 14arg1 = sys.argv.pop(1) 15arg2 = "" 16if len(sys.argv) > 1: 17 arg2 = sys.argv.pop(1) 18 19class TestBPFFilter(TestCase): 20 def setUp(self): 21 b = BPF(arg1, arg2, debug=0) 22 fn = b.load_func("on_packet", BPF.SCHED_ACT) 23 ip = IPRoute() 24 ifindex = ip.link_lookup(ifname="eth0")[0] 25 # set up a network to change the flow: 26 # outside | inside 27 # 172.16.1.1 - 172.16.1.2 | 192.168.1.1 - 192.16.1.2 28 ip.addr("del", index=ifindex, address="172.16.1.2", mask=24) 29 ip.addr("add", index=ifindex, address="192.168.1.2", mask=24) 30 # add an ingress and egress qdisc 31 ip.tc("add", "ingress", ifindex, "ffff:") 32 ip.tc("add", "sfq", ifindex, "1:") 33 # add same program to both ingress/egress, so pkt is translated in both directions 34 action = {"kind": "bpf", "fd": fn.fd, "name": fn.name, "action": "ok"} 35 ip.tc("add-filter", "u32", ifindex, ":1", parent="ffff:", action=[action], 36 protocol=protocols.ETH_P_ALL, classid=1, target=0x10002, keys=['0x0/0x0+0']) 37 ip.tc("add-filter", "u32", ifindex, ":2", parent="1:", action=[action], 38 protocol=protocols.ETH_P_ALL, classid=1, target=0x10002, keys=['0x0/0x0+0']) 39 self.xlate = b.get_table("xlate") 40 41 def test_xlate(self): 42 key1 = self.xlate.Key(IPAddress("172.16.1.2").value, IPAddress("172.16.1.1").value) 43 leaf1 = self.xlate.Leaf(IPAddress("192.168.1.2").value, IPAddress("192.168.1.1").value, 0, 0) 44 self.xlate[key1] = leaf1 45 key2 = self.xlate.Key(IPAddress("192.168.1.1").value, IPAddress("192.168.1.2").value) 46 leaf2 = self.xlate.Leaf(IPAddress("172.16.1.1").value, IPAddress("172.16.1.2").value, 0, 0) 47 self.xlate[key2] = leaf2 48 call(["ping", "-c1", "192.168.1.1"]) 49 leaf = self.xlate[key1] 50 self.assertGreater(leaf.ip_xlated_pkts, 0) 51 self.assertGreater(leaf.arp_xlated_pkts, 0) 52 leaf = self.xlate[key2] 53 self.assertGreater(leaf.ip_xlated_pkts, 0) 54 self.assertGreater(leaf.arp_xlated_pkts, 0) 55 56if __name__ == "__main__": 57 main() 58