1#!/usr/bin/env python 2# Copyright (c) Suchakra Sharma <suchakrapani.sharma@polymtl.ca> 3# Licensed under the Apache License, Version 2.0 (the "License") 4 5from bcc import BPF, _get_num_open_probes, TRACEFS 6import os 7import sys 8from unittest import main, TestCase 9 10class TestKprobeCnt(TestCase): 11 def setUp(self): 12 self.b = BPF(text=""" 13 int wololo(void *ctx) { 14 return 0; 15 } 16 """) 17 self.b.attach_kprobe(event_re="^vfs_.*", fn_name="wololo") 18 19 def test_attach1(self): 20 actual_cnt = 0 21 with open("%s/available_filter_functions" % TRACEFS, "rb") as f: 22 for line in f: 23 if line.startswith(b"vfs_"): 24 actual_cnt += 1 25 open_cnt = self.b.num_open_kprobes() 26 self.assertEqual(actual_cnt, open_cnt) 27 28 def tearDown(self): 29 self.b.cleanup() 30 31 32class TestProbeGlobalCnt(TestCase): 33 def setUp(self): 34 self.b1 = BPF(text="""int count(void *ctx) { return 0; }""") 35 self.b2 = BPF(text="""int count(void *ctx) { return 0; }""") 36 37 def test_probe_quota(self): 38 self.b1.attach_kprobe(event="schedule", fn_name="count") 39 self.b2.attach_kprobe(event="submit_bio", fn_name="count") 40 self.assertEqual(1, self.b1.num_open_kprobes()) 41 self.assertEqual(1, self.b2.num_open_kprobes()) 42 self.assertEqual(2, _get_num_open_probes()) 43 self.b1.cleanup() 44 self.b2.cleanup() 45 self.assertEqual(0, _get_num_open_probes()) 46 47 48class TestAutoKprobe(TestCase): 49 def setUp(self): 50 self.b = BPF(text=""" 51 int kprobe__schedule(void *ctx) { return 0; } 52 int kretprobe__schedule(void *ctx) { return 0; } 53 """) 54 55 def test_count(self): 56 self.assertEqual(2, self.b.num_open_kprobes()) 57 58 def tearDown(self): 59 self.b.cleanup() 60 61 62class TestProbeQuota(TestCase): 63 def setUp(self): 64 self.b = BPF(text="""int count(void *ctx) { return 0; }""") 65 66 def test_probe_quota(self): 67 with self.assertRaises(Exception): 68 self.b.attach_kprobe(event_re=".*", fn_name="count") 69 70 def test_uprobe_quota(self): 71 with self.assertRaises(Exception): 72 self.b.attach_uprobe(name="c", sym_re=".*", fn_name="count") 73 74 def tearDown(self): 75 self.b.cleanup() 76 77 78class TestProbeNotExist(TestCase): 79 def setUp(self): 80 self.b = BPF(text="""int count(void *ctx) { return 0; }""") 81 82 def test_not_exist(self): 83 with self.assertRaises(Exception): 84 b.attach_kprobe(event="___doesnotexist", fn_name="count") 85 86 def tearDown(self): 87 self.b.cleanup() 88 89 90if __name__ == "__main__": 91 main() 92