• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright (c) PLUMgrid, Inc.
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5import os
6import unittest
7from bcc import BPF
8import multiprocessing
9
10class TestPercpu(unittest.TestCase):
11
12    def setUp(self):
13        try:
14            b = BPF(text='BPF_PERCPU_ARRAY(stub, u32, 1);')
15        except:
16            raise unittest.SkipTest("PerCpu unsupported on this kernel")
17
18    def test_helper(self):
19        test_prog1 = """
20        BPF_PERCPU_ARRAY(stub_default);
21        BPF_PERCPU_ARRAY(stub_type, u64);
22        BPF_PERCPU_ARRAY(stub_full, u64, 1024);
23        """
24        BPF(text=test_prog1)
25
26    def test_u64(self):
27        test_prog1 = """
28        BPF_PERCPU_HASH(stats, u32, u64, 1);
29        int hello_world(void *ctx) {
30            u32 key=0;
31            u64 value = 0, *val;
32            val = stats.lookup_or_try_init(&key, &value);
33            if (val) {
34                *val += 1;
35            }
36            return 0;
37        }
38        """
39        bpf_code = BPF(text=test_prog1)
40        stats_map = bpf_code.get_table("stats")
41        event_name = bpf_code.get_syscall_fnname("clone")
42        bpf_code.attach_kprobe(event=event_name, fn_name="hello_world")
43        ini = stats_map.Leaf()
44        for i in range(0, multiprocessing.cpu_count()):
45            ini[i] = 0
46        stats_map[ stats_map.Key(0) ] = ini
47        f = os.popen("hostname")
48        f.close()
49        self.assertEqual(len(stats_map),1)
50        val = stats_map[ stats_map.Key(0) ]
51        sum = stats_map.sum(stats_map.Key(0))
52        avg = stats_map.average(stats_map.Key(0))
53        max = stats_map.max(stats_map.Key(0))
54        self.assertGreater(sum.value, int(0))
55        self.assertGreater(max.value, int(0))
56        bpf_code.detach_kprobe(event_name)
57
58    def test_u32(self):
59        test_prog1 = """
60        BPF_PERCPU_ARRAY(stats, u32, 1);
61        int hello_world(void *ctx) {
62            u32 key=0;
63            u32 value = 0, *val;
64            val = stats.lookup_or_try_init(&key, &value);
65            if (val) {
66                *val += 1;
67            }
68            return 0;
69        }
70        """
71        bpf_code = BPF(text=test_prog1)
72        stats_map = bpf_code.get_table("stats")
73        event_name = bpf_code.get_syscall_fnname("clone")
74        bpf_code.attach_kprobe(event=event_name, fn_name="hello_world")
75        ini = stats_map.Leaf()
76        for i in range(0, multiprocessing.cpu_count()):
77            ini[i] = 0
78        stats_map[ stats_map.Key(0) ] = ini
79        f = os.popen("hostname")
80        f.close()
81        self.assertEqual(len(stats_map),1)
82        val = stats_map[ stats_map.Key(0) ]
83        sum = stats_map.sum(stats_map.Key(0))
84        avg = stats_map.average(stats_map.Key(0))
85        max = stats_map.max(stats_map.Key(0))
86        self.assertGreater(sum.value, int(0))
87        self.assertGreater(max.value, int(0))
88        bpf_code.detach_kprobe(event_name)
89
90    def test_struct_custom_func(self):
91        test_prog2 = """
92        typedef struct counter {
93        u32 c1;
94        u32 c2;
95        } counter;
96        BPF_PERCPU_HASH(stats, u32, counter, 1);
97        int hello_world(void *ctx) {
98            u32 key=0;
99            counter value = {0,0}, *val;
100            val = stats.lookup_or_try_init(&key, &value);
101            if (val) {
102                val->c1 += 1;
103                val->c2 += 1;
104            }
105            return 0;
106        }
107        """
108        bpf_code = BPF(text=test_prog2)
109        stats_map = bpf_code.get_table("stats",
110                reducer=lambda x,y: stats_map.sLeaf(x.c1+y.c1))
111        event_name = bpf_code.get_syscall_fnname("clone")
112        bpf_code.attach_kprobe(event=event_name, fn_name="hello_world")
113        ini = stats_map.Leaf()
114        for i in ini:
115            i = stats_map.sLeaf(0,0)
116        stats_map[ stats_map.Key(0) ] = ini
117        f = os.popen("hostname")
118        f.close()
119        self.assertEqual(len(stats_map),1)
120        k = stats_map[ stats_map.Key(0) ]
121        self.assertGreater(k.c1, int(0))
122        bpf_code.detach_kprobe(event_name)
123
124
125if __name__ == "__main__":
126    unittest.main()
127