• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright (c) PLUMgrid, Inc.
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5import bcc
6import ctypes
7import errno
8import os
9import subprocess
10import shutil
11import time
12import unittest
13
14class TestUprobes(unittest.TestCase):
15    def test_simple_library(self):
16        text = """
17#include <uapi/linux/ptrace.h>
18BPF_ARRAY(stats, u64, 1);
19static void incr(int idx) {
20    u64 *ptr = stats.lookup(&idx);
21    if (ptr)
22        ++(*ptr);
23}
24int count(struct pt_regs *ctx) {
25    bpf_trace_printk("count() uprobe fired");
26    u32 pid = bpf_get_current_pid_tgid();
27    if (pid == PID)
28        incr(0);
29    return 0;
30}"""
31        test_pid = os.getpid()
32        text = text.replace("PID", "%d" % test_pid)
33        b = bcc.BPF(text=text)
34        b.attach_uprobe(name="c", sym="malloc_stats", fn_name="count", pid=test_pid)
35        b.attach_uretprobe(name="c", sym="malloc_stats", fn_name="count", pid=test_pid)
36        libc = ctypes.CDLL("libc.so.6")
37        libc.malloc_stats.restype = None
38        libc.malloc_stats.argtypes = []
39        libc.malloc_stats()
40        self.assertEqual(b["stats"][ctypes.c_int(0)].value, 2)
41        b.detach_uretprobe(name="c", sym="malloc_stats", pid=test_pid)
42        b.detach_uprobe(name="c", sym="malloc_stats", pid=test_pid)
43
44    def test_simple_binary(self):
45        text = """
46#include <uapi/linux/ptrace.h>
47BPF_ARRAY(stats, u64, 1);
48static void incr(int idx) {
49    u64 *ptr = stats.lookup(&idx);
50    if (ptr)
51        ++(*ptr);
52}
53int count(struct pt_regs *ctx) {
54    u32 pid = bpf_get_current_pid_tgid();
55    incr(0);
56    return 0;
57}"""
58        b = bcc.BPF(text=text)
59        pythonpath = "/usr/bin/python3"
60        symname = "_start"
61        b.attach_uprobe(name=pythonpath, sym=symname, fn_name="count")
62        b.attach_uretprobe(name=pythonpath, sym=symname, fn_name="count")
63        with os.popen(pythonpath + " -V") as f:
64            pass
65        self.assertGreater(b["stats"][ctypes.c_int(0)].value, 0)
66        b.detach_uretprobe(name=pythonpath, sym=symname)
67        b.detach_uprobe(name=pythonpath, sym=symname)
68
69    def test_mount_namespace(self):
70        text = """
71#include <uapi/linux/ptrace.h>
72BPF_TABLE("array", int, u64, stats, 1);
73static void incr(int idx) {
74    u64 *ptr = stats.lookup(&idx);
75    if (ptr)
76        ++(*ptr);
77}
78int count(struct pt_regs *ctx) {
79    bpf_trace_printk("count() uprobe fired");
80    u32 pid = bpf_get_current_pid_tgid();
81    if (pid == PID)
82        incr(0);
83    return 0;
84}"""
85        # Need to import libc from ctypes to access unshare(2)
86        libc = ctypes.CDLL("libc.so.6", use_errno=True)
87
88        # Need to find path to libz.so.1
89        libz_path = None
90        p = subprocess.Popen(["ldconfig", "-p"], stdout=subprocess.PIPE)
91        for l in p.stdout:
92            n = l.split()
93            if n[0] == b"libz.so.1":
94                # if libz was already found, override only if new lib is more
95                # specific (e.g. libc6,x86-64 vs libc6)
96                if not libz_path or len(n[1].split(b",")) > 1:
97                    libz_path = n[-1]
98        p.wait()
99        p.stdout.close()
100        p = None
101
102        self.assertIsNotNone(libz_path)
103
104        # fork a child that we'll place in a separate mount namespace
105        child_pid = os.fork()
106        if child_pid == 0:
107            # Unshare CLONE_NEWNS
108            if libc.unshare(0x00020000) == -1:
109                e = ctypes.get_errno()
110                raise OSError(e, errno.errorcode[e])
111
112            # Remount root MS_REC|MS_PRIVATE
113            if libc.mount(None, b"/", None, (1<<14)|(1<<18) , None) == -1:
114                e = ctypes.get_errno()
115                raise OSError(e, errno.errorcode[e])
116
117            if libc.mount(b"tmpfs", b"/tmp", b"tmpfs", 0, None) == -1:
118                e = ctypes.get_errno()
119                raise OSError(e, errno.errorcode[e])
120
121            shutil.copy(libz_path, b"/tmp")
122
123            libz = ctypes.CDLL("/tmp/libz.so.1")
124            time.sleep(3)
125            libz.zlibVersion()
126            time.sleep(5)
127            os._exit(0)
128
129        libname = "/tmp/libz.so.1"
130        symname = "zlibVersion"
131        text = text.replace("PID", "%d" % child_pid)
132        b = bcc.BPF(text=text)
133        b.attach_uprobe(name=libname, sym=symname, fn_name="count", pid=child_pid)
134        b.attach_uretprobe(name=libname, sym=symname, fn_name="count", pid=child_pid)
135        time.sleep(5)
136        self.assertEqual(b["stats"][ctypes.c_int(0)].value, 2)
137        b.detach_uretprobe(name=libname, sym=symname, pid=child_pid)
138        b.detach_uprobe(name=libname, sym=symname, pid=child_pid)
139        os.wait()
140
141if __name__ == "__main__":
142    unittest.main()
143