#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")

from ctypes import c_uint, c_ulong, Structure
from bcc import BPF
import os
from time import sleep
import sys
from unittest import main, TestCase

# The script's own arguments are popped off sys.argv before unittest's main()
# parses what is left. The first one is mandatory (a missing argument raises
# IndexError here); the second is optional and defaults to the empty string.
# Both are handed positionally to BPF() in setUp -- presumably the program
# source and an optional companion file; confirm against the bcc version used.
arg1 = sys.argv.pop(1)
arg2 = sys.argv.pop(1) if len(sys.argv) > 1 else ""

# Explicit ctypes layouts for the "stats" table are supplied only when the
# first argument names a ".b" file; otherwise None is passed to get_table().
if arg1.endswith(".b"):
    class Key(Structure):
        _fields_ = [
            ("fd", c_ulong),
        ]

    class Leaf(Structure):
        _fields_ = [
            ("stat1", c_ulong),
            ("stat2", c_ulong),
        ]
else:
    Key = None
    Leaf = None


class TestKprobe(TestCase):
    """Load a BPF program, attach its kprobe handlers and dump its stats."""

    # NOTE: the methods below are documented with comments rather than
    # docstrings, because unittest's verbose runner prints a test's docstring
    # in place of its name.

    def setUp(self):
        # Build the BPF object from the command-line arguments and keep a
        # handle on its "stats" table for the test to read.
        program = BPF(arg1, arg2, debug=0)
        self.stats = program.get_table("stats", Key, Leaf)

        # "sys_wr" goes on the write syscall; "sys_rd" goes on both the read
        # syscall and htab_map_get_next_key.
        write_event = program.get_syscall_fnname("write")
        program.attach_kprobe(event=write_event, fn_name="sys_wr")
        read_event = program.get_syscall_fnname("read")
        program.attach_kprobe(event=read_event, fn_name="sys_rd")
        program.attach_kprobe(event="htab_map_get_next_key", fn_name="sys_rd")

    def test_trace1(self):
        # 100 zero-length writes to /dev/null.
        with open("/dev/null", "a") as sink:
            sink_fd = sink.fileno()
            for _ in range(100):
                os.write(sink_fd, b"")

        # 200 one-byte reads from /etc/services, straight on the descriptor.
        with open("/etc/services", "r") as source:
            source_fd = source.fileno()
            for _ in range(200):
                os.read(source_fd, 1)

        # Print every entry currently in the table; nothing is asserted.
        for entry_key, entry_leaf in self.stats.items():
            fd_text = "fd %x:" % entry_key.fd
            stat1_text = "stat1 %d" % entry_leaf.stat1
            stat2_text = "stat2 %d" % entry_leaf.stat2
            print(fd_text, stat1_text, stat2_text)


if __name__ == "__main__":
    main()