1#!/usr/bin/env python 2# 3# USAGE: test_usdt3.py 4# 5# Copyright 2018 Facebook, Inc 6# Licensed under the Apache License, Version 2.0 (the "License") 7 8from __future__ import print_function 9from bcc import BPF, USDT 10from unittest import main, TestCase 11from subprocess import Popen, PIPE 12import ctypes as ct 13import inspect, os, tempfile 14 15class TestUDST(TestCase): 16 def setUp(self): 17 common_h = b""" 18#include "folly/tracing/StaticTracepoint.h" 19 20static inline void record_val(int val) 21{ 22 FOLLY_SDT(test, probe, val); 23} 24 25extern void record_a(int val); 26extern void record_b(int val); 27""" 28 29 a_c = b""" 30#include <stdio.h> 31#include "common.h" 32 33void record_a(int val) 34{ 35 record_val(val); 36} 37""" 38 39 b_c = b""" 40#include <stdio.h> 41#include "common.h" 42 43void record_b(int val) 44{ 45 record_val(val); 46} 47""" 48 49 m_c = b""" 50#include <stdio.h> 51#include <unistd.h> 52#include "common.h" 53 54int main() { 55 while (1) { 56 record_a(1); 57 record_b(2); 58 record_val(3); 59 sleep(1); 60 } 61 return 0; 62} 63""" 64 # BPF program 65 self.bpf_text = """ 66BPF_PERF_OUTPUT(event); 67int do_trace(struct pt_regs *ctx) { 68 int result = 0; 69 bpf_usdt_readarg(1, ctx, &result); 70 event.perf_submit(ctx, &result, sizeof(result)); 71 return 0; 72}; 73""" 74 75 def _create_file(name, text): 76 text_file = open(name, "wb") 77 text_file.write(text) 78 text_file.close() 79 80 # Create source files 81 self.tmp_dir = tempfile.mkdtemp() 82 print("temp directory: " + self.tmp_dir) 83 _create_file(self.tmp_dir + "/common.h", common_h) 84 _create_file(self.tmp_dir + "/a.c", a_c) 85 _create_file(self.tmp_dir + "/b.c", b_c) 86 _create_file(self.tmp_dir + "/m.c", m_c) 87 88 # Compilation 89 # the usdt test:probe exists in liba.so, libb.so and a.out 90 include_path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) + "/include" 91 a_src = self.tmp_dir + "/a.c" 92 a_obj = self.tmp_dir + "/a.o" 93 a_lib = self.tmp_dir + "/liba.so" 94 b_src = self.tmp_dir + "/b.c" 95 b_obj = self.tmp_dir + "/b.o" 96 b_lib = self.tmp_dir + "/libb.so" 97 m_src = self.tmp_dir + "/m.c" 98 m_bin = self.tmp_dir + "/a.out" 99 m_linker_opt = " -L" + self.tmp_dir + " -la -lb" 100 self.assertEqual(os.system("gcc -I" + include_path + " -fpic -c -o " + a_obj + " " + a_src), 0) 101 self.assertEqual(os.system("gcc -I" + include_path + " -fpic -c -o " + b_obj + " " + b_src), 0) 102 self.assertEqual(os.system("gcc -shared -o " + a_lib + " " + a_obj), 0) 103 self.assertEqual(os.system("gcc -shared -o " + b_lib + " " + b_obj), 0) 104 self.assertEqual(os.system("gcc -I" + include_path + " " + m_src + " -o " + m_bin + m_linker_opt), 0) 105 106 # Run the application 107 self.app = Popen([m_bin], env=dict(os.environ, LD_LIBRARY_PATH=self.tmp_dir)) 108 # os.system("tplist.py -vvv -p " + str(self.app.pid)) 109 110 def test_attach1(self): 111 # enable USDT probe from given PID and verifier generated BPF programs 112 u = USDT(pid=int(self.app.pid)) 113 u.enable_probe(probe="probe", fn_name="do_trace") 114 b = BPF(text=self.bpf_text, usdt_contexts=[u]) 115 116 # processing events 117 self.probe_value_1 = 0 118 self.probe_value_2 = 0 119 self.probe_value_3 = 0 120 self.probe_value_other = 0 121 122 def print_event(cpu, data, size): 123 result = ct.cast(data, ct.POINTER(ct.c_int)).contents 124 if result.value == 1: 125 self.probe_value_1 = 1 126 elif result.value == 2: 127 self.probe_value_2 = 1 128 elif result.value == 3: 129 self.probe_value_3 = 1 130 else: 131 self.probe_value_other = 1 132 133 b["event"].open_perf_buffer(print_event) 134 for i in range(100): 135 if (self.probe_value_1 == 0 or 136 self.probe_value_2 == 0 or 137 self.probe_value_3 == 0 or 138 self.probe_value_other != 0): 139 b.perf_buffer_poll() 140 else: 141 break; 142 143 self.assertTrue(self.probe_value_1 != 0) 144 self.assertTrue(self.probe_value_2 != 0) 145 self.assertTrue(self.probe_value_3 != 0) 146 self.assertTrue(self.probe_value_other == 0) 147 148 def tearDown(self): 149 # kill the subprocess, clean the environment 150 self.app.kill() 151 self.app.wait() 152 os.system("rm -rf " + self.tmp_dir) 153 154if __name__ == "__main__": 155 main() 156